alamb opened a new pull request, #6800:
URL: https://github.com/apache/arrow-datafusion/pull/6800
# Which issue does this PR close?

Related to https://github.com/apache/arrow-datafusion/issues/4973

This PR contains a technical spike / proof of concept of the hash aggregate approach described in https://github.com/apache/arrow-datafusion/issues/4973#issuecomment-1608068287

I do not intend to ever merge this PR. Rather, if it proves promising, I will break it up and merge it incrementally into the existing code (steps TBD).

# Rationale for this change

We want faster grouping, especially when there are large numbers of distinct groups.

# What changes are included in this PR?

1. A new `GroupedHashAggregateStream2` operator that implements vectorized / multi-group updates
2. A new `GroupsAccumulator` trait with a proposed vectorized API for managing and updating group state

# Performance results

## Methodology

Run this command:

```shell
cargo run --profile release-nonlto --bin tpch -- benchmark datafusion --iterations 5 -m --format parquet -q 17 --path /Users/alamb/Software/arrow-datafusion/benchmarks/data/
```

Query:

```sql
select
    sum(l_extendedprice) / 7.0 as avg_yearly
from
    lineitem,
    part
where
    p_partkey = l_partkey
    and p_brand = 'Brand#23'
    and p_container = 'MED BOX'
    and l_quantity < (
        select
            0.2 * avg(l_quantity)
        from
            lineitem
        where
            l_partkey = p_partkey
    );
```

Here is the original plan:

```
[2023-06-29T13:26:41Z DEBUG datafusion::physical_planner] Optimized physical plan:
ProjectionExec: expr=[CAST(SUM(lineitem.l_extendedprice)@0 AS Float64) / 7 as avg_yearly]
  AggregateExec: mode=Final, gby=[], aggr=[SUM(lineitem.l_extendedprice)]
    CoalescePartitionsExec
      AggregateExec: mode=Partial, gby=[], aggr=[SUM(lineitem.l_extendedprice)]
        ProjectionExec: expr=[l_extendedprice@1 as l_extendedprice]
          CoalesceBatchesExec: target_batch_size=8192
            HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@2, l_partkey@1)], filter=CAST(l_quantity@0 AS Decimal128(30, 15)) < Float64(0.2) * AVG(lineitem.l_quantity)@1
              CoalesceBatchesExec: target_batch_size=8192
                RepartitionExec: partitioning=Hash([p_partkey@2], 2), input_partitions=2
                  ProjectionExec: expr=[l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice, p_partkey@3 as p_partkey]
                    CoalesceBatchesExec: target_batch_size=8192
                      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_partkey@0, p_partkey@0)]
                        CoalesceBatchesExec: target_batch_size=8192
                          RepartitionExec: partitioning=Hash([l_partkey@0], 2), input_partitions=2
                            MemoryExec: partitions=2, partition_sizes=[367, 366]
                        CoalesceBatchesExec: target_batch_size=8192
                          RepartitionExec: partitioning=Hash([p_partkey@0], 2), input_partitions=2
                            ProjectionExec: expr=[p_partkey@0 as p_partkey]
                              CoalesceBatchesExec: target_batch_size=8192
                                FilterExec: p_brand@1 = Brand#23 AND p_container@2 = MED BOX
                                  MemoryExec: partitions=2, partition_sizes=[13, 12]
              ProjectionExec: expr=[CAST(0.2 * CAST(AVG(lineitem.l_quantity)@1 AS Float64) AS Decimal128(30, 15)) as Float64(0.2) * AVG(lineitem.l_quantity), l_partkey@0 as l_partkey]
                AggregateExec: mode=FinalPartitioned, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)] <-- want to use the new stream here
                  CoalesceBatchesExec: target_batch_size=8192
                    RepartitionExec: partitioning=Hash([l_partkey@0], 2), input_partitions=2
                      AggregateExec: mode=Partial, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)]
                        MemoryExec: partitions=2, partition_sizes=[367, 366]
```

## Results

TODO
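The two changes listed under "What changes are included in this PR?" (a hash table that assigns rows to groups, plus an accumulator that updates many groups per call) can be illustrated with a minimal, self-contained sketch. Every name here is hypothetical: `GroupsAccumulatorSketch` is a simplified stand-in, not the `GroupsAccumulator` trait proposed in this PR; `AvgGroups` is an illustrative AVG accumulator; and `assign_group_indices` plays the role of the hash table by mapping each group key to a dense group index. The assumed design point is that per-group state lives in plain vectors indexed by group, so one call can update every group touched by a batch.

```rust
use std::collections::HashMap;

// Hypothetical names throughout; this is NOT DataFusion's actual API.

/// Simplified stand-in for a vectorized, multi-group accumulator.
pub trait GroupsAccumulatorSketch {
    /// Update state for a whole batch: `group_indices[i]` is the group of row `i`.
    /// `total_num_groups` is the running number of groups seen so far, so the
    /// accumulator can grow its state vectors once per batch.
    fn update_batch(&mut self, values: &[i64], group_indices: &[usize], total_num_groups: usize);
    /// Produce one output per group, in group-index order.
    fn evaluate(&self) -> Vec<Option<f64>>;
}

/// AVG state kept as parallel vectors (sum, count) indexed by group,
/// rather than one separate accumulator object per group.
#[derive(Default)]
pub struct AvgGroups {
    sums: Vec<i64>,
    counts: Vec<u64>,
}

impl GroupsAccumulatorSketch for AvgGroups {
    fn update_batch(&mut self, values: &[i64], group_indices: &[usize], total_num_groups: usize) {
        assert_eq!(values.len(), group_indices.len());
        // Newly seen groups start with sum = 0 and count = 0.
        self.sums.resize(total_num_groups, 0);
        self.counts.resize(total_num_groups, 0);
        // Single tight loop over the batch; no per-group dispatch.
        for (&v, &g) in values.iter().zip(group_indices) {
            self.sums[g] += v;
            self.counts[g] += 1;
        }
    }

    fn evaluate(&self) -> Vec<Option<f64>> {
        self.sums
            .iter()
            .zip(&self.counts)
            .map(|(&s, &c)| if c == 0 { None } else { Some(s as f64 / c as f64) })
            .collect()
    }
}

/// Hypothetical helper: map each row's key to a dense group index,
/// assigning the next free index to keys not seen before.
/// Returns the total number of groups seen so far.
pub fn assign_group_indices(
    keys: &[i64],
    map: &mut HashMap<i64, usize>,
    out: &mut Vec<usize>,
) -> usize {
    out.clear();
    for &k in keys {
        let next = map.len();
        let g = *map.entry(k).or_insert(next);
        out.push(g);
    }
    map.len()
}
```

For example, feeding a batch with keys `[10, 20, 10]` and then a batch with keys `[30, 20]` produces group indices `[0, 1, 0]` and `[2, 1]`; the accumulator's vectors simply grow as `total_num_groups` increases, which is why this layout is intended to scale to large numbers of distinct groups.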
