alamb commented on issue #4973:
URL: https://github.com/apache/arrow-datafusion/issues/4973#issuecomment-1593109590
> For group by high cardinality columns, I will do some POC to support two levels of grouping state (PartitionedHashTable).
I thought DataFusion already did two-level partitioned hashing, but I just checked and apparently it does not.
For example:
```sql
❯ explain select avg(system_load_average_1m) from '/tmp/example' group by uid;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                          |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: AVG(/tmp/example.system_load_average_1m)                                                                                                          |
|               |   Aggregate: groupBy=[[/tmp/example.uid]], aggr=[[AVG(/tmp/example.system_load_average_1m)]]                                                                  |
|               |     TableScan: /tmp/example projection=[system_load_average_1m, uid]                                                                                          |
| physical_plan | ProjectionExec: expr=[AVG(/tmp/example.system_load_average_1m)@1 as AVG(/tmp/example.system_load_average_1m)]                                                 |
|               |   AggregateExec: mode=FinalPartitioned, gby=[uid@0 as uid], aggr=[AVG(/tmp/example.system_load_average_1m)]                                                   |
|               |     CoalesceBatchesExec: target_batch_size=8192                                                                                                               |
|               |       RepartitionExec: partitioning=Hash([Column { name: "uid", index: 0 }], 16), input_partitions=16                                                         |
|               |         RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=2                                                                                 |
|               |           AggregateExec: mode=Partial, gby=[uid@1 as uid], aggr=[AVG(/tmp/example.system_load_average_1m)]                                                    |
|               |             ParquetExec: file_groups={2 groups: [[private/tmp/example/1.parquet], [private/tmp/example/2.parquet]]}, projection=[system_load_average_1m, uid] |
|               |                                                                                                                                                               |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
This plan partitions on the group key `uid` (the `RepartitionExec: partitioning=Hash([Column { name: "uid", index: 0 }], 16)`) **after** the first `AggregateExec`.
Repartitioning **before** the first `AggregateExec` is what is needed (so that each of the first-level hash tables has a distinct set of keys, and the hash tables can therefore be smaller and more efficient). So instead of
```
AggregateExec: mode=FinalPartitioned
  RepartitionExec: partitioning=Hash([Column { name: "uid", index: 0 }], 16)
    RepartitionExec: partitioning=RoundRobinBatch(16),
      AggregateExec: mode=Partial, gby=[uid@1 as uid],
        ParquetExec: file_groups=...
```
The plan would look something like
```
AggregateExec: mode=FinalPartitioned
  AggregateExec: mode=Partial, gby=[uid@1 as uid],
    RepartitionExec: partitioning=Hash([Column { name: "uid", index: 0 }], 16)   <-- this is done prior to aggregation
      ParquetExec: file_groups=...
```
The potential downside is that the hash function must be computed on more rows, and the repartitioning happens prior to the first-level aggregation, which would be a slowdown for low-cardinality group keys.
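To make the distinct-keys property concrete, here is a minimal Rust sketch of the idea using plain `std` collections (the names `hash_partition` and `partial_avg`, the `(sum, count)` AVG state, and the use of `DefaultHasher` are all illustrative assumptions, not DataFusion's actual `RepartitionExec`/`AggregateExec` internals):
```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Route each (key, value) row to one of `n` partitions by key hash, so
/// every partition's hash table sees a disjoint set of keys.
fn hash_partition(rows: Vec<(u64, f64)>, n: usize) -> Vec<Vec<(u64, f64)>> {
    let mut parts: Vec<Vec<(u64, f64)>> = vec![Vec::new(); n];
    for (key, value) in rows {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        parts[(hasher.finish() as usize) % n].push((key, value));
    }
    parts
}

/// Aggregate one partition: (sum, count) per key, enough to finish AVG later.
fn partial_avg(part: &[(u64, f64)]) -> HashMap<u64, (f64, u64)> {
    let mut states: HashMap<u64, (f64, u64)> = HashMap::new();
    for &(key, value) in part {
        let state = states.entry(key).or_insert((0.0, 0));
        state.0 += value;
        state.1 += 1;
    }
    states
}

fn main() {
    let rows = vec![(1, 0.5), (2, 1.5), (1, 1.0), (3, 2.0)];
    // Each partition can be aggregated independently: a given key only
    // ever appears in one partition's table.
    for (i, part) in hash_partition(rows, 4).iter().enumerate() {
        println!("partition {i}: {:?}", partial_avg(part));
    }
}
```
Because every occurrence of a key lands in exactly one partition, the final step needs no cross-partition merge of per-key state, which is what lets each first-level table stay small.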
The more sophisticated version, I think, would have *3* aggregates:
```
AggregateExec: mode=FinalPartitioned
  AggregateExec: mode=Partial, gby=[uid@1 as uid],
    RepartitionExec: partitioning=Hash([Column { name: "uid", index: 0 }], 16)
      AggregateExec: mode=PartialSmall, gby=[uid@1 as uid],   <-- this is done prior to repartitioning to reduce low cardinality inputs
        ParquetExec: file_groups=...
```
The idea would be that this operator has a small hash table (a few MB?) and dumps its contents to the output when the table is full.
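For the hypothetical `PartialSmall` operator, a minimal sketch of the dump-when-full behavior could look like this (again plain `std` Rust; `SmallPartialAgg`, its capacity bound, and the `(sum, count)` state are assumptions for illustration, not an existing DataFusion API):
```rust
use std::collections::HashMap;

/// Hypothetical `PartialSmall`-style operator: keep at most `capacity`
/// group states; when a new key arrives and the table is full, dump the
/// accumulated partial aggregates downstream and start over.
struct SmallPartialAgg {
    capacity: usize,
    states: HashMap<u64, (f64, u64)>, // key -> (sum, count) for AVG
}

impl SmallPartialAgg {
    fn new(capacity: usize) -> Self {
        Self { capacity, states: HashMap::new() }
    }

    /// Feed one row; returns a flushed batch of partial states if the
    /// table had to be emptied to make room.
    fn push(&mut self, key: u64, value: f64) -> Option<Vec<(u64, (f64, u64))>> {
        let flushed = if !self.states.contains_key(&key) && self.states.len() >= self.capacity {
            Some(self.states.drain().collect())
        } else {
            None
        };
        let state = self.states.entry(key).or_insert((0.0, 0));
        state.0 += value;
        state.1 += 1;
        flushed
    }

    /// Emit whatever remains at end of input.
    fn finish(mut self) -> Vec<(u64, (f64, u64))> {
        self.states.drain().collect()
    }
}

fn main() {
    let mut agg = SmallPartialAgg::new(2);
    for (key, value) in [(1, 1.0), (2, 2.0), (1, 3.0), (3, 4.0)] {
        if let Some(batch) = agg.push(key, value) {
            println!("flushed: {batch:?}");
        }
    }
    println!("remaining: {:?}", agg.finish());
}
```
Low-cardinality inputs would collapse almost entirely inside such an operator, while high-cardinality inputs would pass through as partially aggregated chunks without the table ever growing beyond its memory budget.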