NEUpanning opened a new issue, #10527:
URL: https://github.com/apache/incubator-gluten/issues/10527

   ### Description
   
   ### Use Case
   We have encountered a production use case where the aggregation inserted before 
`partial(count distinct)` performs poorly because the majority of the data has 
unique group-by keys. The `partial(count distinct)` aggregation itself has 
significantly smaller cardinality, since its group-by keys are only a subset of 
the upstream keys.
   
   metrics:
   
   <img width="421" height="870" alt="Image" src="https://github.com/user-attachments/assets/49d4c34c-33c8-4fbd-9fea-0fe8dee5e5e7" />
   
   gluten plan:
   ```
         :                    +- ^(14) FlushableHashAggregateTransformer(keys=[brand_id#605L, gender#606, brand_layered#607, mt_back_city_nation_id#608L, mt_back_province_nation_id#609L, spark_grouping_id#604L], functions=[merge_max(brand_name#504), merge_max(brand_cate2_id#505L), merge_max(brand_cate2#506), merge_max(if (spark_grouping_id#604L IN (11,15)) mt_back_city_nation_name#508 else all), merge_max(if (spark_grouping_id#604L IN (19,23)) mt_back_province_nation_name#510 else all), partial_count(distinct user_id#515)], isStreamingAgg=false, output=[brand_id#605L, gender#606, brand_layered#607, mt_back_city_nation_id#608L, mt_back_province_nation_id#609L, spark_grouping_id#604L, max#737, max#739L, max#741, max#743, max#745, count#748L])
         :                       +- ^(14) HashAggregateTransformer(keys=[brand_id#605L, gender#606, brand_layered#607, mt_back_city_nation_id#608L, mt_back_province_nation_id#609L, spark_grouping_id#604L, user_id#515], functions=[merge_max(brand_name#504), merge_max(brand_cate2_id#505L), merge_max(brand_cate2#506), merge_max(if (spark_grouping_id#604L IN (11,15)) mt_back_city_nation_name#508 else all), merge_max(if (spark_grouping_id#604L IN (19,23)) mt_back_province_nation_name#510 else all)], isStreamingAgg=false, output=[brand_id#605L, gender#606, brand_layered#607, mt_back_city_nation_id#608L, mt_back_province_nation_id#609L, spark_grouping_id#604L, user_id#515, max#737, max#739L, max#741, max#743, max#745])
   ```
   
   ### Proposed Solution
   Since Velox natively supports distinct aggregation, we can add a rule that fuses 
the two Spark operators into a single aggregation with distinct. This should yield 
a performance gain because the fused aggregation has a significantly smaller 
aggregation buffer. Here is an example:
   
   
   Velox plan before this optimization:
   ```
   keys=key final_sum(value) final_count(id, distinct=false)
    exchange keys=key
     keys=key merge_sum(value) partial_count(id, distinct=false)
      keys=id,key merge_sum(value)
       exchange keys=id,key
        keys=id,key partial_sum(value) 
   ```
   
   Velox plan after this optimization:
   ```
   keys=key final_sum(value) final_count(id, distinct=false)
    exchange keys=key
     keys=key merge_sum(value) partial_count(id, distinct=true)
       exchange keys=id,key
        keys=id,key partial_sum(value) 
   ```
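
The fusion is semantically safe because counting distinct ids per key directly is equivalent to first deduplicating on `(id, key)` and then counting rows per key, which is exactly what the two-operator plan does. A minimal Python sketch with made-up rows illustrating the equivalence (not Gluten code, just the invariant the rule relies on):

```python
from collections import defaultdict

# Hypothetical (id, key, value) rows with duplicate ids within a key.
rows = [
    (1, "a", 10), (1, "a", 20), (2, "a", 30),
    (1, "b", 40), (3, "b", 50), (3, "b", 60),
]

# Direct: count distinct ids per key, as a fused count(distinct) would.
direct = defaultdict(set)
for i, k, _ in rows:
    direct[k].add(i)
direct = {k: len(s) for k, s in direct.items()}

# Two-phase: group by (id, key) first -- the pre-aggregation Spark inserts
# to deduplicate -- then count the now-unique rows per key.
dedup = {(i, k) for i, k, _ in rows}
two_phase = defaultdict(int)
for _, k in dedup:
    two_phase[k] += 1

assert direct == dict(two_phase)  # both plans agree on every key
```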
   
   Ideally, we could transform the plan as below, though I suspect this could be 
tricky to implement:
   ```
   keys=key final_sum(value) final_count(id, distinct=true)
    exchange keys=key
        keys=id,key partial_sum(value) 
   ```
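
As a starting point, the fusion rule could pattern-match a partial count-distinct aggregate whose child aggregation groups on a strict superset of its keys and computes no counts itself, i.e. an aggregation that exists only to deduplicate the distinct column. The toy model below sketches that matching condition and the collapse; the `Agg` class and all names are illustrative, not Gluten's actual plan classes or APIs:

```python
from dataclasses import dataclass

@dataclass
class Agg:
    """Toy stand-in for an aggregation node (not a real Gluten class)."""
    keys: tuple             # grouping keys
    funcs: tuple            # aggregate functions, e.g. ("partial_count(id)",)
    distinct: bool = False  # whether count functions see deduplicated input
    child: object = None

def fuse_count_distinct(plan):
    """Collapse Agg(keys=K, partial_count) over Agg(keys=K + extra) when the
    child exists only to deduplicate the distinct column(s): it groups on a
    strict superset of K and computes no counts of its own. The fused node
    counts with distinct=True, so the large deduplicating buffer disappears."""
    if not isinstance(plan, Agg):
        return plan
    child = fuse_count_distinct(plan.child)
    if (isinstance(child, Agg)
            and set(plan.keys) < set(child.keys)
            and not any("count" in f for f in child.funcs)):
        return Agg(plan.keys, plan.funcs, distinct=True, child=child.child)
    return Agg(plan.keys, plan.funcs, plan.distinct, child)

# Mirrors the "before" plan above: dedup aggregate feeding partial_count.
before = Agg(("key",), ("merge_sum(value)", "partial_count(id)"),
             child=Agg(("id", "key"), ("merge_sum(value)",), child="exchange"))
after = fuse_count_distinct(before)
assert after.distinct and after.child == "exchange"
```

The real rule would additionally need to verify that the pass-through functions (the `merge_max` calls in the plan above) are preserved and that no other operator consumes the intermediate aggregation's output.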
   
   ### Gluten version
   
   None


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

