> On May 12, 2015, 12:14 a.m., kelly zhang wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java,
> >  line 108
> > <https://reviews.apache.org/r/34003/diff/1/?file=953940#file953940line108>
> >
> >     poCollectedGroup.getPlans().get(0) equals poCollectedGroup.parentPlan?

The only reason we set parentPlan in CollectedGroupConverter is so that 
this code in POCollectedGroup does not throw a NullPointerException:

                if (this.parentPlan.endOfAllInput) {
                    return getStreamCloseResult();
                } else {
                    break;
                }

I think poCollectedGroup.getPlans().get(0) was chosen arbitrarily; any physical 
plan with endOfAllInput set would have been ok. 

poCollectedGroup.getPlans().get(0) is in fact an expression plan, not the 
parent plan of POCollectedGroup.

I discovered that we aren't setting the parent plans for the physical operators 
inside SparkPlan to the relevant SparkOperator (parent). I've fixed that using:

                new PhyPlanSetter(sparkOperator).visit();
                
This way, we can simply set

poCollectedGroup.getParentPlan().endOfAllInput = true

in CollectedGroupConverter.
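
To illustrate why the parent plan has to be set before the endOfAllInput 
check runs, here is a minimal, self-contained sketch. Plan, Operator and 
setParentPlans are hypothetical stand-ins for the physical plan, 
POCollectedGroup and the PhyPlanSetter pass; they are not the actual Pig 
classes, and the real visitor does more than this loop.

```java
import java.util.ArrayList;
import java.util.List;

public class ParentPlanSketch {
    // Hypothetical stand-in for a physical plan; only the flag matters here.
    static class Plan {
        boolean endOfAllInput = false;
        final List<Operator> operators = new ArrayList<>();
    }

    // Hypothetical stand-in for a physical operator such as POCollectedGroup.
    static class Operator {
        Plan parentPlan; // stays null until a setter pass assigns it

        boolean shouldFlush() {
            // Same shape as the check quoted above; throws a
            // NullPointerException while parentPlan is still null.
            return parentPlan.endOfAllInput;
        }
    }

    // Plays the role of the setter pass: point every operator at its plan.
    static void setParentPlans(Plan plan) {
        for (Operator op : plan.operators) {
            op.parentPlan = plan;
        }
    }

    public static void main(String[] args) {
        Plan plan = new Plan();
        Operator op = new Operator();
        plan.operators.add(op);

        setParentPlans(plan);               // without this, shouldFlush() throws
        op.parentPlan.endOfAllInput = true; // what the converter would set
        System.out.println(op.shouldFlush());
    }
}
```

Once every operator points at its real parent, the converter no longer 
needs to borrow an unrelated plan just to avoid the null dereference.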


- Mohit


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34003/#review83325
-----------------------------------------------------------


On May 8, 2015, 10:26 p.m., Mohit Sabharwal wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34003/
> -----------------------------------------------------------
> 
> (Updated May 8, 2015, 10:26 p.m.)
> 
> 
> Review request for pig, liyun zhang, Praveen Rachabattuni, and Xuefu Zhang.
> 
> 
> Bugs: PIG-4542
>     https://issues.apache.org/jira/browse/PIG-4542
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> PIG-4542: OutputConsumerIterator should flush buffered records
> 
> Unnecessary use of RDD.count() was bugging me. This patch addresses that.
> 
> Certain operators may buffer the output. We need to flush the last set of 
> records from such operators, 
> when we encounter the last input record, before calling getNextTuple() for 
> the last time.
> 
> Currently, to flush the last set of records, we compute RDD.count() and 
> compare the count with a 
> running counter to determine if we have reached the last record. This is 
> unnecessary and inefficient.
> 
> This patch:
> - Gets rid of the use of RDD.count() in CollectedGroupConverter and 
> LimitConverter.
> - Adds an abstract method endInput() which is executed before we call 
> getNextTuple() for the last time.
> - Deletes POStreamSpark since it was just handling the last record.
> - Removes special code in POCollectedGroup to handle the last record.
> - While I was here, renamed POOutputConsumerIterator to 
> OutputConsumerIterator. The "PO" prefix
> should only be used for physical operators.
> 
> 
> Diffs
> -----
> 
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java 7f81c1dce3304011fe89896dd61d46977bcc8821 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java 8654b1b6c6436ed6dd0f24358e5b633e6c963d54 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java 99005b7a4eeec5f024512c4aefb396117f574619 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java e976cea44f83e83a3ce28fd03374ede7b52fdeb9 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java 3074f5e6ea4ac15f38affcdaa3e23c3d66bdc104 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java 8b31197f879ed14086a7377635f2877fa3db0e9f 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java 2c115fc3044c08cc8c96d27a08c9761456aea158 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java f42d8ff4653245c9c14ddbea949e66a7609a02a5 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java c66fe7b154f18ccc3aa0f5dab31a5952947fdca2 
>   test/org/apache/pig/test/TestCollectedGroup.java a958d335a880432145dc80fec0ef2054ce90a3bf 
> 
> Diff: https://reviews.apache.org/r/34003/diff/
> 
> 
> Testing
> -------
> 
> TestStreaming passes.
> No new failures in TestCollectedGroup.
> 
> 
> Thanks,
> 
> Mohit Sabharwal
> 
>
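
The flushing idea in the quoted description (detect the end of the input 
iterator and call an endInput() hook, instead of comparing a running 
counter against RDD.count()) can be sketched roughly as below. This is 
only an illustration under assumptions: ConsumerIterator, consume and 
groupRuns are hypothetical names, not the classes in the patch, and here 
endInput() returns the buffered records directly, which simplifies what 
the patch does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;

public class FlushingIteratorSketch {
    // Hypothetical base class: wraps an input iterator and flushes at the end.
    abstract static class ConsumerIterator<I, O> implements Iterator<O> {
        private final Iterator<I> input;
        private final Queue<O> ready = new ArrayDeque<>();
        private boolean inputEnded = false;

        ConsumerIterator(Iterator<I> input) {
            this.input = input;
        }

        // Feed one record; return whatever outputs are complete now.
        protected abstract List<O> consume(I record);

        // Called exactly once, after the last input record has been consumed.
        protected abstract List<O> endInput();

        private void fill() {
            while (ready.isEmpty() && !inputEnded) {
                if (input.hasNext()) {
                    ready.addAll(consume(input.next()));
                } else {
                    inputEnded = true;          // no count() needed to learn this
                    ready.addAll(endInput());   // flush buffered records
                }
            }
        }

        @Override
        public boolean hasNext() {
            fill();
            return !ready.isEmpty();
        }

        @Override
        public O next() {
            fill();
            if (ready.isEmpty()) {
                throw new NoSuchElementException();
            }
            return ready.remove();
        }
    }

    // Example buffering operator: groups runs of consecutive equal values.
    static Iterator<List<String>> groupRuns(Iterator<String> in) {
        return new ConsumerIterator<String, List<String>>(in) {
            private List<String> current = new ArrayList<>();

            @Override
            protected List<List<String>> consume(String record) {
                List<List<String>> out = new ArrayList<>();
                if (!current.isEmpty() && !current.get(0).equals(record)) {
                    out.add(current);           // previous run is complete
                    current = new ArrayList<>();
                }
                current.add(record);
                return out;
            }

            @Override
            protected List<List<String>> endInput() {
                List<List<String>> out = new ArrayList<>();
                if (!current.isEmpty()) {
                    out.add(current);           // last run would otherwise be lost
                    current = new ArrayList<>();
                }
                return out;
            }
        };
    }

    public static void main(String[] args) {
        Iterator<List<String>> it =
                groupRuns(Arrays.asList("a", "a", "b", "c", "c").iterator());
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

Without the endInput() call the final run ("c", "c") would stay in the 
buffer, which is the case the running counter was previously used to 
detect.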
