[
https://issues.apache.org/jira/browse/PIG-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263459#comment-15263459
]
Xianda Ke commented on PIG-4876:
--------------------------------
*The fragility & complexity of sharing a flag between operators*:
Hi [~mohitsabharwal], the flag may be reset by input.hasNext() or input.next()
if any preceding operator reaches its end.
{code:title=OutputConsumerIterator.java}
if (result == null) {
    // Calling beginOfInput() here does not work: input.hasNext() or
    // input.next() may reset the flag.
    // beginOfInput();
    if (!input.hasNext()) {
        done = true;
        return;
    }
    Tuple v1 = input.next();
    // It seems OK to insert the call here (?), but this needs careful testing.
    beginOfInput();
    attach(v1);
}
if (!input.hasNext()) {
    endOfInput();
    // Another issue:
    // In MR mode, the flag is set after the last input tuple has been
    // consumed by getNextResult(). Here, the flag is set before the last
    // input is consumed in getNextResult().
    // This does not matter for MergeCogroup and CollectedGroup; those
    // operators work. But it is a problem for the current implementation
    // of POMergeJoin, which is tightly coupled with the MR runPipeline.
    // Even if beginOfInput() is called when attaching, we still have to add
    // some dirty code in getNextResult() for MergeJoin.
    // It seems that beginOfInput() is not enough to solve all the problems.
    // Simply moving endOfInput() after getNextResult(), in the
    // POStatus.STATUS_EOP case, is not easy either:
    // I have struggled with regression bugs for days.
}
result = getNextResult();
{code}
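To make the ordering concrete, here is a minimal, self-contained sketch of the look-ahead pattern above. It is not Pig code: ToyCollectedGroup and LookAheadFlushDemo are hypothetical stand-ins, and the toy operator simply groups consecutive equal keys. The end-of-input signal is raised before the last tuple is consumed, and the operator is drained until it has nothing left to return.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical toy operator (not a Pig class): groups consecutive equal keys
// and keeps the current group buffered until a new key or end of input.
class ToyCollectedGroup {
    private final List<String> buffer = new ArrayList<>();
    private String pending;      // attached input that has not been consumed yet
    private boolean endOfInput;  // flag owned by this operator (assumption)

    void attach(String key) { pending = key; }
    void endOfInput() { endOfInput = true; }

    // Returns a completed group, or null when more input is needed.
    List<String> getNextResult() {
        if (pending != null) {
            List<String> out = null;
            if (!buffer.isEmpty() && !buffer.get(0).equals(pending)) {
                out = new ArrayList<>(buffer);  // key changed: emit old group
                buffer.clear();
            }
            buffer.add(pending);
            pending = null;
            if (out != null) {
                return out;
            }
        }
        if (endOfInput && !buffer.isEmpty()) {
            List<String> last = new ArrayList<>(buffer);  // flush the last group
            buffer.clear();
            return last;
        }
        return null;
    }
}

class LookAheadFlushDemo {
    static List<List<String>> run(List<String> keys) {
        ToyCollectedGroup op = new ToyCollectedGroup();
        Iterator<String> input = keys.iterator();
        List<List<String>> results = new ArrayList<>();
        while (input.hasNext()) {
            op.attach(input.next());
            if (!input.hasNext()) {
                op.endOfInput();  // set BEFORE the last tuple is consumed
            }
            List<String> r;
            while ((r = op.getNextResult()) != null) {  // drain the operator
                results.add(r);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // prints [[a, a], [b], [c, c]]
        System.out.println(run(Arrays.asList("a", "a", "b", "c", "c")));
    }
}
```

This toy operator tolerates the flag being set early because it always consumes the attached tuple before checking the flag. An operator that checks the flag first would behave differently, which may be the kind of coupling described for POMergeJoin.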
Summary:
Option (a): don't touch non-Spark code; reset the flag instead.
From my point of view, it is just a workaround. It seems there is no need to
add an abstract method beginOfInput().
Well, I will give it a try and do some testing for beginOfInput().
Option (b): add a flag for each operator in Spark mode. These flags only work
in Spark mode and should not affect the logic in MR mode.
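As a rough illustration of why a shared flag is fragile and what Option (b) would change, here is a small sketch. Everything in it is hypothetical (ToyPlan, ToyUpstream, PerOperatorFlagDemo are not Pig classes), and it models only one way the problem could play out: an upstream stage raises the shared flag when its own source is exhausted, although the downstream stage still has input to pull.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical plan object holding ONE end-of-input flag for all operators.
class ToyPlan {
    boolean endOfAllInput;
}

// Hypothetical upstream stage that emits every source value twice.
// It raises the shared flag as soon as ITS source is exhausted, even though
// it may still hold output that downstream has not pulled yet.
class ToyUpstream implements Iterator<Integer> {
    private final Iterator<Integer> source;
    private final Deque<Integer> pending = new ArrayDeque<>();
    private final ToyPlan plan;

    ToyUpstream(List<Integer> data, ToyPlan plan) {
        this.source = data.iterator();
        this.plan = plan;
    }

    @Override
    public boolean hasNext() {
        return !pending.isEmpty() || source.hasNext();
    }

    @Override
    public Integer next() {
        if (pending.isEmpty()) {
            Integer v = source.next();
            if (!source.hasNext()) {
                plan.endOfAllInput = true;  // visible to every operator
            }
            pending.add(v);
            pending.add(v);
        }
        return pending.poll();
    }
}

class PerOperatorFlagDemo {
    // Returns {sharedFlag, ownFlag, upstreamHasMore} right after the
    // downstream stage has pulled its first tuple.
    static boolean[] pullFirst(List<Integer> data) {
        ToyPlan plan = new ToyPlan();
        ToyUpstream upstream = new ToyUpstream(data, plan);

        plan.endOfAllInput = false;  // reset before reading input
        upstream.next();             // pulling input sets the shared flag again

        // Option (b)-style flag: derived only from this operator's own input.
        boolean ownEndOfInput = !upstream.hasNext();
        return new boolean[] { plan.endOfAllInput, ownEndOfInput, upstream.hasNext() };
    }

    public static void main(String[] args) {
        // prints [true, false, true]
        System.out.println(Arrays.toString(pullFirst(Arrays.asList(7))));
    }
}
```

In this sketch the shared flag already reads true while one more tuple is waiting, whereas the per-operator flag stays false until the downstream stage's own input is exhausted.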
> OutputConsumeIterator can't handle the last buffered tuples for some Operators
> ------------------------------------------------------------------------------
>
> Key: PIG-4876
> URL: https://issues.apache.org/jira/browse/PIG-4876
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: Xianda Ke
> Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4876.patch
>
>
> Some Operators, such as MergeCogroup, Stream, CollectedGroup etc buffer some
> input records to constitute the result tuples. The last result tuples are
> buffered in the operator. These Operators need a flag to indicate the end of
> input, so that they can flush and constitute their last tuples.
> Currently, the flag 'parentPlan.endOfAllInput' is targeted for flushing the
> buffered tuples in MR mode. But it does not work with OutputConsumeIterator
> in Spark mode.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)