-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34003/
-----------------------------------------------------------
Review request for pig, liyun zhang, Praveen Rachabattuni, and Xuefu Zhang.
Bugs: PIG-4542
https://issues.apache.org/jira/browse/PIG-4542
Repository: pig-git
Description
-------
PIG-4542: OutputConsumerIterator should flush buffered records
Unnecessary use of RDD.count() was bugging me. This patch addresses that.
Certain operators may buffer their output. When we encounter the last input
record, we need to flush the last set of records from such operators before
calling getNextTuple() for the last time.
Currently, to flush the last set of records, we compute RDD.count() and compare
the count with a running counter to determine if we have reached the last
record. This is unnecessary and inefficient: RDD.count() is an action, so it
triggers an extra Spark job just to find the end of the input.
This patch:
- Gets rid of the use of RDD.count() in CollectedGroupConverter and
LimitConverter.
- Adds an abstract method endInput() which is executed before we call
getNextTuple() for the last time.
- Deletes POStreamSpark since it was just handling the last record.
- Removes special code in POCollectedGroup to handle the last record.
- While I was here, renames POOutputConsumerIterator to OutputConsumerIterator.
The "PO" prefix should only be used for physical operators.
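For reviewers who want the idea without reading the diff, here is a minimal,
self-contained sketch of the endInput() pattern described above. It is not the
code in this patch: EndInputSketch, RunLengthGrouper and FlushingIterator are
hypothetical names made up for illustration, and the grouper is only a stand-in
for an operator that buffers its output. The point is that the wrapping iterator
can detect the end of input from input.hasNext() alone, so no record count is
needed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;

// Illustrative sketch only; all names here are hypothetical, not Pig classes.
final class EndInputSketch {

    // Stand-in for a buffering operator: counts runs of consecutive equal keys
    // and emits "key=count" only once the key changes or the input ends.
    static final class RunLengthGrouper {
        private final Queue<String> ready = new ArrayDeque<>();
        private String currentKey;
        private int count;

        void attachInput(String key) {
            if (currentKey != null && !currentKey.equals(key)) {
                ready.add(currentKey + "=" + count); // previous run is complete
                count = 0;
            }
            currentKey = key;
            count++;
        }

        // Called once, after the last input record: flush the buffered run.
        void endInput() {
            if (currentKey != null) {
                ready.add(currentKey + "=" + count);
                currentKey = null;
                count = 0;
            }
        }

        // Returns the next finished record, or null if none is ready yet.
        String getNextTuple() {
            return ready.poll();
        }
    }

    // Wraps the input iterator and drives the operator. When the input is
    // exhausted it calls endInput() exactly once before the final drain.
    static final class FlushingIterator implements Iterator<String> {
        private final Iterator<String> input;
        private final RunLengthGrouper op;
        private boolean inputEnded = false;
        private String pending;

        FlushingIterator(Iterator<String> input, RunLengthGrouper op) {
            this.input = input;
            this.op = op;
        }

        @Override
        public boolean hasNext() {
            while (pending == null) {
                pending = op.getNextTuple();
                if (pending != null) {
                    break;
                }
                if (input.hasNext()) {
                    op.attachInput(input.next());
                } else if (!inputEnded) {
                    op.endInput();      // last chance for the operator to flush
                    inputEnded = true;
                } else {
                    return false;       // input ended and operator is drained
                }
            }
            return true;
        }

        @Override
        public String next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            String result = pending;
            pending = null;
            return result;
        }
    }

    // Convenience helper: run the grouper over a list and collect the output.
    static List<String> run(List<String> records) {
        List<String> out = new ArrayList<>();
        Iterator<String> it =
            new FlushingIterator(records.iterator(), new RunLengthGrouper());
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "a", "b", "c", "c", "c")));
    }
}
```

Without the endInput() call, the final run ("c" in the example input) would stay
in the operator's buffer and never be emitted, which is the case the old
RDD.count() comparison was there to catch.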
Diffs
-----
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
7f81c1dce3304011fe89896dd61d46977bcc8821
src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
8654b1b6c6436ed6dd0f24358e5b633e6c963d54
src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
99005b7a4eeec5f024512c4aefb396117f574619
src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
e976cea44f83e83a3ce28fd03374ede7b52fdeb9
src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
3074f5e6ea4ac15f38affcdaa3e23c3d66bdc104
src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java
8b31197f879ed14086a7377635f2877fa3db0e9f
src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
2c115fc3044c08cc8c96d27a08c9761456aea158
src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java
f42d8ff4653245c9c14ddbea949e66a7609a02a5
src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
c66fe7b154f18ccc3aa0f5dab31a5952947fdca2
test/org/apache/pig/test/TestCollectedGroup.java
a958d335a880432145dc80fec0ef2054ce90a3bf
Diff: https://reviews.apache.org/r/34003/diff/
Testing
-------
TestStreaming passes.
No new failures in TestCollectedGroup.
Thanks,
Mohit Sabharwal