[
https://issues.apache.org/jira/browse/PIG-4166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174318#comment-14174318
]
Brian Johnson commented on PIG-4166:
------------------------------------
I included the patch above. I didn't create a specific test case for it, but
all tests still pass.
> Collected group drops last record when combined with merge join
> ---------------------------------------------------------------
>
> Key: PIG-4166
> URL: https://issues.apache.org/jira/browse/PIG-4166
> Project: Pig
> Issue Type: Bug
> Affects Versions: 0.12.0
> Reporter: Brian Johnson
> Fix For: 0.15.0
>
>
> If the final two keys in each relation join, the joined records never make
> it to the final output. The reason is that POMergeJoin does a read-ahead,
> and POCollectedGroup does not call processInput once
> this.parentPlan.endOfAllInput == true. As a result, POMergeJoin never sees
> endOfAllInput == true, so the final join is never output.
> {code}
> diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> index c355d1d..8fd44fa 100644
> --- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> +++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> @@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
> @Override
> public Result getNextTuple() throws ExecException {
>
> - // Since the output is buffered, we need to flush the last
> - // set of records when the close method is called by mapper.
> - if (this.parentPlan.endOfAllInput) {
> - if (outputBag != null) {
> - Tuple tup = mTupleFactory.newTuple(2);
> - tup.set(0, prevKey);
> - tup.set(1, outputBag);
> - outputBag = null;
> - return new Result(POStatus.STATUS_OK, tup);
> - }
> -
> - return new Result(POStatus.STATUS_EOP, null);
> - }
> +
>
> Result inp = null;
> Result res = null;
>
> while (true) {
> inp = processInput();
> +
> if (inp.returnStatus == POStatus.STATUS_EOP ||
> inp.returnStatus == POStatus.STATUS_ERR) {
> - break;
> + // Since the output is buffered, we need to flush the last
> + // set of records when the close method is called by mapper.
> + if (this.parentPlan.endOfAllInput) {
> + if (outputBag != null) {
> + Tuple tup = mTupleFactory.newTuple(2);
> + tup.set(0, prevKey);
> + tup.set(1, outputBag);
> + outputBag = null;
> + return new Result(POStatus.STATUS_OK, tup);
> + }
> +
> + return new Result(POStatus.STATUS_EOP, null);
> + } else
> + break;
> }
>
> if (inp.returnStatus == POStatus.STATUS_NULL) {
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)