[ https://issues.apache.org/jira/browse/PIG-4612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14601395#comment-14601395 ]
Remi Catherinot commented on PIG-4612:
--------------------------------------
I'm currently working on such a logical plan optimizer. I plan to release it
through the Piggy Bank. Pig users would be able to try it by enabling it
using PIG-4598.
> accumulating upon filters is still accumulating
> -----------------------------------------------
>
> Key: PIG-4612
> URL: https://issues.apache.org/jira/browse/PIG-4612
> Project: Pig
> Issue Type: Improvement
> Components: internal-udfs
> Affects Versions: 0.15.0
> Environment: I use YARN, not Tez or Spark, but I think the problem
> also exists in those environments
> Reporter: Remi Catherinot
> Labels: performance
>
> Accumulators are not used when accumulating over filter results. Here is a
> script with no filter that ends up with a map-combine-reduce plan, which makes
> efficient use of the Accumulator design:
> A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
> B = FOREACH (GROUP A BY (a)) {
>     GENERATE MAX(A.b) AS accumulated;
> }
> If I add a filter and apply MAX to its result, I end up with a map-reduce plan
> (no combiner) that first materializes the filtered elements as whole bags and
> then feeds those bags to the reducers. That requires more memory, so more
> spills are needed, which consume I/O, and more CPU is needed to handle all of
> this:
> A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
> B = FOREACH (GROUP A BY (a)) {
>     C = FILTER A BY c == 'toto';
>     GENERATE MAX(C.b) AS not_accumulated_just_reduced;
> }
> In my production environment, I have some jobs that take hours to run, with
> memory-hungry containers, and they still spill heavily to disk. If I hack in a
> change that pushes the filter into the MAX accumulator, the job finishes in 5
> to 10 minutes. I think it is possible to develop a PlanOptimizer that would
> generically rewrite the second script into something like this:
> A = LOAD '/some/data' AS (a:chararray,b:long,c:chararray);
> B = FOREACH (GROUP A BY (a)) {
>     GENERATE filter_when_c_equals_toto_MAX(A.b) AS accumulated;
> }
> The filtered accumulator would itself be an accumulator that wraps any other
> accumulator, forwarding a value to be accumulated only if the filter's eval
> function returns true.
> This idea also works for DISTINCT and similar operators: filtered accumulators
> can wrap each other in layers.
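A minimal Java sketch of the wrapping idea described in the issue, assuming a simplified, hypothetical `SimpleAccumulator` interface modeled loosely on Pig's `org.apache.pig.Accumulator` (`accumulate`, `getValue`, `cleanup`). The real interface receives chunks of the bag wrapped in a `Tuple`; here rows arrive one at a time to keep the example self-contained. `Row`, `MaxAccumulator`, `CountAccumulator`, `FilteredAccumulator` and `DistinctAccumulator` are illustrative names, not existing Pig or Piggy Bank classes.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;

// Hypothetical, simplified stand-in for org.apache.pig.Accumulator:
// the real interface receives bag chunks in a Tuple; here rows arrive one by one.
interface SimpleAccumulator<T, O> {
    void accumulate(T row); // called for each row of the current group
    O getValue();           // result for the current group
    void cleanup();         // reset state before the next group
}

// One input row, matching the schema (a:chararray, b:long, c:chararray).
record Row(String a, long b, String c) {}

// MAX over a long field; null when nothing was accumulated.
final class MaxAccumulator<T> implements SimpleAccumulator<T, Long> {
    private final ToLongFunction<? super T> field;
    private Long max;

    MaxAccumulator(ToLongFunction<? super T> field) { this.field = field; }

    @Override public void accumulate(T row) {
        long v = field.applyAsLong(row);
        if (max == null || v > max) max = v;
    }
    @Override public Long getValue() { return max; }
    @Override public void cleanup() { max = null; }
}

// COUNT of the rows that reach this accumulator.
final class CountAccumulator<T> implements SimpleAccumulator<T, Long> {
    private long count;

    @Override public void accumulate(T row) { count++; }
    @Override public Long getValue() { return count; }
    @Override public void cleanup() { count = 0; }
}

// Forwards a row to the wrapped accumulator only if the predicate holds,
// so the filtered bag is never materialized.
final class FilteredAccumulator<T, O> implements SimpleAccumulator<T, O> {
    private final Predicate<? super T> filter;
    private final SimpleAccumulator<T, O> inner;

    FilteredAccumulator(Predicate<? super T> filter, SimpleAccumulator<T, O> inner) {
        this.filter = filter;
        this.inner = inner;
    }
    @Override public void accumulate(T row) { if (filter.test(row)) inner.accumulate(row); }
    @Override public O getValue() { return inner.getValue(); }
    @Override public void cleanup() { inner.cleanup(); }
}

// Forwards only the first row seen for each key (a DISTINCT on one field).
// It still keeps the set of distinct keys in memory, but never whole bags.
final class DistinctAccumulator<T, O> implements SimpleAccumulator<T, O> {
    private final Function<? super T, ?> key;
    private final SimpleAccumulator<T, O> inner;
    private final Set<Object> seen = new HashSet<>();

    DistinctAccumulator(Function<? super T, ?> key, SimpleAccumulator<T, O> inner) {
        this.key = key;
        this.inner = inner;
    }
    @Override public void accumulate(T row) { if (seen.add(key.apply(row))) inner.accumulate(row); }
    @Override public O getValue() { return inner.getValue(); }
    @Override public void cleanup() { seen.clear(); inner.cleanup(); }
}
```

For a group whose rows are (b=5, c=toto), (9, other), (7, toto), (7, toto), a plain MAX yields 9, `FilteredAccumulator(c == 'toto', MaxAccumulator)` yields 7, and the layered `FilteredAccumulator(c == 'toto', DistinctAccumulator(b, CountAccumulator))` yields 2. Each wrapper only decides whether to forward a row, which is why they compose in layers without building an intermediate bag.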
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)