[
https://issues.apache.org/jira/browse/PIG-4166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Brian Johnson updated PIG-4166:
-------------------------------
Description:
If the final keys of the two relations join with each other, the resulting
records never make it to the final output. The reason is that POMergeJoin does
a read-ahead, and POCollectedGroup does not call processInput once
this.parentPlan.endOfAllInput == true. As a result, POMergeJoin is never
invoked with endOfAllInput == true, so the final join is never output.
{code}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
index c355d1d..8fd44fa 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
@@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
@Override
public Result getNextTuple() throws ExecException {
- // Since the output is buffered, we need to flush the last
- // set of records when the close method is called by mapper.
- if (this.parentPlan.endOfAllInput) {
- if (outputBag != null) {
- Tuple tup = mTupleFactory.newTuple(2);
- tup.set(0, prevKey);
- tup.set(1, outputBag);
- outputBag = null;
- return new Result(POStatus.STATUS_OK, tup);
- }
-
- return new Result(POStatus.STATUS_EOP, null);
- }
+
Result inp = null;
Result res = null;
while (true) {
inp = processInput();
+
if (inp.returnStatus == POStatus.STATUS_EOP ||
inp.returnStatus == POStatus.STATUS_ERR) {
- break;
+ // Since the output is buffered, we need to flush the last
+ // set of records when the close method is called by mapper.
+ if (this.parentPlan.endOfAllInput) {
+ if (outputBag != null) {
+ Tuple tup = mTupleFactory.newTuple(2);
+ tup.set(0, prevKey);
+ tup.set(1, outputBag);
+ outputBag = null;
+ return new Result(POStatus.STATUS_OK, tup);
+ }
+
+ return new Result(POStatus.STATUS_EOP, null);
+ } else
+ break;
}
if (inp.returnStatus == POStatus.STATUS_NULL) {
{code}
was:
If the final two keys in each relation join, they will never make it to the
final output. The reason is that POMergeJoin does a read-ahead and
POCollectedGroup doesn't call processInput when this.parentPlan.endOfAllInput
== true. This prevents the final join from being output because POMergeJoin
never sees endOfAllInput == true.
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
index c355d1d..8fd44fa 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
@@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
@Override
public Result getNextTuple() throws ExecException {
- // Since the output is buffered, we need to flush the last
- // set of records when the close method is called by mapper.
- if (this.parentPlan.endOfAllInput) {
- if (outputBag != null) {
- Tuple tup = mTupleFactory.newTuple(2);
- tup.set(0, prevKey);
- tup.set(1, outputBag);
- outputBag = null;
- return new Result(POStatus.STATUS_OK, tup);
- }
-
- return new Result(POStatus.STATUS_EOP, null);
- }
+
Result inp = null;
Result res = null;
while (true) {
inp = processInput();
+
if (inp.returnStatus == POStatus.STATUS_EOP ||
inp.returnStatus == POStatus.STATUS_ERR) {
- break;
+ // Since the output is buffered, we need to flush the last
+ // set of records when the close method is called by mapper.
+ if (this.parentPlan.endOfAllInput) {
+ if (outputBag != null) {
+ Tuple tup = mTupleFactory.newTuple(2);
+ tup.set(0, prevKey);
+ tup.set(1, outputBag);
+ outputBag = null;
+ return new Result(POStatus.STATUS_OK, tup);
+ }
+
+ return new Result(POStatus.STATUS_EOP, null);
+ } else
+ break;
}
if (inp.returnStatus == POStatus.STATUS_NULL) {
> Collected group drops last record when combined with merge join
> ---------------------------------------------------------------
>
> Key: PIG-4166
> URL: https://issues.apache.org/jira/browse/PIG-4166
> Project: Pig
> Issue Type: Bug
> Affects Versions: 0.12.0
> Reporter: Brian Johnson
>
> If the final keys of the two relations join with each other, the resulting
> records never make it to the final output. The reason is that POMergeJoin does
> a read-ahead, and POCollectedGroup does not call processInput once
> this.parentPlan.endOfAllInput == true. As a result, POMergeJoin is never
> invoked with endOfAllInput == true, so the final join is never output.
> {code}
> diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> index c355d1d..8fd44fa 100644
> --- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> +++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> @@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
> @Override
> public Result getNextTuple() throws ExecException {
>
> - // Since the output is buffered, we need to flush the last
> - // set of records when the close method is called by mapper.
> - if (this.parentPlan.endOfAllInput) {
> - if (outputBag != null) {
> - Tuple tup = mTupleFactory.newTuple(2);
> - tup.set(0, prevKey);
> - tup.set(1, outputBag);
> - outputBag = null;
> - return new Result(POStatus.STATUS_OK, tup);
> - }
> -
> - return new Result(POStatus.STATUS_EOP, null);
> - }
> +
>
> Result inp = null;
> Result res = null;
>
> while (true) {
> inp = processInput();
> +
> if (inp.returnStatus == POStatus.STATUS_EOP ||
> inp.returnStatus == POStatus.STATUS_ERR) {
> - break;
> + // Since the output is buffered, we need to flush the last
> + // set of records when the close method is called by mapper.
> + if (this.parentPlan.endOfAllInput) {
> + if (outputBag != null) {
> + Tuple tup = mTupleFactory.newTuple(2);
> + tup.set(0, prevKey);
> + tup.set(1, outputBag);
> + outputBag = null;
> + return new Result(POStatus.STATUS_OK, tup);
> + }
> +
> + return new Result(POStatus.STATUS_EOP, null);
> + } else
> + break;
> }
>
> if (inp.returnStatus == POStatus.STATUS_NULL) {
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)