alamb opened a new pull request, #6800:
URL: https://github.com/apache/arrow-datafusion/pull/6800

   # Which issue does this PR close?
   Related to https://github.com/apache/arrow-datafusion/issues/4973
   
   This PR contains a technical spike / proof of concept of the hash aggregate 
approach described in 
https://github.com/apache/arrow-datafusion/issues/4973#issuecomment-1608068287
   
   I do not intend to merge this PR as-is; rather, if the approach proves 
promising, I will break it up and merge it incrementally into the existing code 
(steps TBD)
   
   # Rationale for this change
   We want faster grouping behavior, especially when there are large numbers of 
distinct groups
   
   # What changes are included in this PR?
   1. A new `GroupedHashAggregateStream2` operator that implements vectorized / 
multi-group updates
   2. A new `GroupsAccumulator` trait with a proposed vectorized API for 
managing and updating group state
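   
   To illustrate the idea behind `GroupsAccumulator`, here is a heavily 
simplified, hypothetical sketch of a vectorized multi-group update. The real 
trait proposed in this PR operates on Arrow `ArrayRef`s, handles nulls and 
filters, and returns `Result`; this version uses plain `Vec<f64>` purely to show 
the shape of the API — one call updates the state of many groups at once, 
rather than dispatching to a separate accumulator per group:
   
   ```rust
   /// Simplified sketch (not the actual DataFusion trait): an accumulator
   /// that folds a whole batch of values into many group states in one call.
   trait GroupsAccumulator {
       /// For each input row `i`, fold `values[i]` into the state of group
       /// `group_indices[i]`. `total_num_groups` lets the implementation
       /// resize its flat state vectors up front.
       fn update_batch(
           &mut self,
           values: &[f64],
           group_indices: &[usize],
           total_num_groups: usize,
       );

       /// Produce the final value for every group.
       fn evaluate(&self) -> Vec<f64>;
   }

   /// Example implementation: AVG, with per-group state stored in flat
   /// vectors indexed by group id (no per-group Box<dyn Accumulator>).
   struct AvgGroupsAccumulator {
       sums: Vec<f64>,
       counts: Vec<u64>,
   }

   impl GroupsAccumulator for AvgGroupsAccumulator {
       fn update_batch(
           &mut self,
           values: &[f64],
           group_indices: &[usize],
           total_num_groups: usize,
       ) {
           self.sums.resize(total_num_groups, 0.0);
           self.counts.resize(total_num_groups, 0);
           // A single tight loop over the batch updates every group touched.
           for (v, &g) in values.iter().zip(group_indices) {
               self.sums[g] += v;
               self.counts[g] += 1;
           }
       }

       fn evaluate(&self) -> Vec<f64> {
           self.sums
               .iter()
               .zip(&self.counts)
               .map(|(s, &c)| s / c as f64)
               .collect()
       }
   }

   fn main() {
       let mut acc = AvgGroupsAccumulator { sums: vec![], counts: vec![] };
       // Rows 0 and 2 belong to group 0, row 1 to group 1.
       acc.update_batch(&[1.0, 10.0, 3.0], &[0, 1, 0], 2);
       println!("{:?}", acc.evaluate()); // prints [2.0, 10.0]
   }
   ```
   
   The point of the flat-vector layout is that group state lives contiguously 
and is updated in one pass per batch, which is what makes the hash aggregation 
fast when there are many distinct groups.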
   
   
   # Performance results:
   
   ## Methodology
   Run this command
   ```shell
   cargo run --profile release-nonlto --bin tpch -- benchmark datafusion 
--iterations 5 -m --format parquet -q 17 --path 
/Users/alamb/Software/arrow-datafusion/benchmarks/data/
   ```
   
   Query:
   ```sql
   select
           sum(l_extendedprice) / 7.0 as avg_yearly
   from
       lineitem,
       part
   where
           p_partkey = l_partkey
     and p_brand = 'Brand#23'
     and p_container = 'MED BOX'
     and l_quantity < (
       select
               0.2 * avg(l_quantity)
       from
           lineitem
       where
               l_partkey = p_partkey
    )
    ```
   
   Here is the original plan:
   
   ```
   [2023-06-29T13:26:41Z DEBUG datafusion::physical_planner] Optimized physical 
plan:
       ProjectionExec: expr=[CAST(SUM(lineitem.l_extendedprice)@0 AS Float64) / 
7 as avg_yearly]
         AggregateExec: mode=Final, gby=[], aggr=[SUM(lineitem.l_extendedprice)]
           CoalescePartitionsExec
             AggregateExec: mode=Partial, gby=[], 
aggr=[SUM(lineitem.l_extendedprice)]
               ProjectionExec: expr=[l_extendedprice@1 as l_extendedprice]
                 CoalesceBatchesExec: target_batch_size=8192
                   HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(p_partkey@2, l_partkey@1)], filter=CAST(l_quantity@0 AS Decimal128(30, 
15)) < Float64(0.2) * AVG(lineitem.l_quantity)@1
                     CoalesceBatchesExec: target_batch_size=8192
                       RepartitionExec: partitioning=Hash([p_partkey@2], 2), 
input_partitions=2
                         ProjectionExec: expr=[l_quantity@1 as l_quantity, 
l_extendedprice@2 as l_extendedprice, p_partkey@3 as p_partkey]
                           CoalesceBatchesExec: target_batch_size=8192
                             HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(l_partkey@0, p_partkey@0)]
                               CoalesceBatchesExec: target_batch_size=8192
                                 RepartitionExec: 
partitioning=Hash([l_partkey@0], 2), input_partitions=2
                                   MemoryExec: partitions=2, 
partition_sizes=[367, 366]
                               CoalesceBatchesExec: target_batch_size=8192
                                 RepartitionExec: 
partitioning=Hash([p_partkey@0], 2), input_partitions=2
                                   ProjectionExec: expr=[p_partkey@0 as 
p_partkey]
                                     CoalesceBatchesExec: target_batch_size=8192
                                       FilterExec: p_brand@1 = Brand#23 AND 
p_container@2 = MED BOX
                                         MemoryExec: partitions=2, 
partition_sizes=[13, 12]
                     ProjectionExec: expr=[CAST(0.2 * 
CAST(AVG(lineitem.l_quantity)@1 AS Float64) AS Decimal128(30, 15)) as 
Float64(0.2) * AVG(lineitem.l_quantity), l_partkey@0 as l_partkey]
                        AggregateExec: mode=FinalPartitioned, gby=[l_partkey@0 
as l_partkey], aggr=[AVG(lineitem.l_quantity)]   <-- want to use the new stream 
here
                         CoalesceBatchesExec: target_batch_size=8192
                           RepartitionExec: partitioning=Hash([l_partkey@0], 
2), input_partitions=2
                             AggregateExec: mode=Partial, gby=[l_partkey@0 as 
l_partkey], aggr=[AVG(lineitem.l_quantity)]
                               MemoryExec: partitions=2, partition_sizes=[367, 
366]
   ```
   
   ## Results
   TODO
   

