> On May 12, 2015, 12:14 a.m., kelly zhang wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java,
> > line 108
> > <https://reviews.apache.org/r/34003/diff/1/?file=953940#file953940line108>
> >
> > poCollectedGroup.getPlans().get(0) equals poCollectedGroup.parentPlan?
The only reason we are setting parentPlan in CollectedGroupConverter is so
that this code in POCollectedGroup does not throw a NullPointerException:
if (this.parentPlan.endOfAllInput) {
return getStreamCloseResult();
} else {
break;
}
I think poCollectedGroup.getPlans().get(0) was chosen arbitrarily; any physical
plan with endOfAllInput set would have worked.
poCollectedGroup.getPlans().get(0) is in fact an expression plan, not the
parent plan of POCollectedGroup.
I discovered that we aren't setting the parent plans for the physical operators
inside SparkPlan to the relevant SparkOperator (parent). I've fixed that using:
new PhyPlanSetter(sparkOperator).visit();
This way, we can simply set
poCollectedGroup.getParentPlan().endOfAllInput = true
in CollectedGroupConverter.
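
To make the buffering/flush issue concrete, here is a toy analogue in plain
Java (none of Pig's actual classes; the names attach/endInput are only
illustrative): a grouping operator must hold the current group in a buffer,
so the final group can only be emitted once end-of-input is explicitly
signalled, which is what the endOfAllInput flag provides without resorting
to RDD.count().

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch only -- NOT Pig's actual classes. An operator that collects
// consecutive records sharing a key (like POCollectedGroup) must buffer the
// current group; the last group can only be flushed once we know no more
// input is coming.
class CollectedGroupSketch {
    private final List<String> buffer = new ArrayList<>();
    private String currentKey = null;

    // Feed one record; returns a completed group, or null while buffering.
    List<String> attach(String key, String value) {
        List<String> done = null;
        if (currentKey != null && !currentKey.equals(key)) {
            done = new ArrayList<>(buffer); // key changed: previous group is complete
            buffer.clear();
        }
        currentKey = key;
        buffer.add(value);
        return done;
    }

    // Analogue of endOfAllInput/endInput(): flush the last buffered group
    // instead of comparing a running counter against RDD.count().
    List<String> endInput() {
        return buffer.isEmpty() ? null : new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        CollectedGroupSketch g = new CollectedGroupSketch();
        g.attach("a", "1");
        g.attach("a", "2");
        System.out.println(g.attach("b", "3")); // completed group for key "a"
        System.out.println(g.endInput());       // final group for key "b"
    }
}
```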
- Mohit
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34003/#review83325
-----------------------------------------------------------
On May 8, 2015, 10:26 p.m., Mohit Sabharwal wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34003/
> -----------------------------------------------------------
>
> (Updated May 8, 2015, 10:26 p.m.)
>
>
> Review request for pig, liyun zhang, Praveen Rachabattuni, and Xuefu Zhang.
>
>
> Bugs: PIG-4542
> https://issues.apache.org/jira/browse/PIG-4542
>
>
> Repository: pig-git
>
>
> Description
> -------
>
> PIG-4542: OutputConsumerIterator should flush buffered records
>
> Unnecessary use of RDD.count() was bugging me. This patch addresses that.
>
> Certain operators may buffer their output. We need to flush the last set of
> records from such operators when we encounter the last input record, before
> calling getNextTuple() for the last time.
>
> Currently, to flush the last set of records, we compute RDD.count() and
> compare the count with a
> running counter to determine if we have reached the last record. This is
> unnecessary and inefficient.
>
> This patch:
> - Gets rid of the use of RDD.count() in CollectedGroupConverter and
> LimitConverter.
> - Adds an abstract method endInput() which is executed before we call
> getNextTuple() for the last time.
> - Deletes POStreamSpark since it was just handling the last record.
> - Removed special code in POCollectedGroup to handle the last record.
> - While I was here, renamed POOutputConsumerIterator to
> OutputConsumerIterator. The "PO" prefix
> should only be used for physical operators.
>
>
> Diffs
> -----
>
>
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> 7f81c1dce3304011fe89896dd61d46977bcc8821
> src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
> 8654b1b6c6436ed6dd0f24358e5b633e6c963d54
>
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
> 99005b7a4eeec5f024512c4aefb396117f574619
>
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
> e976cea44f83e83a3ce28fd03374ede7b52fdeb9
>
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
> 3074f5e6ea4ac15f38affcdaa3e23c3d66bdc104
>
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/POOutputConsumerIterator.java
> 8b31197f879ed14086a7377635f2877fa3db0e9f
>
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
> 2c115fc3044c08cc8c96d27a08c9761456aea158
>
> src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POStreamSpark.java
> f42d8ff4653245c9c14ddbea949e66a7609a02a5
>
> src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
> c66fe7b154f18ccc3aa0f5dab31a5952947fdca2
> test/org/apache/pig/test/TestCollectedGroup.java
> a958d335a880432145dc80fec0ef2054ce90a3bf
>
> Diff: https://reviews.apache.org/r/34003/diff/
>
>
> Testing
> -------
>
> TestStreaming passes.
> No new failures in TestCollectedGroup.
>
>
> Thanks,
>
> Mohit Sabharwal
>
>