NEUpanning opened a new issue, #10527: URL: https://github.com/apache/incubator-gluten/issues/10527
### Description

### Use Case

We have encountered a production use case where the aggregation before `partial_count(distinct)` performs poorly because the majority of rows have unique group-by keys. The `partial_count(distinct)` aggregation itself has significantly smaller cardinality, since its group-by keys are only a subset of the first aggregation's keys.

Metrics:

<img width="421" height="870" alt="Image" src="https://github.com/user-attachments/assets/49d4c34c-33c8-4fbd-9fea-0fe8dee5e5e7" />

Gluten plan:

```
: +- ^(14) FlushableHashAggregateTransformer(keys=[brand_id#605L, gender#606, brand_layered#607, mt_back_city_nation_id#608L, mt_back_province_nation_id#609L, spark_grouping_id#604L], functions=[merge_max(brand_name#504), merge_max(brand_cate2_id#505L), merge_max(brand_cate2#506), merge_max(if (spark_grouping_id#604L IN (11,15)) mt_back_city_nation_name#508 else all), merge_max(if (spark_grouping_id#604L IN (19,23)) mt_back_province_nation_name#510 else all), partial_count(distinct user_id#515)], isStreamingAgg=false, output=[brand_id#605L, gender#606, brand_layered#607, mt_back_city_nation_id#608L, mt_back_province_nation_id#609L, spark_grouping_id#604L, max#737, max#739L, max#741, max#743, max#745, count#748L])
:    +- ^(14) HashAggregateTransformer(keys=[brand_id#605L, gender#606, brand_layered#607, mt_back_city_nation_id#608L, mt_back_province_nation_id#609L, spark_grouping_id#604L, user_id#515], functions=[merge_max(brand_name#504), merge_max(brand_cate2_id#505L), merge_max(brand_cate2#506), merge_max(if (spark_grouping_id#604L IN (11,15)) mt_back_city_nation_name#508 else all), merge_max(if (spark_grouping_id#604L IN (19,23)) mt_back_province_nation_name#510 else all)], isStreamingAgg=false, output=[brand_id#605L, gender#606, brand_layered#607, mt_back_city_nation_id#608L, mt_back_province_nation_id#609L, spark_grouping_id#604L, user_id#515, max#737, max#739L, max#741, max#743, max#745])
```

### Proposed Solution

Since Velox natively supports distinct aggregations, we can add a rule to fuse the two Spark operators into a single aggregation with a distinct flag. This should yield a performance gain because the fused aggregation has a significantly smaller aggregation buffer.

Here is an example. Velox plan before this optimization:

```
keys=key final_sum(value) final_count(id, distinct=false)
  exchange keys=key
    keys=key merge_sum(value) partial_count(id, distinct=false)
      keys=id,key merge_sum(value)
        exchange keys=id,key
          keys=id,key partial_sum(value)
```

Velox plan after this optimization:

```
keys=key final_sum(value) final_count(id, distinct=false)
  exchange keys=key
    keys=key merge_sum(value) partial_count(id, distinct=true)
      exchange keys=id,key
        keys=id,key partial_sum(value)
```

Ideally, we could transform the plan as below, but I think that could be tricky to implement:

```
keys=key final_sum(value) final_count(id, distinct=true)
  exchange keys=key
    keys=id,key partial_sum(value)
```

### Gluten version

None
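For intuition, the two-step evaluation of `count(distinct)` described above can be modeled in a few lines of plain Python (a toy model, not Gluten code): the first aggregation is keyed on the group-by keys plus the distinct column, so its cardinality tracks the number of distinct `(key, id)` pairs, while the second aggregation only sees one row per pair.

```python
# Toy model of Spark's two-phase count(distinct id) grouped by key.
from collections import defaultdict

rows = [("a", 1), ("a", 1), ("a", 2), ("b", 3), ("b", 3)]  # (key, id)

# Step 1: aggregate on (key, id) -- the high-cardinality aggregation
# keyed on the group-by keys plus the distinct column.
dedup = {(key, id_) for key, id_ in rows}

# Step 2: aggregate on key alone -- the much smaller aggregation that
# performs the partial distinct count.
count_distinct = defaultdict(int)
for key, _ in dedup:
    count_distinct[key] += 1
```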

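The proposed fusion rule could be sketched roughly as follows. This is a hypothetical toy model over dict-based plan nodes; all names (`op`, `count_distinct_col`, `fuse_distinct`, etc.) are invented for illustration and are not the actual Gluten/Velox API. When a partial distinct-count aggregation sits directly on top of a plain aggregation keyed on the outer keys plus the distinct column, the pair is fused into one aggregation with `distinct=True`.

```python
def fuse_distinct(plan):
    """Recursively fuse the two-level count(distinct) pattern (toy sketch)."""
    if plan.get("op") != "agg":
        return plan
    child = plan["child"]
    if (
        plan.get("count_distinct_col") is not None
        and not plan.get("distinct")
        and child.get("op") == "agg"
        and child.get("count_distinct_col") is None
        and not child.get("distinct")
        and set(child["keys"]) == set(plan["keys"]) | {plan["count_distinct_col"]}
    ):
        # Drop the inner pre-aggregation and mark the count as distinct,
        # letting the native distinct aggregation do the dedup.
        return {**plan, "distinct": True, "child": fuse_distinct(child["child"])}
    return {**plan, "child": fuse_distinct(child)}

# Example: the pre-optimization shape from the plans above.
before = {"op": "agg", "keys": ["key"], "count_distinct_col": "id", "distinct": False,
          "child": {"op": "agg", "keys": ["id", "key"], "count_distinct_col": None,
                    "distinct": False,
                    "child": {"op": "scan", "table": "t"}}}
fused = fuse_distinct(before)
# fused drops the inner aggregation and sets distinct=True
```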