[ 
https://issues.apache.org/jira/browse/PIG-4612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605623#comment-14605623
 ] 

Remi Catherinot commented on PIG-4612:
--------------------------------------

OK. I've started reading, running and debugging the CombinerOptimizer. There 
are two CombinerOptimizer classes: a MapReduce one, and another in a tez 
sub-package. I followed the code up to 
org.apache.pig.backend.hadoop.executionengine.util.CombinerOptimizerUtil. As 
far as I understand, even though there are two CombinerOptimizer versions, 
both use the same utility class to unlock the optimisation.

I still have one question: is it legitimate to call the final stage of an 
algebraic user function even if neither the initial nor the intermediate 
methods were ever called? I think that's the case, but a confirmation would 
be good.

> accumulating upon filters is still accumulating
> -----------------------------------------------
>
>                 Key: PIG-4612
>                 URL: https://issues.apache.org/jira/browse/PIG-4612
>             Project: Pig
>          Issue Type: Improvement
>          Components: internal-udfs
>    Affects Versions: 0.15.0
>         Environment: I use YARN, not Tez or Spark, but I think the problem 
> also exists in those environments
>            Reporter: Remi Catherinot
>              Labels: performance
>
> Accumulators are not used when accumulating filter results. Here is a 
> script with no filter, which ends up with a map-combine-reduce plan that 
> efficiently uses the Accumulator design.
> A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
> B = FOREACH (GROUP A BY (a)) {
>    GENERATE MAX(A.b) AS accumulated;
> }
> If I add a filter and apply MAX to its result, I end up with a map-reduce 
> plan (no combine) which first generates the filtered elements as whole 
> bags, then feeds those bags to the reducers. That requires more memory, so 
> more spills are needed, which consumes I/O, and more CPU is needed to 
> handle all this.
> A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
> B = FOREACH (GROUP A BY (a)) {
>    C = FILTER A BY c == 'toto';
>    GENERATE MAX(C.b) AS not_accumulated_just_reduced;
> }
> In my production environment, I have some jobs that take hours to run, 
> with memory-hungry containers, and that still spill to disk a lot. If I 
> hack the filter into the MAX accumulator, the job finishes in 5 to 10 
> minutes. I think it is possible to develop a PlanOptimizer that would 
> rewrite the 2nd script into something like this in a generic way:
> A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
> B = FOREACH (GROUP A BY (a)) {
>    GENERATE filter_when_c_equals_toto_MAX(A.b) AS accumulated;
> }
> The filtered accumulator would itself be an accumulator and would wrap any 
> other accumulator, forwarding a value to be accumulated only if the 
> filter's eval function returns true.
> This idea can also work for DISTINCT and the like; filtered accumulators 
> can wrap each other in layers.
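
To make the wrapping idea from the description concrete, here is a minimal 
Java sketch. It is only an illustration under stated assumptions: it does not 
use Pig's real classes. SimpleAccumulator, Row, MaxB, Filtered and 
maxBWhereCIsToto are hypothetical names made up for this example; the 
interface merely resembles an accumulate/getValue/cleanup style of contract. 
An actual optimizer rule would have to work on Pig's own plan operators and 
UDF interfaces.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class FilteredAccumulatorSketch {

    /** Simplified stand-in for an accumulator contract (hypothetical, not Pig's API). */
    interface SimpleAccumulator<I, O> {
        void accumulate(I row);   // feed one more row
        O getValue();             // result for everything fed so far
        void cleanup();           // reset state before the next group
    }

    /** Hypothetical row type matching the (a, b, c) schema of the scripts. */
    static final class Row {
        final String a;
        final long b;
        final String c;
        Row(String a, long b, String c) { this.a = a; this.b = b; this.c = c; }
    }

    /** Running MAX over field b; returns null if no row was accumulated. */
    static final class MaxB implements SimpleAccumulator<Row, Long> {
        private Long max;
        @Override public void accumulate(Row row) {
            if (max == null || row.b > max) { max = row.b; }
        }
        @Override public Long getValue() { return max; }
        @Override public void cleanup() { max = null; }
    }

    /** Wrapper that forwards a row to the delegate only when the condition holds. */
    static final class Filtered<I, O> implements SimpleAccumulator<I, O> {
        private final Predicate<I> condition;
        private final SimpleAccumulator<I, O> delegate;
        Filtered(Predicate<I> condition, SimpleAccumulator<I, O> delegate) {
            this.condition = condition;
            this.delegate = delegate;
        }
        @Override public void accumulate(I row) {
            if (condition.test(row)) { delegate.accumulate(row); }
        }
        @Override public O getValue() { return delegate.getValue(); }
        @Override public void cleanup() { delegate.cleanup(); }
    }

    /** Equivalent of: C = FILTER A BY c == 'toto'; GENERATE MAX(C.b); with no bag built. */
    static Long maxBWhereCIsToto(List<Row> group) {
        SimpleAccumulator<Row, Long> acc =
            new Filtered<Row, Long>(row -> "toto".equals(row.c), new MaxB());
        for (Row row : group) { acc.accumulate(row); }
        Long result = acc.getValue();
        acc.cleanup();
        return result;
    }

    public static void main(String[] args) {
        List<Row> group = Arrays.asList(
            new Row("k", 5L, "toto"),
            new Row("k", 9L, "tata"),   // rejected by the filter
            new Row("k", 7L, "toto"));
        System.out.println(maxBWhereCIsToto(group)); // prints 7
    }
}
```

Because Filtered implements the same interface as the accumulator it wraps, 
one Filtered instance can wrap another, which is the layering mentioned in 
the description. The sketch only covers the accumulate path; whether the 
same wrapping can be made to work with the combiner is exactly the open 
question above.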



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
