[ https://issues.apache.org/jira/browse/PIG-4166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14188924#comment-14188924 ]
Brian Johnson commented on PIG-4166:
------------------------------------
Are you sure you want a test for this change? The cost of the test is very high
compared with the small chance that someone would return this code to its broken
state. These tests would benefit from a single startup/shutdown of the Pig
server; it's not really practical to have so many unit tests that take several
minutes to run.
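For illustration, the single startup/shutdown idea can be sketched in plain Java so that it runs without JUnit. `Cluster`, `runPerTest` and `runShared` are hypothetical names: `Cluster` stands in for the expensive Pig test cluster and is not a Pig or JUnit class. In JUnit 4 the shared setup and teardown would normally go in `public static` methods annotated `@BeforeClass` and `@AfterClass`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class SharedFixtureSketch {
    // Hypothetical stand-in for an expensive test cluster; not a Pig or JUnit class.
    static final class Cluster {
        static int startups = 0;
        Cluster() { startups++; } // in a real suite, this is the slow part
        String exec(String script) { return "ran: " + script; }
        void shutdown() { }
    }

    // Per-test fixture: one startup and one shutdown around every test.
    static int runPerTest(List<Consumer<Cluster>> tests) {
        Cluster.startups = 0;
        for (Consumer<Cluster> test : tests) {
            Cluster cluster = new Cluster();
            try { test.accept(cluster); } finally { cluster.shutdown(); }
        }
        return Cluster.startups;
    }

    // Class-level fixture (the role of JUnit 4's @BeforeClass/@AfterClass):
    // a single startup and shutdown shared by every test in the class.
    static int runShared(List<Consumer<Cluster>> tests) {
        Cluster.startups = 0;
        Cluster cluster = new Cluster();
        try {
            for (Consumer<Cluster> test : tests) test.accept(cluster);
        } finally {
            cluster.shutdown();
        }
        return Cluster.startups;
    }

    public static void main(String[] args) {
        List<Consumer<Cluster>> tests = Arrays.asList(
                c -> c.exec("merge join followed by collected group"),
                c -> c.exec("collected group alone"),
                c -> c.exec("merge join alone"));
        System.out.println("per-test startups: " + runPerTest(tests)); // 3
        System.out.println("shared startups:   " + runShared(tests));  // 1
    }
}
```

The trade-off is that tests sharing one cluster must not leave behind state that changes the outcome of later tests.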
> Collected group drops last record when combined with merge join
> ---------------------------------------------------------------
>
> Key: PIG-4166
> URL: https://issues.apache.org/jira/browse/PIG-4166
> Project: Pig
> Issue Type: Bug
> Affects Versions: 0.12.0
> Reporter: Brian Johnson
> Fix For: 0.15.0
>
>
> If the final two keys in each relation join, they will never make it to the
> final output. The reason is that POMergeJoin does a read-ahead and
> POCollectedGroup doesn't call processInput when this.parentPlan.endOfAllInput
> == true. This prevents the final join from being output because POMergeJoin
> never sees endOfAllInput == true.
> {code}
> diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> index c355d1d..8fd44fa 100644
> --- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> +++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> @@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
>      @Override
>      public Result getNextTuple() throws ExecException {
>
> -        // Since the output is buffered, we need to flush the last
> -        // set of records when the close method is called by mapper.
> -        if (this.parentPlan.endOfAllInput) {
> -            if (outputBag != null) {
> -                Tuple tup = mTupleFactory.newTuple(2);
> -                tup.set(0, prevKey);
> -                tup.set(1, outputBag);
> -                outputBag = null;
> -                return new Result(POStatus.STATUS_OK, tup);
> -            }
> -
> -            return new Result(POStatus.STATUS_EOP, null);
> -        }
> +
>
>          Result inp = null;
>          Result res = null;
>
>          while (true) {
>              inp = processInput();
> +
>              if (inp.returnStatus == POStatus.STATUS_EOP ||
>                      inp.returnStatus == POStatus.STATUS_ERR) {
> -                break;
> +                // Since the output is buffered, we need to flush the last
> +                // set of records when the close method is called by mapper.
> +                if (this.parentPlan.endOfAllInput) {
> +                    if (outputBag != null) {
> +                        Tuple tup = mTupleFactory.newTuple(2);
> +                        tup.set(0, prevKey);
> +                        tup.set(1, outputBag);
> +                        outputBag = null;
> +                        return new Result(POStatus.STATUS_OK, tup);
> +                    }
> +
> +                    return new Result(POStatus.STATUS_EOP, null);
> +                } else
> +                    break;
>              }
>
>              if (inp.returnStatus == POStatus.STATUS_NULL) {
> {code}
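The failure mode described above can be reproduced with a small, self-contained model. This is a sketch under simplifying assumptions, not Pig code: `ReadAhead` is a hypothetical single-input operator that models only the read-ahead behaviour attributed to POMergeJoin (it holds one record back and releases the last one only when pulled after `endOfAllInput` is set), and `CollectedGroup` models the buffering in POCollectedGroup. The `fixed` flag switches between the pre-patch order (flush on `endOfAllInput` without calling the child) and the patched order (call the child first, flush only when it reports EOP).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CollectedGroupFlushDemo {
    // Simplified stand-in for Pig's Result: either a value or end-of-processing (EOP).
    static final class Result {
        static final Result EOP = new Result(null);
        final String value;
        private Result(String value) { this.value = value; }
        static Result ok(String value) { return new Result(value); }
        boolean isEop() { return this == EOP; }
    }

    // Models parentPlan.endOfAllInput, set before the final pull through the pipeline.
    static final class Plan { boolean endOfAllInput; }
    interface Op { Result next(); }

    // Root operator: hands out the attached record once, then reports EOP.
    static final class Source implements Op {
        private String attached;
        void attach(String record) { attached = record; }
        public Result next() {
            String r = attached;
            attached = null;
            return r == null ? Result.EOP : Result.ok(r);
        }
    }

    // Hypothetical read-ahead operator: always holds one record back and
    // releases the last one only when pulled after endOfAllInput is set.
    static final class ReadAhead implements Op {
        private final Op child;
        private final Plan plan;
        private String held;
        ReadAhead(Op child, Plan plan) { this.child = child; this.plan = plan; }
        public Result next() {
            Result in = child.next();
            String out = null;
            if (!in.isEop()) { out = held; held = in.value; }         // emit the previous record
            else if (plan.endOfAllInput) { out = held; held = null; } // final pull: emit the held one
            return out == null ? Result.EOP : Result.ok(out);
        }
    }

    // Buffers consecutive records that share a key (their first character).
    static final class CollectedGroup implements Op {
        private final Op child;
        private final Plan plan;
        private final boolean fixed;
        private String prevKey;
        private List<String> bag;
        CollectedGroup(Op child, Plan plan, boolean fixed) {
            this.child = child; this.plan = plan; this.fixed = fixed;
        }
        private Result flush() { Result out = Result.ok(prevKey + bag); bag = null; return out; }
        public Result next() {
            // Pre-patch order: flush on endOfAllInput without pulling from the child.
            if (!fixed && plan.endOfAllInput) return bag != null ? flush() : Result.EOP;
            while (true) {
                Result in = child.next();
                if (in.isEop()) {
                    // Patched order: flush only once the child has also reported EOP.
                    return plan.endOfAllInput && bag != null ? flush() : Result.EOP;
                }
                String key = in.value.substring(0, 1);
                Result out = (bag != null && !key.equals(prevKey)) ? flush() : null;
                if (bag == null) { prevKey = key; bag = new ArrayList<>(); }
                bag.add(in.value);
                if (out != null) return out; // a key change completes the previous group
            }
        }
    }

    static List<String> run(List<String> records, boolean fixed) {
        Plan plan = new Plan();
        Source source = new Source();
        Op leaf = new CollectedGroup(new ReadAhead(source, plan), plan, fixed);
        List<String> output = new ArrayList<>();
        for (String record : records) {
            source.attach(record);
            drain(leaf, output);
        }
        plan.endOfAllInput = true; // models the final pull when the mapper closes
        drain(leaf, output);
        return output;
    }

    // Pulls from the leaf operator until it reports EOP.
    static void drain(Op leaf, List<String> output) {
        for (Result r = leaf.next(); !r.isEop(); r = leaf.next()) output.add(r.value);
    }
    public static void main(String[] args) {
        List<String> input = Arrays.asList("a1", "a2", "b1", "c1", "c2");
        System.out.println("pre-patch: " + run(input, false)); // [a[a1, a2], b[b1], c[c1]]
        System.out.println("patched:   " + run(input, true));  // [a[a1, a2], b[b1], c[c1, c2]]
    }
}
```

In the pre-patch order the final record `c2` is still held by the read-ahead operator when the group flushes, so the last group comes out as `c[c1]`; calling the child first gives it the chance to release `c2` before the flush.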
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)