-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34003/#review83325
-----------------------------------------------------------
src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java <https://reviews.apache.org/r/34003/#comment134271>

    Does poCollectedGroup.getPlans().get(0) equal poCollectedGroup.parentPlan?


- kelly zhang


On May 8, 2015, 10:26 p.m., Mohit Sabharwal wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34003/
> -----------------------------------------------------------
> 
> (Updated May 8, 2015, 10:26 p.m.)
> 
> 
> Review request for pig, liyun zhang, Praveen Rachabattuni, and Xuefu Zhang.
> 
> 
> Bugs: PIG-4542
>     https://issues.apache.org/jira/browse/PIG-4542
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> PIG-4542: OutputConsumerIterator should flush buffered records
> 
> Unnecessary use of RDD.count() was bugging me. This patch addresses that.
> 
> Certain operators may buffer their output. We need to flush the last set of
> records from such operators when we encounter the last input record, before
> calling getNextTuple() for the last time.
> 
> Currently, to flush the last set of records, we compute RDD.count() and
> compare it with a running counter to determine whether we have reached the
> last record. This is unnecessary and inefficient.
> 
> This patch:
> - Gets rid of the use of RDD.count() in CollectedGroupConverter and
>   LimitConverter.
> - Adds an abstract method endInput() which is executed before we call
>   getNextTuple() for the last time.
> - Deletes POStreamSpark, since it only existed to handle the last record.
> - Removes the special-case code in POCollectedGroup that handled the last
>   record.
> - While I was here, renames POOutputConsumerIterator to
>   OutputConsumerIterator. The "PO" prefix should only be used for physical
>   operators.
> 
> 
> Diffs
> -----
> 
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java 7f81c1dce3304011fe89896dd61d46977bcc8821 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java 8654b1b6c6436ed6dd0f24358e5b633e6c963d54 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java 99005b7a4eeec5f024512c4aefb396117f574619 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java e976cea44f83e83a3ce28fd03374ede7b52fdeb9 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java 3074f5e6ea4ac15f38affcdaa3e23c3d66bdc104 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java 8b31197f879ed14086a7377635f2877fa3db0e9f 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java 2c115fc3044c08cc8c96d27a08c9761456aea158 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java f42d8ff4653245c9c14ddbea949e66a7609a02a5 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java c66fe7b154f18ccc3aa0f5dab31a5952947fdca2 
>   test/org/apache/pig/test/TestCollectedGroup.java a958d335a880432145dc80fec0ef2054ce90a3bf 
> 
> Diff: https://reviews.apache.org/r/34003/diff/
> 
> 
> Testing
> -------
> 
> TestStreaming passes.
> No new failures in TestCollectedGroup.
> 
> 
> Thanks,
> 
> Mohit Sabharwal
> 
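
For context on the endInput() hook described in the patch summary, the sketch
below shows the general pattern of flushing buffered output once the last
input record has been attached, instead of comparing a running counter
against RDD.count(). This is only an illustrative sketch, not the code under
review: the class name FlushingIterator and the methods attach() and
pullOutput() are assumptions invented for the example; only the endInput()
hook itself comes from the patch description.

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Hypothetical sketch of the "flush buffered output on last input" pattern.
    // A subclass wraps a buffering operator: attach() feeds it one record,
    // pullOutput() drains one output record (or null if none is ready), and
    // endInput() tells it that no more input will arrive so it can flush.
    abstract class FlushingIterator<T> implements Iterator<T> {
        private final Iterator<T> input;
        private boolean inputDone = false;
        private T next;

        FlushingIterator(Iterator<T> input) { this.input = input; }

        protected abstract void attach(T record);   // feed one input record
        protected abstract void endInput();         // last record seen: flush buffers
        protected abstract T pullOutput();          // next output record, or null

        @Override
        public boolean hasNext() {
            while (next == null) {
                T out = pullOutput();
                if (out != null) { next = out; break; }
                if (inputDone) return false;        // buffers fully drained
                if (input.hasNext()) {
                    attach(input.next());
                    if (!input.hasNext()) {         // that was the last input record
                        inputDone = true;
                        endInput();                 // flush before the final reads
                    }
                } else {                            // empty input: flush immediately
                    inputDone = true;
                    endInput();
                }
            }
            return true;
        }

        @Override
        public T next() {
            if (!hasNext()) throw new NoSuchElementException();
            T out = next;
            next = null;
            return out;
        }
    }

The point of the pattern is that the iterator itself can detect "no more
input" from the wrapped iterator, so no up-front count of the partition is
needed to know when to flush.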
