Brian Johnson created PIG-4166:
----------------------------------
Summary: Collected group drops last record when combined with
merge join
Key: PIG-4166
URL: https://issues.apache.org/jira/browse/PIG-4166
Project: Pig
Issue Type: Bug
Affects Versions: 0.12.0
Reporter: Brian Johnson
If the final two keys in each relation join, they never make it to the
final output. The reason is that POMergeJoin does a read-ahead, while
POCollectedGroup does not call processInput() once
this.parentPlan.endOfAllInput == true. As a result, POMergeJoin never sees
endOfAllInput == true, which prevents the final join from being output.
diff --git
a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
index c355d1d..8fd44fa 100644
---
a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
+++
b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
@@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
@Override
public Result getNextTuple() throws ExecException {
- // Since the output is buffered, we need to flush the last
- // set of records when the close method is called by mapper.
- if (this.parentPlan.endOfAllInput) {
- if (outputBag != null) {
- Tuple tup = mTupleFactory.newTuple(2);
- tup.set(0, prevKey);
- tup.set(1, outputBag);
- outputBag = null;
- return new Result(POStatus.STATUS_OK, tup);
- }
-
- return new Result(POStatus.STATUS_EOP, null);
- }
+
Result inp = null;
Result res = null;
while (true) {
inp = processInput();
+
if (inp.returnStatus == POStatus.STATUS_EOP ||
inp.returnStatus == POStatus.STATUS_ERR) {
- break;
+ // Since the output is buffered, we need to flush the last
+ // set of records when the close method is called by mapper.
+ if (this.parentPlan.endOfAllInput) {
+ if (outputBag != null) {
+ Tuple tup = mTupleFactory.newTuple(2);
+ tup.set(0, prevKey);
+ tup.set(1, outputBag);
+ outputBag = null;
+ return new Result(POStatus.STATUS_OK, tup);
+ }
+
+ return new Result(POStatus.STATUS_EOP, null);
+ } else
+ break;
}
if (inp.returnStatus == POStatus.STATUS_NULL) {
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)