[ https://issues.apache.org/jira/browse/PIG-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263397#comment-15263397 ]

liyunzhang_intel commented on PIG-4876:
---------------------------------------

[~mohitsabharwal] and [~kexianda]:
   The root cause of this JIRA is that getParentPlan().endOfAllInput is not the 
same as the end of input of the current physical operator. The only reliable way 
to tell whether the current physical operator has reached the end of its input 
is OutputConsumeIterator#input.hasNext(). So adding an abstract function, as you 
suggested, does not work, because it would not use input.hasNext() to set the 
flag. In PIG-4876.patch, we add a new flag (endOfInput) to POCollectedGroup, and 
it is set to true once input.hasNext() returns false. In PIG-4842-2.patch, we 
use input.hasNext() in CollectedGroupConverter to set the value of 
getParentPlan().endOfAllInput:
{code}
@Override
protected Result getNextResult() throws ExecException {
    // If endOfAllInput was set to true by a predecessor but input.hasNext()
    // is still true, the predecessor has consumed all of its input while
    // poCollectedGroup has not yet consumed all of its own.
    //
    // Reset endOfAllInput to false here so that
    // POCollectedGroup.getNextTuple() works correctly.
    if (poCollectedGroup.getParentPlan().endOfAllInput
            && input.hasNext()) { // Kelly's comment: here we also use input.hasNext() to check whether the end has been reached
        poCollectedGroup.getParentPlan().endOfAllInput = false;
    }

    return poCollectedGroup.getNextTuple();
}
{code}
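The idea behind PIG-4876.patch (deriving end-of-input from the consuming iterator's own input.hasNext() rather than from a plan-wide flag) can be illustrated with a small, self-contained Java sketch. It does not use Pig's real classes: CollectedGroupLike and ConsumeIterator are hypothetical stand-ins for POCollectedGroup and OutputConsumeIterator, the operator simply groups consecutive records with equal keys (as a collected group does on pre-sorted input), and, as a simplification, the iterator calls flush() directly instead of the operator reading the flag inside getNextTuple().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

public class EndOfInputSketch {

    /** Hypothetical stand-in for a buffering operator such as POCollectedGroup:
     *  it groups consecutive records with equal keys, so a group is complete
     *  only when the next key arrives or the input is known to have ended. */
    public static final class CollectedGroupLike {
        boolean endOfInput = false; // operator-local end-of-input flag (hypothetical)
        private String currentKey = null;
        private final List<Integer> buffer = new ArrayList<>();

        /** Buffers one record; returns the previous group if the key changed, else null. */
        String accept(String key, int value) {
            String finished = null;
            if (currentKey != null && !currentKey.equals(key)) {
                finished = flush();
            }
            currentKey = key;
            buffer.add(value);
            return finished;
        }

        /** Emits the buffered group (or null if nothing is buffered) and clears the buffer. */
        String flush() {
            if (currentKey == null) {
                return null;
            }
            String group = currentKey + "=" + buffer;
            currentKey = null;
            buffer.clear();
            return group;
        }
    }

    /** Hypothetical stand-in for OutputConsumeIterator: pulls records from its own
     *  input and derives end-of-input from input.hasNext(), not from a plan-wide flag. */
    public static final class ConsumeIterator implements Iterator<String> {
        private final Iterator<Map.Entry<String, Integer>> input;
        private final CollectedGroupLike op;
        private String pending;
        private boolean fetched = false;

        public ConsumeIterator(Iterator<Map.Entry<String, Integer>> input, CollectedGroupLike op) {
            this.input = input;
            this.op = op;
        }

        private void advance() {
            pending = null;
            while (pending == null && input.hasNext()) {
                Map.Entry<String, Integer> rec = input.next();
                pending = op.accept(rec.getKey(), rec.getValue());
            }
            // Nothing pending here means input.hasNext() is false: this is the only
            // point where this operator's own input is known to be exhausted.
            if (pending == null && !op.endOfInput) {
                op.endOfInput = true;
                pending = op.flush(); // emit the last buffered group exactly once
            }
        }

        @Override
        public boolean hasNext() {
            if (!fetched) {
                advance();
                fetched = true;
            }
            return pending != null;
        }

        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            fetched = false;
            return pending;
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = List.of(
                Map.entry("a", 1), Map.entry("a", 2), Map.entry("b", 3),
                Map.entry("c", 4), Map.entry("c", 5));
        List<String> out = new ArrayList<>();
        new ConsumeIterator(records.iterator(), new CollectedGroupLike()).forEachRemaining(out::add);
        System.out.println(out); // prints [a=[1, 2], b=[3], c=[4, 5]]
    }
}
```

Because the flag is set only when this iterator's own input is exhausted, a predecessor finishing early cannot trigger a premature flush, which is the situation the PIG-4842-2.patch excerpt works around by resetting getParentPlan().endOfAllInput.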


> OutputConsumeIterator can't handle the last buffered tuples for some Operators
> ------------------------------------------------------------------------------
>
>                 Key: PIG-4876
>                 URL: https://issues.apache.org/jira/browse/PIG-4876
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: Xianda Ke
>            Assignee: Xianda Ke
>             Fix For: spark-branch
>
>         Attachments: PIG-4876.patch
>
>
> Some operators, such as MergeCogroup, Stream, and CollectedGroup, buffer 
> input records to construct their result tuples, so the last result tuples 
> remain buffered inside the operator. These operators need a flag that 
> signals the end of input so that they can flush their buffers and emit 
> their last tuples.
> Currently, the flag 'parentPlan.endOfAllInput' serves this purpose for 
> flushing buffered tuples in MR mode, but it does not work with 
> OutputConsumeIterator in Spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
