ravlio opened a new issue, #9011:
URL: https://github.com/apache/arrow-datafusion/issues/9011
### Describe the bug
Hi everyone!
While developing my project on top of DataFusion, I ran into an unexpected
physical plan after the optimizer was applied. I have a `PartitionedAggregate`
node whose `required_input_distribution` is set to
`Distribution::HashPartitioned` on the columns `project_id` and `user_id`.
This has no effect: the optimizer still inserts `RepartitionExec:
partitioning=RoundRobinBatch`. Moreover, I tried adding a `Repartition` node
to the logical plan, but the optimizer erased it.
Additionally, the projection is not pushed down and the sort disappears.
### To Reproduce
My logical plan:
```
TableScan: ?table? projection=[project_id, user_id, created_at, event_id, event, str_0, str_1]
PartitionedAggregate: ...
Filter: project_id = Int64(1) AND created_at >= TimestampNanosecond(1705419428144118000, None) AND created_at <= TimestampNanosecond(1706283428144118000, None) AND event = UInt16(13)
Sort: project_id ASC NULLS LAST, user_id ASC NULLS LAST
Repartition: Hash(project_id, user_id) partition_count=12
Projection: project_id, user_id, created_at, event
TableScan: ?table? projection=[project_id, user_id, created_at, event_id, event, str_0]
```
Physical plan:
```
PartitionedAggregateExec, metrics=[]
  CoalesceBatchesExec: target_batch_size=8192, metrics=[]
    RepartitionExec: partitioning=Hash([project_id@0, user_id@1], 12), input_partitions=12, metrics=[]
      RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[]
        CoalescePartitionsExec, metrics=[]
          ProjectionExec: expr=[project_id@0 as project_id, user_id@1 as user_id, created_at@2 as created_at, event@4 as event], metrics=[]
            CoalesceBatchesExec: target_batch_size=8192, metrics=[]
              FilterExec: project_id@0 = 1 AND created_at@2 >= 1705419428144118000 AND created_at@2 <= 1706283428144118000 AND event@4 = 13, metrics=[]
                RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[]
                  ParquetExec: file_groups={1 group: [[opt/homebrew/Caskroom/clickhouse/user_files/store/tables/events/0/0.parquet]]}, projection=[project_id, user_id, created_at, event_id, event, str_0], predicate=project_id@0 = 1 AND created_at@2 >= 1705419428144118000 AND created_at@2 <= 1706283428144118000 AND event@4 = 13, pruning_predicate=project_id_min@0 <= 1 AND 1 <= project_id_max@1 AND created_at_max@2 >= 1705419428144118000 AND created_at_min@3 <= 1706283428144118000 AND event_min@4 <= 13 AND 13 <= event_max@5, metrics=[num_predicate_creation_errors=0]
```
There are a lot of repartitions here.
### Expected behavior
I expected something like the following. Sorry, it might contain mistakes,
since I wrote it out by hand:
```
PartitionedAggregateExec, metrics=[]
  CoalesceBatchesExec: target_batch_size=8192, metrics=[]
    CoalesceBatchesExec: target_batch_size=8192, metrics=[]
      SortExec:
        RepartitionExec: partitioning=Hash([project_id@0, user_id@1], 12), input_partitions=12, metrics=[]
          FilterExec: project_id@0 = 1 AND created_at@2 >= 1705419428144118000 AND created_at@2 <= 1706283428144118000 AND event@4 = 13, metrics=[]
            ParquetExec: file_groups={1 group: [[opt/homebrew/Caskroom/clickhouse/user_files/store/tables/events/0/0.parquet]]}, projection=[project_id, user_id, created_at, event_id, event], predicate=project_id@0 = 1 AND created_at@2 >= 1705419428144118000 AND created_at@2 <= 1706283428144118000 AND event@4 = 13, pruning_predicate=project_id_min@0 <= 1 AND 1 <= project_id_max@1 AND created_at_max@2 >= 1705419428144118000 AND created_at_min@3 <= 1706283428144118000 AND event_min@4 <= 13 AND 13 <= event_max@5, metrics=[num_predicate_creation_errors=0]
```
In other words: `read only the needed columns and filter in the Parquet scan
-> repartition by hash -> sort -> my node`.
### Additional context
BTW, can I do `explain verbose` from Rust code?
Thanks!