alamb commented on issue #4973:
URL: https://github.com/apache/arrow-datafusion/issues/4973#issuecomment-1593109590

   > For group by high cardinality columns, I will do some POC to support two levels of grouping state (PartitionedHashTable).
   
   I thought DataFusion already did two-level partitioned hashing, but I just checked and apparently it does not.
   
   For example:
   
   ```sql
   ❯ explain select avg(system_load_average_1m) from '/tmp/example' group by uid;

   logical_plan:
     Projection: AVG(/tmp/example.system_load_average_1m)
       Aggregate: groupBy=[[/tmp/example.uid]], aggr=[[AVG(/tmp/example.system_load_average_1m)]]
         TableScan: /tmp/example projection=[system_load_average_1m, uid]

   physical_plan:
     ProjectionExec: expr=[AVG(/tmp/example.system_load_average_1m)@1 as AVG(/tmp/example.system_load_average_1m)]
       AggregateExec: mode=FinalPartitioned, gby=[uid@0 as uid], aggr=[AVG(/tmp/example.system_load_average_1m)]
         CoalesceBatchesExec: target_batch_size=8192
           RepartitionExec: partitioning=Hash([Column { name: "uid", index: 0 }], 16), input_partitions=16
             RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=2
               AggregateExec: mode=Partial, gby=[uid@1 as uid], aggr=[AVG(/tmp/example.system_load_average_1m)]
                 ParquetExec: file_groups={2 groups: [[private/tmp/example/1.parquet], [private/tmp/example/2.parquet]]}, projection=[system_load_average_1m, uid]
   ```
   
   
   This plan partitions on the group key `uid` (`RepartitionExec: partitioning=Hash([Column { name: "uid", index: 0 }], 16)`) **after** the first `AggregateExec`.
   
   Repartitioning **before** the first `AggregateExec` is what is needed (so that each of the first-level hash tables has a distinct set of keys, and the hash tables can therefore be smaller and more efficient). So instead of
   
   ```
   AggregateExec: mode=FinalPartitioned
     RepartitionExec: partitioning=Hash([Column { name: "uid", index: 0 }], 16)
       RepartitionExec: partitioning=RoundRobinBatch(16), 
         AggregateExec: mode=Partial, gby=[uid@1 as uid], 
          ParquetExec: file_groups=...
   ```
   
   The plan would look something like
   ```
   AggregateExec: mode=FinalPartitioned
     AggregateExec: mode=Partial, gby=[uid@1 as uid], 
       RepartitionExec: partitioning=Hash([Column { name: "uid", index: 0 }], 16) <-- this is done prior to aggregation
          ParquetExec: file_groups=...
   ```
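   To see why repartitioning first helps: when rows are hash-partitioned on the group key *before* the partial aggregation, the partitions see disjoint key sets, so each partial hash table holds only about 1/N of the distinct groups. A minimal, self-contained Rust sketch of that effect (this is not DataFusion code; `partitioned_avg`, the row layout, and the use of `DefaultHasher` are made up for illustration):

   ```rust
   use std::collections::hash_map::DefaultHasher;
   use std::collections::HashMap;
   use std::hash::{Hash, Hasher};

   /// Route each (key, value) row to one of `n` partitions by hashing the
   /// group key, then build one AVG state table per partition.
   /// Because routing depends only on the key, the partitions' key sets
   /// are disjoint, so each table is ~1/n the size of a global one.
   fn partitioned_avg(rows: &[(u64, f64)], n: usize) -> Vec<HashMap<u64, (f64, u64)>> {
       let mut partitions: Vec<HashMap<u64, (f64, u64)>> = vec![HashMap::new(); n];
       for &(key, val) in rows {
           let mut h = DefaultHasher::new();
           key.hash(&mut h);
           let p = (h.finish() as usize) % n;
           let e = partitions[p].entry(key).or_insert((0.0, 0u64));
           e.0 += val; // running sum
           e.1 += 1; // running count
       }
       partitions
   }

   fn main() {
       // 1000 rows spread over 100 distinct keys.
       let rows: Vec<(u64, f64)> = (0..1000).map(|i| (i % 100, 1.0)).collect();
       let parts = partitioned_avg(&rows, 16);
       // Disjoint key sets: each key lives in exactly one partition, so
       // the 16 tables together hold exactly 100 entries in total.
       let total: usize = parts.iter().map(|p| p.len()).sum();
       assert_eq!(total, 100);
   }
   ```

   Each distinct key lands in exactly one partition's table, whereas in the current plan every `mode=Partial` hash table can end up containing every key.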
   
   The potential downside is that the hash function must be computed on more rows, and the repartitioning happens before the first-level aggregation, which would be a slowdown for low cardinality group keys.
   
   The more sophisticated version I think would have *3* aggregates
   
   ```
   AggregateExec: mode=FinalPartitioned
     AggregateExec: mode=Partial, gby=[uid@1 as uid],
       RepartitionExec: partitioning=Hash([Column { name: "uid", index: 0 }], 16)
         AggregateExec: mode=PartialSmall, gby=[uid@1 as uid], <-- done prior to repartitioning to pre-reduce low cardinality inputs
            ParquetExec: file_groups=...
   ```

   The idea is that the `PartialSmall` operator would have a small hash table (a few MB?) and dump its contents to the output when the table is full.
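   A toy standalone Rust sketch of that `PartialSmall` behavior (the operator name comes from the plan above; the capacity, the aggregate state, and the flushing policy are assumptions for illustration, not DataFusion API):

   ```rust
   use std::collections::HashMap;

   /// Bounded pre-aggregation: keep at most `capacity` group entries;
   /// when a new key would overflow the table, dump the current partial
   /// aggregates downstream and start over. Memory stays fixed regardless
   /// of input cardinality.
   struct PartialSmall {
       capacity: usize,
       table: HashMap<u64, (f64, u64)>, // key -> (sum, count)
       flushed: Vec<(u64, f64, u64)>,   // partial rows emitted downstream
   }

   impl PartialSmall {
       fn new(capacity: usize) -> Self {
           PartialSmall { capacity, table: HashMap::new(), flushed: Vec::new() }
       }

       fn push(&mut self, key: u64, val: f64) {
           // If a brand-new key would exceed capacity, dump the table first.
           if !self.table.contains_key(&key) && self.table.len() >= self.capacity {
               self.flush();
           }
           let e = self.table.entry(key).or_insert((0.0, 0));
           e.0 += val;
           e.1 += 1;
       }

       fn flush(&mut self) {
           for (k, (sum, count)) in self.table.drain() {
               self.flushed.push((k, sum, count));
           }
       }
   }

   fn main() {
       // Low cardinality: 10 keys, 10_000 rows, capacity 16 -> the table
       // never overflows and the whole input collapses to 10 partial rows.
       let mut op = PartialSmall::new(16);
       for i in 0..10_000u64 {
           op.push(i % 10, 1.0);
       }
       op.flush();
       assert_eq!(op.flushed.len(), 10);

       // High cardinality: 10_000 distinct keys -> many flushes, bounded
       // memory, and never more output rows than input rows.
       let mut op = PartialSmall::new(16);
       for i in 0..10_000u64 {
           op.push(i, 1.0);
       }
       op.flush();
       assert_eq!(op.flushed.len(), 10_000);
   }
   ```

   For low cardinality input the table never fills, so the stream collapses to one row per key before the shuffle; for high cardinality input memory stays bounded and the operator degrades gracefully toward a pass-through.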
   

