Remi Catherinot created PIG-4612:
------------------------------------

             Summary: accumulating upon filters is still accumulating
                 Key: PIG-4612
                 URL: https://issues.apache.org/jira/browse/PIG-4612
             Project: Pig
          Issue Type: Improvement
          Components: internal-udfs
    Affects Versions: 0.15.0
         Environment: I use yarn not tez nor spark, but i think the problem 
also exists in those environments
            Reporter: Remi Catherinot


Accumulator are not used when accumulating filter results. Here is a script 
with no filters which end-up having a map-combine-reduce plan which efficiently 
use Accumulator design.
A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
B = FOREACH (GROUP A BY (a)) {
   GENERATE MAX(A.b) AS accumulated;
}

If i put a filter and MAX upon it, I end-up with a map-reduce plan (no combine) 
which first generate whole bags as the filtered elements then feed those bags 
to the reducers, that requires more memory, so more spills are needed which 
consumme IO, and also more CPU is needed to handle all this.

A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
B = FOREACH (GROUP A BY (a)) {
   C = FILTER A BY c == 'toto';
   GENERATE MAX(C.b) AS not_accumulated_just_reduced;
}

In my production environnement, i have some jobs that take hours to run, with 
memory hungry containers and still do a lot of spill-to-disk. If i hack in to 
push the filter into the max accumulator, then the job is finished in 5 to 10 
minutes. I think it is possible to develop a PlanOptimizer than would rewritte 
the 2nd script to something like this in a generic way :

A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
B = FOREACH (GROUP A BY (a)) {
   GENERATE filter_when_c_equals_toto_MAX(A.b) AS accumulated;
}

the filtered accumulator will be an accumulator itself and wrap any other 
accumulator, forwarding value to be accumulated only if the filtered eval 
function is true.

This idea can also work on distinct and co, filtered accumulator can wrap each 
other in the layered way.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to