Brian Johnson created PIG-4166:
----------------------------------

             Summary: Collected group drops last record when combined with merge join
                 Key: PIG-4166
                 URL: https://issues.apache.org/jira/browse/PIG-4166
             Project: Pig
          Issue Type: Bug
    Affects Versions: 0.12.0
            Reporter: Brian Johnson


If the final two keys in each relation join, the joined records never make it
to the final output. POMergeJoin does a read-ahead, and POCollectedGroup does
not call processInput once this.parentPlan.endOfAllInput == true. As a result,
POMergeJoin is never invoked with endOfAllInput == true, so it never emits
the final join.
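
The interaction described above can be reproduced in a small stand-alone
model. This is only a sketch under stated assumptions, not Pig code: the
names ReadAheadSource, CollectedGroup, run and drain are hypothetical, a
static boolean stands in for parentPlan.endOfAllInput, and a null return
plays the role of STATUS_EOP. The upstream operator holds back its last
record until it is called with endOfAllInput set, which is roughly what the
read-ahead in POMergeJoin amounts to for this bug.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class CollectedGroupFlushSketch {
    // Stand-in for parentPlan.endOfAllInput (assumption of this sketch).
    static boolean endOfAllInput = false;

    // Hypothetical upstream operator: releases its last record only when
    // it is called after endOfAllInput has been set.
    static class ReadAheadSource {
        private final Deque<String> ready = new ArrayDeque<>();
        private String heldBack;

        void feed(String key) {
            if (heldBack != null) ready.add(heldBack);
            heldBack = key;
        }

        String next() {
            if (endOfAllInput && heldBack != null) {
                ready.add(heldBack);
                heldBack = null;
            }
            return ready.poll(); // null plays the role of STATUS_EOP
        }
    }

    // Hypothetical buffered grouping operator: emits "key=count" when the
    // key changes, and flushes the last group at end of input.
    static class CollectedGroup {
        private final ReadAheadSource input;
        private final boolean flushBeforeReadingInput; // true = pre-patch order
        private String prevKey;
        private List<String> bag;

        CollectedGroup(ReadAheadSource input, boolean flushBeforeReadingInput) {
            this.input = input;
            this.flushBeforeReadingInput = flushBeforeReadingInput;
        }

        String getNext() {
            // Pre-patch order: flush without ever asking the input again.
            if (flushBeforeReadingInput && endOfAllInput) return flush();
            while (true) {
                String key = input.next();
                if (key == null) {
                    // Patched order: flush only after the input is drained.
                    return endOfAllInput ? flush() : null;
                }
                if (bag == null) {
                    prevKey = key;
                    bag = new ArrayList<>();
                    bag.add(key);
                } else if (key.equals(prevKey)) {
                    bag.add(key);
                } else {
                    String out = prevKey + "=" + bag.size();
                    prevKey = key;
                    bag = new ArrayList<>();
                    bag.add(key);
                    return out;
                }
            }
        }

        private String flush() {
            if (bag == null) return null;
            String out = prevKey + "=" + bag.size();
            bag = null;
            return out;
        }
    }

    // Mimics a mapper: one pipeline pass per record, then a final pass
    // with endOfAllInput set.
    static List<String> run(boolean prePatchOrder, String... keys) {
        endOfAllInput = false;
        ReadAheadSource src = new ReadAheadSource();
        CollectedGroup grp = new CollectedGroup(src, prePatchOrder);
        List<String> out = new ArrayList<>();
        for (String k : keys) {
            src.feed(k);
            drain(grp, out);
        }
        endOfAllInput = true;
        drain(grp, out);
        return out;
    }

    static void drain(CollectedGroup grp, List<String> out) {
        String r;
        while ((r = grp.getNext()) != null) out.add(r);
    }

    public static void main(String[] args) {
        System.out.println(run(true, "a", "a", "b"));  // prints [a=2]
        System.out.println(run(false, "a", "a", "b")); // prints [a=2, b=1]
    }
}
```

With the pre-patch order, the group for the last key is lost because the
upstream operator is never asked for input after endOfAllInput is set. With
the check moved inside the input loop, the held-back record is pulled
through first and the last group is then flushed.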

diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
index c355d1d..8fd44fa 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
@@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
     @Override
     public Result getNextTuple() throws ExecException {
 
-        // Since the output is buffered, we need to flush the last
-        // set of records when the close method is called by mapper.
-        if (this.parentPlan.endOfAllInput) {
-            if (outputBag != null) {
-                Tuple tup = mTupleFactory.newTuple(2);
-                tup.set(0, prevKey);
-                tup.set(1, outputBag);
-                outputBag = null;
-                return new Result(POStatus.STATUS_OK, tup);
-            }
-
-            return new Result(POStatus.STATUS_EOP, null);
-        }
+        
 
         Result inp = null;
         Result res = null;
 
         while (true) {
             inp = processInput();
+
             if (inp.returnStatus == POStatus.STATUS_EOP ||
                     inp.returnStatus == POStatus.STATUS_ERR) {
-                break;
+                // Since the output is buffered, we need to flush the last
+                // set of records when the close method is called by mapper.
+                if (this.parentPlan.endOfAllInput) {
+                    if (outputBag != null) {
+                        Tuple tup = mTupleFactory.newTuple(2);
+                        tup.set(0, prevKey);
+                        tup.set(1, outputBag);
+                        outputBag = null;
+                        return new Result(POStatus.STATUS_OK, tup);
+                    }
+
+                    return new Result(POStatus.STATUS_EOP, null);
+                } else
+                    break;
             }
 
             if (inp.returnStatus == POStatus.STATUS_NULL) {



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
