[ https://issues.apache.org/jira/browse/PIG-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263299#comment-15263299 ]
Mohit Sabharwal commented on PIG-4876:
--------------------------------------
It's not clear to me why adding beginOfInput() is complex or less readable.
In OutputConsumerIterator, add a *beginOfInput()* abstract method:
{code:title=OutputConsumerIterator.java|borderStyle=solid}
// Called from readNext() before checking for the next input tuple.
protected abstract void beginOfInput();
{code}
In OutputConsumerIterator.readNext(), insert *beginOfInput()* as shown below:
{code:title=OutputConsumerIterator.java|borderStyle=solid}
...
if (result == null) {
    beginOfInput(); // INSERT THIS CALL
    if (!input.hasNext()) {
        done = true;
        return;
    }
    Tuple v1 = input.next();
    attach(v1);
}
...
{code}
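To make the control flow concrete, here is a simplified, self-contained Java model of an iterator with the proposed call in place. It is a sketch under assumptions, not Pig's actual OutputConsumerIterator: tuples are plain Strings, and BeginOfInputSketch, ConsumerIterator, PairingIterator, Status and Result are hypothetical names. How endOfInput() is triggered once the input runs dry (followed by further getNextResult() calls to drain the buffer) is also assumed here rather than taken from the patch.
{code:title=BeginOfInputSketch.java|borderStyle=solid}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical model of the proposed hook placement; not Pig's real classes.
class BeginOfInputSketch {

    enum Status { OK, EOP }  // simplified stand-in for POStatus codes

    static final class Result {
        final Status status;
        final String value;
        Result(Status status, String value) { this.status = status; this.value = value; }
    }

    abstract static class ConsumerIterator implements Iterator<String> {
        private final Iterator<String> input;
        private String pending = null;     // produced but not yet returned by next()
        private boolean needInput = true;  // plays the role of "result == null" above
        private boolean flushing = false;  // endOfInput() has already been signalled
        private boolean done = false;

        ConsumerIterator(Iterator<String> input) { this.input = input; }

        protected abstract void attach(String tuple);
        protected abstract Result getNextResult();
        protected abstract void beginOfInput();
        protected abstract void endOfInput();

        private void readNext() {
            while (pending == null && !done) {
                if (needInput) {
                    beginOfInput();            // proposed call, before every hasNext() check
                    if (!input.hasNext()) {
                        if (flushing) {        // buffered output already drained
                            done = true;
                            return;
                        }
                        endOfInput();          // assumed: let the operator flush its buffer
                        flushing = true;
                    } else {
                        attach(input.next());
                    }
                    needInput = false;
                }
                Result r = getNextResult();
                if (r.status == Status.OK) {
                    pending = r.value;
                } else {
                    needInput = true;          // EOP: operator wants more input
                }
            }
        }

        @Override public boolean hasNext() { readNext(); return pending != null; }

        @Override public String next() {
            readNext();
            if (pending == null) throw new NoSuchElementException();
            String v = pending;
            pending = null;
            return v;
        }
    }

    // Toy buffering operator: emits tuples in pairs and flushes a leftover at end of input.
    static final class PairingIterator extends ConsumerIterator {
        boolean endOfAllInput = false;  // stand-in for parentPlan.endOfAllInput
        private final List<String> buffer = new ArrayList<>();
        private String attached = null;

        PairingIterator(Iterator<String> input) { super(input); }

        @Override protected void attach(String tuple) { attached = tuple; }

        @Override protected Result getNextResult() {
            if (attached != null) { buffer.add(attached); attached = null; }
            if (buffer.size() == 2 || (endOfAllInput && !buffer.isEmpty())) {
                String out = String.join(",", buffer);
                buffer.clear();
                return new Result(Status.OK, out);
            }
            return new Result(Status.EOP, null);
        }

        @Override protected void beginOfInput() { endOfAllInput = false; }
        @Override protected void endOfInput() { endOfAllInput = true; }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        new PairingIterator(Arrays.asList("a", "b", "c").iterator()).forEachRemaining(out::add);
        System.out.println(out);  // [a,b, c]
    }
}
{code}
In this model the leftover "c" is emitted only because endOfInput() raises the flag after the input is exhausted, while beginOfInput() keeps the flag false whenever a real input tuple may still arrive.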
Now, in every operator where we have implemented endOfInput(), also implement
beginOfInput().
For example, CollectedGroupConverter already implements endOfInput(), and
beginOfInput() can be implemented there as:
{code:title=CollectedGroupConverter.java|borderStyle=solid}
@Override
protected void beginOfInput() {
    poCollectedGroup.getParentPlan().endOfAllInput = false;
}
{code}
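The comment does not spell out when endOfAllInput could still be true as input starts. One assumed scenario is that the plan object holding the flag is reused for another input pass after endOfInput() has already set it. The hypothetical sketch below (CollectedGroupResetSketch, Plan, GroupOperator and run() are illustrative names, not Pig classes) models a collected-group style operator that groups consecutive equal keys, and shows what the reset in beginOfInput() guards against under that assumption.
{code:title=CollectedGroupResetSketch.java|borderStyle=solid}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: the end-of-input flag lives on a shared "plan" object,
// as endOfAllInput does on the parent plan. Not Pig's real classes.
class CollectedGroupResetSketch {

    // Stand-in for the parent plan that owns endOfAllInput (assumed to be shared).
    static final class Plan {
        boolean endOfAllInput = false;
    }

    // Groups consecutive equal keys and emits "key:count" for each group.
    static final class GroupOperator {
        private final Plan plan;
        private final List<String> out;
        private String currentKey = null;
        private int count = 0;

        GroupOperator(Plan plan, List<String> out) { this.plan = plan; this.out = out; }

        void beginOfInput() { plan.endOfAllInput = false; }  // the proposed reset

        void endOfInput() { plan.endOfAllInput = true; flushIfEnded(); }

        void process(String key) {
            if (currentKey != null && !currentKey.equals(key)) {
                emit();                // key changed: the previous group is complete
            }
            currentKey = key;
            count++;
            flushIfEnded();            // the operator consults the flag on every call
        }

        private void flushIfEnded() {
            if (plan.endOfAllInput && currentKey != null) emit();
        }

        private void emit() {
            out.add(currentKey + ":" + count);
            currentKey = null;
            count = 0;
        }
    }

    // Feeds one input pass; callBegin controls whether beginOfInput() runs before each tuple.
    static List<String> run(Plan plan, List<String> keys, boolean callBegin) {
        List<String> out = new ArrayList<>();
        GroupOperator op = new GroupOperator(plan, out);
        for (String key : keys) {
            if (callBegin) op.beginOfInput();
            op.process(key);
        }
        op.endOfInput();
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("x", "x", "y");

        Plan shared = new Plan();
        System.out.println(run(shared, keys, false)); // [x:2, y:1]
        System.out.println(run(shared, keys, false)); // stale flag: [x:1, x:1, y:1]

        Plan reset = new Plan();
        run(reset, keys, true);
        System.out.println(run(reset, keys, true));   // with reset: [x:2, y:1]
    }
}
{code}
With the stale flag, every tuple is flushed as its own group; resetting the flag before each tuple restores the expected grouping on the second pass.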
Maybe I'm misunderstanding this?
> OutputConsumeIterator can't handle the last buffered tuples for some Operators
> ------------------------------------------------------------------------------
>
> Key: PIG-4876
> URL: https://issues.apache.org/jira/browse/PIG-4876
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: Xianda Ke
> Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4876.patch
>
>
> Some Operators, such as MergeCogroup, Stream, and CollectedGroup, buffer some
> input records to construct their result tuples, so the last result tuples are
> still buffered in the operator when the input ends. These Operators need a flag
> to indicate the end of input so that they can flush their buffers and construct
> their last tuples.
> Currently, the flag 'parentPlan.endOfAllInput' is used to flush the buffered
> tuples in MR mode, but it does not work with OutputConsumeIterator in Spark mode.
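The failure mode in the description can be reduced to a few lines. In this hypothetical sketch (LostTailSketch, PairBuffer and drive() are illustrative names, and the pairing operator merely stands in for operators like MergeCogroup, Stream or CollectedGroup), a pull-based loop that stops when its input iterator is exhausted never tells the operator that input has ended, so the last buffered tuple is never emitted.
{code:title=LostTailSketch.java|borderStyle=solid}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration: a buffering operator driven by a pull-based loop
// loses its tail if nothing signals the end of input.
class LostTailSketch {

    // Toy buffering operator: emits its input in pairs.
    static final class PairBuffer {
        boolean endOfAllInput = false;  // stand-in for parentPlan.endOfAllInput
        private final List<String> buffer = new ArrayList<>();

        // Accepts one tuple (or null for "no new tuple"); returns a completed pair or null.
        String offer(String tuple) {
            if (tuple != null) buffer.add(tuple);
            if (buffer.size() == 2 || (endOfAllInput && !buffer.isEmpty())) {
                String out = String.join(",", buffer);
                buffer.clear();
                return out;
            }
            return null;
        }
    }

    // signalEnd decides whether the operator is told that the input has ended.
    static List<String> drive(Iterator<String> input, boolean signalEnd) {
        PairBuffer op = new PairBuffer();
        List<String> out = new ArrayList<>();
        while (input.hasNext()) {
            String r = op.offer(input.next());
            if (r != null) out.add(r);
        }
        if (signalEnd) {
            op.endOfAllInput = true;    // mirrors the role endOfAllInput plays in MR mode
            String r = op.offer(null);  // one more call flushes the leftover buffer
            if (r != null) out.add(r);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> in = Arrays.asList("a", "b", "c");
        System.out.println(drive(in.iterator(), false)); // [a,b]  ("c" stays buffered)
        System.out.println(drive(in.iterator(), true));  // [a,b, c]
    }
}
{code}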