[ https://issues.apache.org/jira/browse/PIG-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263299#comment-15263299 ]

Mohit Sabharwal commented on PIG-4876:
--------------------------------------

It's not clear to me why adding beginOfInput() is complex or less readable.

In OutputConsumerIterator, add a *beginOfInput()* abstract method:
{code:title=OutputConsumerIterator.java|borderStyle=solid}
    // Hook invoked before the iterator tries to read the next input tuple.
    protected abstract void beginOfInput();
{code}

In OutputConsumerIterator.readNext(), insert *beginOfInput()* as shown below:
{code:title=OutputConsumerIterator.java|borderStyle=solid}
    ...
    if (result == null) {
        beginOfInput();   // INSERT THIS CALL
        if (!input.hasNext()) {
            done = true;
            return;
        }
        Tuple v1 = input.next();
        attach(v1);
    }
    ...
{code}
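A minimal, self-contained sketch of the proposed call order, to check the idea in isolation. Everything in it is an assumption: BeginOfInputSketch, Plan, CollectedGroupOp, ConsumerIterator and run() are hypothetical names, tuples are plain Strings, a null result stands in for POStatus.STATUS_EOP, and endOfInput() is assumed to be called once when the input is exhausted, after which the operator is polled again. It is not Pig's actual OutputConsumerIterator.

{code:title=BeginOfInputSketch.java|borderStyle=solid}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical, simplified model of the proposal; NOT Pig's OutputConsumerIterator.
// Tuples are Strings, a null result plays the role of POStatus.STATUS_EOP, and
// Plan.endOfAllInput stands in for parentPlan.endOfAllInput.
public class BeginOfInputSketch {
    static final class Plan { boolean endOfAllInput = false; }

    // Hypothetical operator: groups consecutive tuples sharing a first character,
    // so the last group stays buffered until the plan signals end of input.
    static final class CollectedGroupOp {
        private final Plan plan;
        private final List<String> group = new ArrayList<>();
        private String attached; // tuple attached but not yet consumed
        CollectedGroupOp(Plan plan) { this.plan = plan; }
        void attach(String t) { attached = t; }

        // Returns a finished group, or null when more input is needed.
        String getNextResult() {
            if (attached != null) {
                String t = attached;
                attached = null;
                String out = null;
                if (!group.isEmpty() && group.get(0).charAt(0) != t.charAt(0)) {
                    out = emit(); // key changed: the previous group is complete
                }
                group.add(t);
                return out;
            }
            if (plan.endOfAllInput && !group.isEmpty()) {
                return emit(); // flush the last buffered group
            }
            return null;
        }
        private String emit() {
            String out = group.get(0).charAt(0) + ":" + group;
            group.clear();
            return out;
        }
    }

    // Iterator with both hooks; beginOfInput() runs right before each read from input.
    abstract static class ConsumerIterator implements Iterator<String> {
        private final Iterator<String> input;
        private String pending;          // result waiting to be handed out
        private boolean needInput = true;
        private boolean endSignalled = false;
        private boolean done = false;
        ConsumerIterator(Iterator<String> input) { this.input = input; }
        protected abstract void beginOfInput();
        protected abstract void endOfInput();
        protected abstract void attach(String t);
        protected abstract String getNextResult();

        private void readNext() {
            while (pending == null && !done) {
                if (needInput) {
                    beginOfInput(); // the proposed call
                    if (input.hasNext()) {
                        attach(input.next());
                    } else if (!endSignalled) {
                        endOfInput(); // let the operator flush, then poll it again
                        endSignalled = true;
                    } else {
                        done = true;
                        return;
                    }
                    needInput = false;
                }
                pending = getNextResult();
                needInput = (pending == null); // null: operator wants more input
            }
        }
        @Override public boolean hasNext() { readNext(); return pending != null; }
        @Override public String next() {
            if (!hasNext()) throw new NoSuchElementException();
            String out = pending;
            pending = null;
            return out;
        }
    }

    // beginOfInput() as in the CollectedGroupConverter example; endOfInput()
    // setting the flag to true is an assumption about the existing patch.
    static List<String> run(Plan plan, String... tuples) {
        CollectedGroupOp op = new CollectedGroupOp(plan);
        Iterator<String> it = new ConsumerIterator(Arrays.asList(tuples).iterator()) {
            @Override protected void beginOfInput() { plan.endOfAllInput = false; }
            @Override protected void endOfInput() { plan.endOfAllInput = true; }
            @Override protected void attach(String t) { op.attach(t); }
            @Override protected String getNextResult() { return op.getNextResult(); }
        };
        List<String> out = new ArrayList<>();
        it.forEachRemaining(out::add);
        return out;
    }
}
{code}

In this model, run(new Plan(), "a1", "a2", "b1") yields [a:[a1, a2], b:[b1]]: the b group is only emitted after endOfInput() sets the flag, and the beginOfInput() call on the following read puts the flag back to false, so the same Plan can be handed to another run.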

Now, in every operator where we have implemented endOfInput(), also implement beginOfInput().

For example, in CollectedGroupConverter we have implemented endOfInput(). We implemented beginOfInput() as:
{code:title=CollectedGroupConverter.java|borderStyle=solid}
    @Override
    protected void beginOfInput() {
        // Clear the end-of-input flag before more input is read.
        poCollectedGroup.getParentPlan().endOfAllInput = false;
    }
{code}
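Why resetting the flag can matter is easier to see in an even smaller hypothetical model. It assumes one plan object is reused for a second input (for example, another partition); whether Pig's Spark mode actually reuses the plan this way is not established here. FlagResetSketch, consume() and the resetAtBegin switch are illustrative names, and the operator is assumed to flush its buffer whenever it sees endOfAllInput == true.

{code:title=FlagResetSketch.java|borderStyle=solid}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model: Plan.endOfAllInput stands in for the shared
// parentPlan.endOfAllInput flag; this is not Pig's CollectedGroupConverter.
public class FlagResetSketch {
    static final class Plan { boolean endOfAllInput = false; }

    // Groups consecutive tuples by first character. The operator flushes its
    // buffer whenever it believes input has ended, i.e. whenever the flag is true.
    static List<String> consume(Plan plan, List<String> input, boolean resetAtBegin) {
        if (resetAtBegin) {
            plan.endOfAllInput = false; // what beginOfInput() does in the proposal
        }
        List<String> out = new ArrayList<>();
        List<String> group = new ArrayList<>();
        for (String t : input) {
            boolean keyChanged = !group.isEmpty() && group.get(0).charAt(0) != t.charAt(0);
            if (!group.isEmpty() && (keyChanged || plan.endOfAllInput)) {
                out.add(group.get(0).charAt(0) + ":" + group);
                group = new ArrayList<>();
            }
            group.add(t);
        }
        plan.endOfAllInput = true; // what endOfInput() is assumed to do
        if (!group.isEmpty()) {
            out.add(group.get(0).charAt(0) + ":" + group); // flush the last group
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> p1 = Arrays.asList("a1", "a2", "b1");
        List<String> p2 = Arrays.asList("c1", "c2");

        Plan withReset = new Plan();
        System.out.println(consume(withReset, p1, true) + " " + consume(withReset, p2, true));

        Plan withoutReset = new Plan();
        System.out.println(consume(withoutReset, p1, false) + " " + consume(withoutReset, p2, false));
    }
}
{code}

Running main prints:
[a:[a1, a2], b:[b1]] [c:[c1, c2]]
[a:[a1, a2], b:[b1]] [c:[c1], c:[c2]]
Without the reset, the flag left at true by the first input makes the operator flush before every tuple of the second input, which splits the c group.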


Maybe I'm misunderstanding this?

> OutputConsumeIterator can't handle the last buffered tuples for some Operators
> ------------------------------------------------------------------------------
>
>                 Key: PIG-4876
>                 URL: https://issues.apache.org/jira/browse/PIG-4876
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: Xianda Ke
>            Assignee: Xianda Ke
>             Fix For: spark-branch
>
>         Attachments: PIG-4876.patch
>
>
> Some operators, such as MergeCogroup, Stream, and CollectedGroup, buffer some
> input records to constitute the result tuples, so the last result tuples are
> still buffered in the operator when the input ends. These operators need a flag
> to indicate the end of input so that they can flush and constitute their last tuples.
> Currently, the flag 'parentPlan.endOfAllInput' is used to flush the buffered
> tuples in MR mode, but it does not work with OutputConsumeIterator in Spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
