[ 
https://issues.apache.org/jira/browse/PIG-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15246982#comment-15246982
 ] 

Xianda Ke commented on PIG-4876:
--------------------------------

Let me explain in detail:
h6. 1. The differences between the MR and Spark engines
The movement of tuples through the execution pipeline follows a pull (iterator) 
model. When an operator is asked to produce a tuple, it either returns a tuple 
if it has one ready, or returns a pause signal (Result.returnStatus) to 
indicate that it is not finished but is unable to produce an output tuple at 
this time. 
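The pull model can be sketched as follows. This is a hedged, simplified stand-in for Pig's PhysicalOperator/Result API: the Status enum, Result, and Op classes here are illustrative names, not the real Pig classes (Pig uses POStatus constants on Result.returnStatus).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified pull model: a consumer asks an operator for its next tuple and
// inspects the returned status. Names are illustrative, not Pig's actual API.
enum Status { OK, EOP, EOS } // tuple produced / paused, no tuple now / end of stream

class Result {
    final Status returnStatus;
    final String tuple;
    Result(Status s, String t) { returnStatus = s; tuple = t; }
}

class Op {
    private final Deque<String> input = new ArrayDeque<>();
    Op(String... tuples) { for (String t : tuples) input.add(t); }

    // Pull: either produce a tuple (OK) or signal end of stream (EOS).
    Result getNextTuple() {
        if (input.isEmpty()) return new Result(Status.EOS, null);
        return new Result(Status.OK, input.poll());
    }
}
```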
 
In MR mode, an operator pulls data from its predecessor by directly calling 
PhysicalOperator.getNextTuple(). The signals (Result.returnStatus) propagate 
down the chain of operators and are handled in 
PigGenericMapBase.runPipeline(). 
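In the spirit of that driver loop, a minimal sketch might look like the following. This is an assumption-laden illustration, not the real PigGenericMapBase.runPipeline(); all class and method names are simplified stand-ins.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hedged sketch of an MR-style pipeline driver: keep pulling from the leaf
// operator and act on each returnStatus, as runPipeline() does in spirit.
class Pipeline {
    enum Status { OK, EOP, EOS }

    // Toy leaf operator that emits each input tuple once, then signals EOS.
    static class Leaf {
        private final Deque<String> input = new ArrayDeque<>();
        Leaf(String... ts) { for (String t : ts) input.add(t); }
        Status status;
        String tuple;
        void getNextTuple() {
            if (input.isEmpty()) { status = Status.EOS; tuple = null; }
            else { status = Status.OK; tuple = input.poll(); }
        }
    }

    static List<String> runPipeline(Leaf leaf) {
        List<String> out = new ArrayList<>();
        while (true) {
            leaf.getNextTuple();
            switch (leaf.status) {
                case OK:  out.add(leaf.tuple); break; // collect, keep pulling
                case EOP: return out; // paused with nothing to emit this run
                case EOS: return out; // end of stream
            }
        }
    }
}
```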

In Spark mode, OutputConsumeIterator has to implement the iterator interface to 
support the Spark engine. OutputConsumeIterator acts as an adapter around a 
PhysicalOperator. Like PigGenericMapBase.runPipeline(), 
OutputConsumeIterator has to handle these signals (Result.returnStatus). 
OutputConsumeIterator does not directly call the predecessor's getNextTuple() 
to get a tuple from the predecessor; instead, it uses the predecessor's 
iterator to get the tuple and then attaches it for processing. 
OutputConsumeIterator also buffers a result tuple when hasNext() is called. 
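The adapter-with-buffering idea can be sketched like this. It is a hedged approximation of what OutputConsumeIterator does, not its actual code: hasNext() pulls from the wrapped operator until it either buffers one output tuple or sees end-of-stream, and next() hands out the buffered tuple. The ConsumeIterator/MiniOp names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hedged sketch: an Iterator adapter over a pull-model operator that buffers
// one result tuple in hasNext(), in the spirit of OutputConsumeIterator.
class ConsumeIterator implements Iterator<String> {
    enum Status { OK, EOP, EOS }

    // Toy operator: emits each input tuple once, then signals EOS.
    static class MiniOp {
        private final Deque<String> input = new ArrayDeque<>();
        MiniOp(String... ts) { for (String t : ts) input.add(t); }
        Status status;
        String out;
        void getNextTuple() {
            if (input.isEmpty()) { status = Status.EOS; out = null; }
            else { status = Status.OK; out = input.poll(); }
        }
    }

    private final MiniOp op;
    private String buffered; // result tuple buffered by hasNext()
    private boolean done;

    ConsumeIterator(MiniOp op) { this.op = op; }

    @Override public boolean hasNext() {
        if (buffered != null) return true;
        if (done) return false;
        while (true) {
            op.getNextTuple();
            switch (op.status) {
                case OK:  buffered = op.out; return true;
                case EOP: continue; // paused: pull again
                case EOS: done = true; return false;
            }
        }
    }

    @Override public String next() {
        if (!hasNext()) throw new NoSuchElementException();
        String t = buffered;
        buffered = null;
        return t;
    }
}
```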

h6. 2. Why does endOfInput() of OutputConsumeIterator NOT work currently?
The flag endOfAllInput lives in PhysicalPlan and is shared by all the 
operators in the same plan. In MR mode, Pig runs the pipeline one more 
time. Currently, the implementations of the operators (such as Stream, 
CollectedGroup, etc.) use the returnStatus of processInput() and the shared 
flag endOfAllInput as the conditions for flushing their buffered tuples. These 
implementations are tightly coupled to this mechanism, which works in 
MR mode.
However, in Spark mode, the shared flag endOfAllInput may be set to true by a 
predecessor operator while the current operator still has input records. At 
that point, OutputConsumeIterator may find that the current operator has not 
finished producing output tuples and call the operator's getNextTuple() again, 
but getNextTuple() believes it has already reached the end of its input. Oops...

h6. 3. How to solve it?
When a predecessor operator reaches the end of its input, its successors do 
not necessarily reach the end of theirs, because the predecessor may still 
buffer some output tuples. Semantically, the right place for such a flag is in 
each operator. 
We do not reuse the shared flag endOfAllInput, which works only in MR mode. If 
we reused endOfAllInput in Spark mode, handling the returnStatus signals in 
OutputConsumeIterator would become very tricky and complex. Instead, we add a 
flag to the relevant operators that indicates the end of their own input. This 
makes life easier and the code more intuitive.
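The per-operator flag idea can be sketched as below. This is a hypothetical illustration of the flush pattern, assuming a buffering operator like CollectedGroup; the BufferingOp class and its methods are invented for this sketch and are not Pig's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of the proposed fix: instead of consulting the shared
// parentPlan.endOfAllInput, a buffering operator carries its own endOfInput
// flag, set only when *its* input is exhausted, and flushes its buffered
// tuples only then.
class BufferingOp {
    private final List<String> buffer = new ArrayList<>();
    boolean endOfInput; // per-operator flag, not shared across the plan

    void accept(String tuple) { buffer.add(tuple); }

    // Only once this operator's own input is done may the buffered
    // tuples be flushed as one result; otherwise keep buffering.
    String flushIfDone() {
        if (!endOfInput) return null;
        String result = String.join(",", buffer);
        buffer.clear();
        return result;
    }
}
```

Because the flag belongs to the operator, a predecessor finishing early cannot trick a downstream operator into flushing before its own input is exhausted.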

> OutputConsumeIterator can't handle the last buffered tuples for some Operators
> ------------------------------------------------------------------------------
>
>                 Key: PIG-4876
>                 URL: https://issues.apache.org/jira/browse/PIG-4876
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: Xianda Ke
>            Assignee: Xianda Ke
>             Fix For: spark-branch
>
>
> Some Operators, such as MergeCogroup, Stream, CollectedGroup etc buffer some 
> input records to constitute the result tuples. The last result tuples are 
> buffered in the operator.  These Operators need a flag to indicate the end of 
> input, so that they can flush and constitute their last tuples.
> Currently, the flag 'parentPlan.endOfAllInput' is targeted for flushing the 
> buffered tuples in MR mode.  But it does not work with OutputConsumeIterator 
> in Spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
