[
https://issues.apache.org/jira/browse/PIG-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263397#comment-15263397
]
liyunzhang_intel commented on PIG-4876:
---------------------------------------
[~mohitsabharwal] and [~kexianda]:
The root cause of this jira is that getParentPlan().endOfAllInput is not the
same as the end of input of the current physical operator. The only way to
tell whether the current physical operator has reached the end of its input
is OutputConsumeIterator#input.hasNext(). So adding an abstract function as
you suggested does not work, because that approach does not use
input.hasNext() to set the flag. In PIG-4876.patch, we add a new flag
(endOfInput) for POCollectedGroup, and that flag is set to true when
input.hasNext() returns false. In PIG-4842-2.patch, we use input.hasNext() in
CollectedGroupConverter to set the value of getParentPlan().endOfAllInput.
{code}
@Override
protected Result getNextResult() throws ExecException {
    // if endOfAllInput was set as true by the predecessors, but
    // input.hasNext() is true, it means that the predecessor has consumed
    // all of its input, but poCollectedGroup still hasn't consumed all of
    // its input.
    //
    // set endOfAllInput as false here, so that
    // POCollectedGroup.getNextTuple() can work correctly
    //
    // Kelly's comment: here we also use input.hasNext() to verify whether
    // it has reached the end
    if (poCollectedGroup.getParentPlan().endOfAllInput && input.hasNext()) {
        poCollectedGroup.getParentPlan().endOfAllInput = false;
    }
    return poCollectedGroup.getNextTuple();
}
{code}
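To make the point about input.hasNext() concrete, here is a minimal,
self-contained sketch that does not use any Pig classes. The names
BufferingGroupOperator, endOfInput and EndOfInputSketch.run are hypothetical
and are not the code in either patch. The sketch only shows why a buffering
operator needs a flag derived from its own input iterator before it can flush
its last group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a buffering operator such as POCollectedGroup:
// it groups consecutive records that share the same key.
class BufferingGroupOperator {
    private String currentKey = null;
    private List<String> buffer = new ArrayList<>();

    // Hypothetical per-operator flag, independent of any plan-wide flag.
    boolean endOfInput = false;

    // Feed one record. Returns a finished group when the key changes,
    // or null while the operator is still buffering.
    String accept(String key, String value) {
        String finished = null;
        if (currentKey != null && !currentKey.equals(key)) {
            finished = currentKey + "=" + buffer;
            buffer = new ArrayList<>();
        }
        currentKey = key;
        buffer.add(value);
        return finished;
    }

    // Emit the last buffered group. Returns null unless endOfInput is set,
    // because before that point more records for the key may still arrive.
    String flush() {
        if (!endOfInput || currentKey == null) {
            return null;
        }
        String last = currentKey + "=" + buffer;
        currentKey = null;
        buffer = new ArrayList<>();
        return last;
    }
}

class EndOfInputSketch {
    // Drives the operator from an iterator and derives the end-of-input
    // flag from input.hasNext() of this operator's own input.
    static List<String> run(Iterator<String[]> input) {
        BufferingGroupOperator op = new BufferingGroupOperator();
        List<String> out = new ArrayList<>();
        while (input.hasNext()) {
            String[] rec = input.next();
            String group = op.accept(rec[0], rec[1]);
            if (group != null) {
                out.add(group);
            }
        }
        // input.hasNext() is now false: this operator's input is exhausted,
        // so it is safe to set the flag and flush the last group.
        op.endOfInput = true;
        String last = op.flush();
        if (last != null) {
            out.add(last);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
            new String[] {"a", "1"},
            new String[] {"a", "2"},
            new String[] {"b", "3"});
        System.out.println(run(records.iterator()));
    }
}
```

If the flag is never set, flush() returns null and the last group ("b" in the
sample input) is lost, which is the symptom described in this jira.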
> OutputConsumeIterator can't handle the last buffered tuples for some Operators
> ------------------------------------------------------------------------------
>
> Key: PIG-4876
> URL: https://issues.apache.org/jira/browse/PIG-4876
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: Xianda Ke
> Assignee: Xianda Ke
> Fix For: spark-branch
>
> Attachments: PIG-4876.patch
>
>
> Some Operators, such as MergeCogroup, Stream, CollectedGroup, etc., buffer
> some input records to constitute the result tuples. The last result tuples
> are buffered in the operator. These Operators need a flag to indicate the
> end of input, so that they can flush and constitute their last tuples.
> Currently, the flag 'parentPlan.endOfAllInput' is intended for flushing the
> buffered tuples in MR mode. But it does not work with OutputConsumeIterator
> in Spark mode.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)