[https://issues.apache.org/jira/browse/PIG-4612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605623#comment-14605623]
Remi Catherinot commented on PIG-4612:
--------------------------------------
OK. I've started reading, running and debugging the CombinerOptimizer. There
are two CombinerOptimizer classes: a MapReduce one, and another in a tez
sub-package. I followed the code down to
org.apache.pig.backend.hadoop.executionengine.util.CombinerOptimizerUtil. As
far as I understand, even though there are two CombinerOptimizer versions, both
rely on the same utility class to enable the optimisation.
I still have one question: is it legitimate to call the final step of an
algebraic user function even if neither the initial nor the intermediate
methods were ever called? I think it is, but a confirmation would be good.
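Whether the shortcut is safe seems to depend on whether the final step can consume raw values as if they were partial results. The minimal sketch below shows that difference for a MAX-like and a COUNT-like function. It assumes a hypothetical, simplified SimpleAlgebraic interface; it is not Pig's org.apache.pig.Algebraic API, and the class names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of an algebraic function. NOT Pig's
// org.apache.pig.Algebraic API, which names separate Initial, Intermed and
// Final EvalFuncs that work on Tuples and DataBags.
interface SimpleAlgebraic {
    long initial(long rawValue);             // one raw value -> partial result
    long intermediate(List<Long> partials);  // partials -> merged partial
    long finalValue(List<Long> partials);    // partials -> final result
}

// MAX: a partial result has the same meaning as a raw value.
final class MaxAlgebraic implements SimpleAlgebraic {
    public long initial(long rawValue) { return rawValue; }
    public long intermediate(List<Long> partials) { return max(partials); }
    public long finalValue(List<Long> partials) { return max(partials); }

    private static long max(List<Long> values) {
        long m = Long.MIN_VALUE;
        for (long v : values) m = Math.max(m, v);
        return m;
    }
}

// COUNT: a partial result is a count, not a raw value.
final class CountAlgebraic implements SimpleAlgebraic {
    public long initial(long rawValue) { return 1L; }  // each raw value counts once
    public long intermediate(List<Long> partials) { return sum(partials); }
    public long finalValue(List<Long> partials) { return sum(partials); }

    private static long sum(List<Long> values) {
        long s = 0;
        for (long v : values) s += v;
        return s;
    }
}

final class AlgebraicDemo {
    // Normal path: initial on every raw value, then final on the partials.
    static long initialThenFinal(SimpleAlgebraic f, List<Long> raw) {
        List<Long> partials = new ArrayList<>();
        for (long v : raw) partials.add(f.initial(v));
        return f.finalValue(partials);
    }

    // Shortcut from the question: final applied directly to the raw values.
    static long finalOnly(SimpleAlgebraic f, List<Long> raw) {
        return f.finalValue(raw);
    }
}
```

With the raw values 3, 7 and 5, both paths give 7 for the MAX-like function, but for the COUNT-like one the normal path gives 3 while the final-only shortcut sums the raw values and gives 15. In this toy model, at least, the answer depends on the function and not only on the shape of the algebraic contract.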
> accumulating upon filters is still accumulating
> -----------------------------------------------
>
> Key: PIG-4612
> URL: https://issues.apache.org/jira/browse/PIG-4612
> Project: Pig
> Issue Type: Improvement
> Components: internal-udfs
> Affects Versions: 0.15.0
> Environment: I use YARN, not Tez or Spark, but I think the problem
> also exists in those environments
> Reporter: Remi Catherinot
> Labels: performance
>
> Accumulators are not used when accumulating filter results. Here is a script
> with no filter, which ends up with a map-combine-reduce plan that makes
> efficient use of the Accumulator design:
> A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
> B = FOREACH (GROUP A BY (a)) {
> GENERATE MAX(A.b) AS accumulated;
> }
> If I add a filter and apply MAX to its output, I end up with a map-reduce
> plan (no combiner) that first builds whole bags of the filtered elements and
> then feeds those bags to the reducers. This requires more memory, so more
> spills are needed, which consume IO, and more CPU is needed to handle it all:
> A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
> B = FOREACH (GROUP A BY (a)) {
> C = FILTER A BY c == 'toto';
> GENERATE MAX(C.b) AS not_accumulated_just_reduced;
> }
> In my production environment, I have some jobs that take hours to run, with
> memory-hungry containers, and that still spill to disk a lot. If I hack the
> script to push the filter into the MAX accumulator, the job finishes in 5 to
> 10 minutes. I think it is possible to develop a PlanOptimizer that would
> generically rewrite the second script into something like this:
> A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
> B = FOREACH (GROUP A BY (a)) {
> GENERATE filter_when_c_equals_toto_MAX(A.b) AS accumulated;
> }
> The filtered accumulator would itself be an accumulator that wraps any other
> accumulator, forwarding a value to be accumulated only if the filter's eval
> function returns true.
> This idea can also work for DISTINCT and similar operators; filtered
> accumulators can wrap each other in a layered way.
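The wrapping and layering idea can be sketched as follows. This is a minimal sketch under stated assumptions: SimpleAccumulator is a hypothetical simplification of Pig's org.apache.pig.Accumulator (the real interface has accumulate(Tuple), getValue() and cleanup(), and receives the group's bag in batches), and Row, MaxAccumulator, CountAccumulator, FilteredAccumulator and DistinctAccumulator are illustrative names, not existing Pig classes.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for org.apache.pig.Accumulator.
interface SimpleAccumulator<I, O> {
    void accumulate(I value);  // called for each input value of the current group
    O getValue();              // result for the current group
    void cleanup();            // reset state before the next group
}

// Hypothetical input record matching (a:chararray, b:long, c:chararray).
final class Row {
    final String a; final long b; final String c;
    Row(String a, long b, String c) { this.a = a; this.b = b; this.c = c; }
}

// MAX over longs; null when nothing was accumulated for the group.
final class MaxAccumulator implements SimpleAccumulator<Long, Long> {
    private Long max = null;
    public void accumulate(Long v) { if (v != null && (max == null || v > max)) max = v; }
    public Long getValue() { return max; }
    public void cleanup() { max = null; }
}

// COUNT of the values that reach this accumulator.
final class CountAccumulator<V> implements SimpleAccumulator<V, Long> {
    private long count = 0;
    public void accumulate(V v) { count++; }
    public Long getValue() { return count; }
    public void cleanup() { count = 0; }
}

// Forwards a record to the wrapped accumulator only if the predicate holds,
// projecting it to the wrapped accumulator's input. No filtered bag is built.
final class FilteredAccumulator<R, V, O> implements SimpleAccumulator<R, O> {
    private final Predicate<R> keep;
    private final Function<R, V> project;
    private final SimpleAccumulator<V, O> inner;
    FilteredAccumulator(Predicate<R> keep, Function<R, V> project, SimpleAccumulator<V, O> inner) {
        this.keep = keep; this.project = project; this.inner = inner;
    }
    public void accumulate(R record) { if (keep.test(record)) inner.accumulate(project.apply(record)); }
    public O getValue() { return inner.getValue(); }
    public void cleanup() { inner.cleanup(); }
}

// DISTINCT layer: forwards each value only the first time it is seen in the
// group. Unlike the filter, it must remember the values already forwarded.
final class DistinctAccumulator<V, O> implements SimpleAccumulator<V, O> {
    private final Set<V> seen = new HashSet<>();
    private final SimpleAccumulator<V, O> inner;
    DistinctAccumulator(SimpleAccumulator<V, O> inner) { this.inner = inner; }
    public void accumulate(V v) { if (seen.add(v)) inner.accumulate(v); }
    public O getValue() { return inner.getValue(); }
    public void cleanup() { seen.clear(); inner.cleanup(); }
}
```

For a group whose rows have (b, c) = (5, toto), (9, titi), (7, toto), (7, toto), wrapping MaxAccumulator in a filter on c == 'toto' yields 7, and wrapping DistinctAccumulator(CountAccumulator) in the same filter yields 2, without ever materializing the filtered bag. A DISTINCT layer still keeps a per-group set of seen values, so it saves less memory than a pure filter.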