-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34003/
-----------------------------------------------------------

(Updated May 12, 2015, 4:21 a.m.)


Review request for pig, liyun zhang, Praveen Rachabattuni, and Xuefu Zhang.


Changes
-------

Incorporated review feedback:
 - Added to SparkLauncher: new PhyPlanSetter(leaf.physicalPlan).visit();
 - Fixed parent plan in CollectedGroupConverter


Bugs: PIG-4542
    https://issues.apache.org/jira/browse/PIG-4542


Repository: pig-git


Description
-------

PIG-4542: OutputConsumerIterator should flush buffered records

Unnecessary use of RDD.count() was bugging me. This patch addresses that.

Certain operators may buffer their output. We need to flush the last set of records from such operators when we encounter the last input record, before calling getNextTuple() for the last time.
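Here is a minimal Java sketch (not Pig code) that makes the buffering problem concrete. BufferingGroupSketch is a hypothetical operator that groups consecutive records sharing a key, roughly in the spirit of a collected group. It can only emit a group once it sees a record with a different key, so after the last input record the final group is still sitting in its buffer until something tells the operator to flush. The class name, the "key:value" record format and the process()/flush() methods are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator (not Pig code) that groups consecutive records with
// the same key. A group is complete only when a different key shows up, so the
// last group stays buffered until flush() is called.
class BufferingGroupSketch {
    private String currentKey = null;
    private final List<String> currentValues = new ArrayList<>();

    // Feed one "key:value" record; returns a finished group, or null if the
    // record was only buffered.
    String process(String record) {
        String[] kv = record.split(":", 2);
        String emitted = null;
        if (currentKey != null && !currentKey.equals(kv[0])) {
            emitted = currentKey + "=" + currentValues;
            currentValues.clear();
        }
        currentKey = kv[0];
        currentValues.add(kv[1]);
        return emitted;
    }

    // Release the group still held in the buffer; null if nothing is buffered.
    String flush() {
        if (currentKey == null) {
            return null;
        }
        String emitted = currentKey + "=" + currentValues;
        currentKey = null;
        currentValues.clear();
        return emitted;
    }

    public static void main(String[] args) {
        BufferingGroupSketch op = new BufferingGroupSketch();
        List<String> out = new ArrayList<>();
        for (String record : new String[] {"a:1", "a:2", "b:3"}) {
            String group = op.process(record);
            if (group != null) {
                out.add(group);
            }
        }
        System.out.println("before flush: " + out); // before flush: [a=[1, 2]]
        String last = op.flush();
        if (last != null) {
            out.add(last);
        }
        System.out.println("after flush: " + out);  // after flush: [a=[1, 2], b=[3]]
    }
}
```

Without the final flush() call, the group for key "b" would silently be lost.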

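Currently, to flush the last set of records, we compute RDD.count() and compare the count with a running counter to determine whether we have reached the last record. This is unnecessary and inefficient: count() is a Spark action, so it runs a separate job over the input just to learn how many records there are.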
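As a plain-Java illustration (no Spark), and assuming records are consumed through a java.util.Iterator, the sketch below contrasts the two ways of spotting the last record. The count-based version needs the total up front, which for an RDD is what the extra count() job provides; the lookahead version simply asks the iterator whether anything follows. LastRecordDetection and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java illustration (no Spark) of two ways to flag the last record.
class LastRecordDetection {

    // Count-based: needs the total before the first record is processed.
    // Here records.size() stands in for RDD.count(), which costs a Spark job.
    static List<Boolean> lastFlagsByCount(List<String> records) {
        long total = records.size();
        long seen = 0;
        List<Boolean> flags = new ArrayList<>();
        for (String record : records) {
            seen++; // running counter compared against the precomputed total
            flags.add(seen == total);
        }
        return flags;
    }

    // Lookahead: after taking a record, the iterator itself says whether
    // another one follows, so no separate count is needed.
    static List<Boolean> lastFlagsByHasNext(Iterator<String> input) {
        List<Boolean> flags = new ArrayList<>();
        while (input.hasNext()) {
            input.next();
            flags.add(!input.hasNext());
        }
        return flags;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("r1", "r2", "r3");
        System.out.println(lastFlagsByCount(records));              // [false, false, true]
        System.out.println(lastFlagsByHasNext(records.iterator())); // [false, false, true]
    }
}
```

Both produce the same flags; only the count-based one needs a full pass over the data before it can start.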

This patch:
- Gets rid of the use of RDD.count() in CollectedGroupConverter and LimitConverter.
- Adds an abstract method endInput(), which is executed before we call getNextTuple() for the last time.
- Deletes POStreamSpark, since it was only handling the last record.
- Removes special code in POCollectedGroup that handled the last record.
- While I was here, renames POOutputConsumerIterator to OutputConsumerIterator. The "PO" prefix should only be used for physical operators.
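Below is a rough Java sketch of how an iterator with an endInput() hook might drive a buffering operator. Only the endInput() name comes from this patch; OutputConsumerIteratorSketch, attach(), getNextResult() and the PairingIterator example (which emits records two at a time and so holds one back) are hypothetical and do not reflect the actual OutputConsumerIterator code. When the record just attached is the last one, the iterator calls endInput() before draining the operator, so the held-back record is still emitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch (not Pig's OutputConsumerIterator): push input records
// into an operator and yield whatever it produces. Only the name endInput()
// comes from the patch; attach() and getNextResult() are made-up names.
abstract class OutputConsumerIteratorSketch<I, O> implements Iterator<O> {
    private final Iterator<I> input;
    private O nextResult = null;
    private boolean finished = false;

    OutputConsumerIteratorSketch(Iterator<I> input) {
        this.input = input;
    }

    protected abstract void attach(I record); // hand one record to the operator
    protected abstract O getNextResult();     // next output, or null if none is ready
    protected abstract void endInput();       // no more input will follow

    private void advance() {
        while (nextResult == null && !finished) {
            O out = getNextResult();
            if (out != null) {
                nextResult = out;
            } else if (input.hasNext()) {
                attach(input.next());
                if (!input.hasNext()) {
                    // Last record: signal end of input before the final
                    // getNextResult() calls so buffered output is flushed.
                    endInput();
                }
            } else {
                finished = true;
            }
        }
    }

    @Override
    public boolean hasNext() {
        advance();
        return nextResult != null;
    }

    @Override
    public O next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        O out = nextResult;
        nextResult = null;
        return out;
    }
}

// Hypothetical buffering operator: emits records two at a time, so it holds
// one record back until its partner arrives (or until endInput()).
class PairingIterator extends OutputConsumerIteratorSketch<String, String> {
    private String held = null;
    private String ready = null;
    private boolean endOfInput = false;

    PairingIterator(Iterator<String> input) {
        super(input);
    }

    @Override
    protected void attach(String record) {
        if (held == null) {
            held = record;
        } else {
            ready = "[" + held + ", " + record + "]";
            held = null;
        }
    }

    @Override
    protected String getNextResult() {
        if (ready == null && endOfInput && held != null) {
            ready = "[" + held + "]"; // flush the unpaired record
            held = null;
        }
        String result = ready;
        ready = null;
        return result;
    }

    @Override
    protected void endInput() {
        endOfInput = true;
    }

    public static void main(String[] args) {
        List<String> results = new ArrayList<>();
        new PairingIterator(Arrays.asList("r1", "r2", "r3").iterator())
                .forEachRemaining(results::add);
        System.out.println(results); // [[r1, r2], [r3]]
    }
}
```

Running PairingIterator.main prints [[r1, r2], [r3]]; if endInput() were never called, r3 would stay held and the output would be just [[r1, r2]].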


Diffs (updated)
-----

  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java 3e75cadbca4d98bfd7aeb49da5f298acdab2bc4d
  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java 7f81c1dce3304011fe89896dd61d46977bcc8821
  src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java 8654b1b6c6436ed6dd0f24358e5b633e6c963d54
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java 99005b7a4eeec5f024512c4aefb396117f574619
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java e976cea44f83e83a3ce28fd03374ede7b52fdeb9
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java 3074f5e6ea4ac15f38affcdaa3e23c3d66bdc104
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java 8b31197f879ed14086a7377635f2877fa3db0e9f
  src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java 2c115fc3044c08cc8c96d27a08c9761456aea158
  src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java f42d8ff4653245c9c14ddbea949e66a7609a02a5
  src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java c66fe7b154f18ccc3aa0f5dab31a5952947fdca2
  test/org/apache/pig/test/TestCollectedGroup.java a958d335a880432145dc80fec0ef2054ce90a3bf

Diff: https://reviews.apache.org/r/34003/diff/


Testing
-------

TestStreaming passes.
No new failures in TestCollectedGroup.


Thanks,

Mohit Sabharwal
