-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34003/#review83325
-----------------------------------------------------------



src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
<https://reviews.apache.org/r/34003/#comment134271>

    Is poCollectedGroup.getPlans().get(0) the same as poCollectedGroup.parentPlan?


- kelly zhang


On May 8, 2015, 10:26 p.m., Mohit Sabharwal wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34003/
> -----------------------------------------------------------
> 
> (Updated May 8, 2015, 10:26 p.m.)
> 
> 
> Review request for pig, liyun zhang, Praveen Rachabattuni, and Xuefu Zhang.
> 
> 
> Bugs: PIG-4542
>     https://issues.apache.org/jira/browse/PIG-4542
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> PIG-4542: OutputConsumerIterator should flush buffered records
> 
> Unnecessary use of RDD.count() was bugging me. This patch addresses that.
> 
> Certain operators may buffer their output. We need to flush the last set of
> records from such operators when we encounter the last input record, before
> calling getNextTuple() for the last time.
> 
> Currently, to flush the last set of records, we compute RDD.count() and
> compare the count with a running counter to determine if we have reached the
> last record. This is unnecessary and inefficient.
> 
> This patch:
> - Gets rid of the use of RDD.count() in CollectedGroupConverter and 
> LimitConverter.
> - Adds an abstract method endInput() which is executed before we call 
> getNextTuple() for the last time.
> - Deletes POStreamSpark since it was just handling the last record.
> - Removes special code in POCollectedGroup to handle the last record.
> - While I was here, renames POOutputConsumerIterator to
> OutputConsumerIterator. The "PO" prefix should only be used for physical
> operators.
> 
> 
> Diffs
> -----
> 
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java 7f81c1dce3304011fe89896dd61d46977bcc8821
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java 8654b1b6c6436ed6dd0f24358e5b633e6c963d54
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java 99005b7a4eeec5f024512c4aefb396117f574619
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java e976cea44f83e83a3ce28fd03374ede7b52fdeb9
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java 3074f5e6ea4ac15f38affcdaa3e23c3d66bdc104
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java 8b31197f879ed14086a7377635f2877fa3db0e9f
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java 2c115fc3044c08cc8c96d27a08c9761456aea158
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java f42d8ff4653245c9c14ddbea949e66a7609a02a5
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java c66fe7b154f18ccc3aa0f5dab31a5952947fdca2
>   test/org/apache/pig/test/TestCollectedGroup.java a958d335a880432145dc80fec0ef2054ce90a3bf
> 
> Diff: https://reviews.apache.org/r/34003/diff/
> 
> 
> Testing
> -------
> 
> TestStreaming passes.
> No new failures in TestCollectedGroup.
> 
> 
> Thanks,
> 
> Mohit Sabharwal
> 
>
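
For readers following the quoted description, here is a minimal sketch of the
endInput() idea. It is not the code in the patch. FlushingIterator,
ConsecutiveGrouper, attach() and getNextResult() are hypothetical names made up
for illustration; only endInput() is named in the description, and its real
signature in Pig may differ. The iterator signals end of input exactly once,
when the upstream iterator is exhausted, so the operator can release its
buffered last group without the records being counted up front.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class FlushDemo {
    // Drains the (hypothetical) grouping iterator into a list of groups.
    static List<List<String>> groups(List<String> records) {
        List<List<String>> out = new ArrayList<>();
        Iterator<List<String>> it = new ConsecutiveGrouper(records.iterator());
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [[a, a], [b], [c, c]]
        System.out.println(groups(Arrays.asList("a", "a", "b", "c", "c")));
    }
}

// Hypothetical stand-in for an iterator that drives a buffering operator.
abstract class FlushingIterator<I, O> implements Iterator<O> {
    private final Iterator<I> input;
    private boolean endOfInputSignalled = false;
    private O lookahead; // null means no result is ready yet

    FlushingIterator(Iterator<I> input) {
        this.input = input;
    }

    /** Feeds one input record to the operator. */
    protected abstract void attach(I record);

    /** Returns one output record, or null if nothing is ready. */
    protected abstract O getNextResult();

    /** Tells the operator that no more input will arrive. */
    protected abstract void endInput();

    @Override
    public boolean hasNext() {
        while (lookahead == null) {
            lookahead = getNextResult();
            if (lookahead != null) {
                break;
            }
            if (input.hasNext()) {
                attach(input.next());
            } else if (!endOfInputSignalled) {
                // Input is exhausted: signal once, then pull one last time.
                endInput();
                endOfInputSignalled = true;
            } else {
                return false;
            }
        }
        return true;
    }

    @Override
    public O next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        O result = lookahead;
        lookahead = null;
        return result;
    }
}

// Toy operator: groups runs of equal consecutive records, buffering the current run.
class ConsecutiveGrouper extends FlushingIterator<String, List<String>> {
    private List<String> buffer = new ArrayList<>();
    private List<String> ready; // completed group waiting to be pulled

    ConsecutiveGrouper(Iterator<String> input) {
        super(input);
    }

    @Override
    protected void attach(String record) {
        if (!buffer.isEmpty() && !buffer.get(0).equals(record)) {
            ready = buffer; // key changed, so the previous run is complete
            buffer = new ArrayList<>();
        }
        buffer.add(record);
    }

    @Override
    protected List<String> getNextResult() {
        List<String> out = ready;
        ready = null;
        return out;
    }

    @Override
    protected void endInput() {
        if (!buffer.isEmpty()) {
            ready = buffer; // flush the last buffered run
            buffer = new ArrayList<>();
        }
    }
}
```

Without the endInput() call, the final run ("c", "c" in the example) would stay
in the buffer, which is the case the RDD.count() comparison was working around.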
