rgehan commented on issue #9898:
URL: https://github.com/apache/datafusion/issues/9898#issuecomment-3457379821
Are there any plans to move forward with the fix suggested in #9867?

We have a use case where we need to merge multiple similarly dimensioned Parquet files, most of which are already sorted. To do this, we union the files, add explicit sorts on the few files that are not sorted, and then aggregate.

We want to sort close to the data so that the subsequent aggregation can use `SortPreservingMergeExec` and keep memory usage low. The initial plan sorts only the unsorted files, before the union. The optimized plan instead re-sorts the entire result set, which increases memory usage and spilling.
I'm testing with `target_partitions: 1`, `repartition_joins: false`.
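Here is a simplified model, in plain Rust, of why sorting close to the data helps. It is not DataFusion's `SortPreservingMergeExec` implementation. The following are all assumptions made for illustration:

* the `(i64, i64)` key standing in for `(pk1, pk2)`;
* the in-memory `Vec`s standing in for the Parquet files;
* the hypothetical `merge_sorted` function.

Because every input is already sorted, a k-way merge only needs the current head row of each input. Buffered memory therefore grows with the number of inputs, not the number of rows.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Illustrative sketch only; not DataFusion's implementation.
// Hypothetical key type standing in for (pk1, pk2).
type Key = (i64, i64);

/// K-way merge of already-sorted inputs. Returns the merged rows and the
/// largest number of rows held in the heap at any one time.
fn merge_sorted(inputs: Vec<Vec<Key>>) -> (Vec<Key>, usize) {
    let mut iters: Vec<_> = inputs.into_iter().map(|v| v.into_iter()).collect();
    // Min-heap of (row, input index): holds at most one row per input.
    let mut heap = BinaryHeap::new();
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some(row) = it.next() {
            heap.push(Reverse((row, idx)));
        }
    }
    let mut peak = heap.len();
    // `out` stands in for the downstream operator; a real merge would
    // stream rows onward instead of collecting them.
    let mut out = Vec::new();
    while let Some(Reverse((row, idx))) = heap.pop() {
        out.push(row);
        // Refill from the input that produced the smallest row.
        if let Some(next) = iters[idx].next() {
            heap.push(Reverse((next, idx)));
        }
        peak = peak.max(heap.len());
    }
    (out, peak)
}

fn main() {
    let file1 = vec![(1, 1), (2, 5), (3, 0)]; // already sorted by its own SortExec
    let file2 = vec![(1, 2), (2, 5)];
    let file3 = vec![(0, 9), (4, 4)];
    let (merged, peak) = merge_sorted(vec![file1, file2, file3]);
    println!("{:?} (peak buffered rows: {})", merged, peak);
}
```

However many rows each file holds, `peak` never exceeds the number of inputs (three here).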
### Before optimization
```
OutputRequirementExec: order_by=[], dist_by=Unspecified
  AggregateExec: mode=Final, gby=[pk1@0 as pk1, pk2@1 as pk2], aggr=[], ordering_mode=Sorted
    AggregateExec: mode=Partial, gby=[pk1@0 as pk1, pk2@1 as pk2], aggr=[], ordering_mode=Sorted
      UnionExec
        SortExec: expr=[pk1@0 ASC, pk2@1 ASC], preserve_partitioning=[false]
          DataSourceExec: file_groups={1 group: [[file1.parquet]]}, projection=[pk1, pk2], file_type=parquet
        DataSourceExec: file_groups={1 group: [[file2.parquet]]}, projection=[pk1, pk2], output_ordering=[pk1@0 ASC, pk2@1 ASC], file_type=parquet
        DataSourceExec: file_groups={1 group: [[file3.parquet]]}, projection=[pk1, pk2], output_ordering=[pk1@0 ASC, pk2@1 ASC], file_type=parquet
```
### Actual plan, after optimization
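* The `SortExec` below the union (on `file1.parquet`) is removed
* A `SortExec` is added higher up, above the partial aggregation, so the entire unioned result set is re-sorted
* The partial `AggregateExec` no longer runs with `ordering_mode=Sorted`
* Partitions are combined with `CoalescePartitionsExec` rather than `SortPreservingMergeExec`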
```
OutputRequirementExec: order_by=[], dist_by=Unspecified
  AggregateExec: mode=Final, gby=[pk1@0 as pk1, pk2@1 as pk2], aggr=[], ordering_mode=Sorted
    SortExec: expr=[pk1@0 ASC NULLS LAST, pk2@1 ASC NULLS LAST], preserve_partitioning=[false]
      CoalescePartitionsExec
        AggregateExec: mode=Partial, gby=[pk1@0 as pk1, pk2@1 as pk2], aggr=[]
          UnionExec
            DataSourceExec: file_groups={1 group: [[file1.parquet]]}, projection=[pk1, pk2], file_type=parquet
            DataSourceExec: file_groups={1 group: [[file2.parquet]]}, projection=[pk1, pk2], output_ordering=[pk1@0 ASC, pk2@1 ASC], file_type=parquet
            DataSourceExec: file_groups={1 group: [[file3.parquet]]}, projection=[pk1, pk2], output_ordering=[pk1@0 ASC, pk2@1 ASC], file_type=parquet
```
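For contrast, here is an equally hypothetical plain-Rust model of the optimized plan. It is not DataFusion code, and `unordered_then_resort` and the integer keys are illustrative assumptions. Without an input ordering, the partial aggregation has to hash every group. The top-level `SortExec` then has to hold all of those groups before it can emit its first row. DataFusion's `SortExec` can spill to disk, but that spilling is the extra cost described above.

```rust
use std::collections::HashSet;

// Illustrative sketch only; not DataFusion's implementation.
// Hypothetical key type standing in for (pk1, pk2).
type Key = (i64, i64);

/// Models an unordered partial aggregation followed by one top-level sort.
/// Returns the sorted groups and how many groups were buffered at once.
fn unordered_then_resort(inputs: Vec<Vec<Key>>) -> (Vec<Key>, usize) {
    // Partial aggregation without an input ordering: every distinct group
    // stays in a hash table until all inputs are exhausted.
    let groups: HashSet<Key> = inputs.into_iter().flatten().collect();
    // The top-level sort needs every group before emitting the first row.
    let mut sorted: Vec<Key> = groups.into_iter().collect();
    let buffered = sorted.len();
    sorted.sort_unstable();
    (sorted, buffered)
}

fn main() {
    let file1 = vec![(3, 0), (1, 1), (2, 5)]; // unsorted, no SortExec below the union
    let file2 = vec![(1, 1), (2, 5)];
    let file3 = vec![(0, 9), (4, 4)];
    let (groups, buffered) = unordered_then_resort(vec![file1, file2, file3]);
    println!("{:?} (groups buffered before sorting: {})", groups, buffered);
}
```

Here the buffered count grows with the number of distinct `(pk1, pk2)` groups across all files, not with the number of inputs.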
### Expected plan, after optimization
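* The `SortExec` below the union (on `file1.parquet`) is preserved
* No top-level `SortExec` is added
* Partitions are combined with `SortPreservingMergeExec`, so both aggregation stages run with `ordering_mode=Sorted`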
```
OutputRequirementExec: order_by=[], dist_by=Unspecified
  AggregateExec: mode=Final, gby=[pk1@0 as pk1, pk2@1 as pk2], aggr=[], ordering_mode=Sorted
    SortPreservingMergeExec: [pk1@0 ASC, pk2@1 ASC]
      AggregateExec: mode=Partial, gby=[pk1@0 as pk1, pk2@1 as pk2], aggr=[], ordering_mode=Sorted
        UnionExec
          SortExec: expr=[pk1@0 ASC, pk2@1 ASC], preserve_partitioning=[false]
            DataSourceExec: file_groups={1 group: [[file1.parquet]]}, projection=[pk1, pk2], file_type=parquet
          DataSourceExec: file_groups={1 group: [[file2.parquet]]}, projection=[pk1, pk2], output_ordering=[pk1@0 ASC, pk2@1 ASC], file_type=parquet
          DataSourceExec: file_groups={1 group: [[file3.parquet]]}, projection=[pk1, pk2], output_ordering=[pk1@0 ASC, pk2@1 ASC], file_type=parquet
```
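Here is a rough plain-Rust sketch of what `ordering_mode=Sorted` buys in the expected plan. The name `sorted_group_by` and the integer keys are hypothetical, and this is not DataFusion's aggregation code. When the aggregation input arrives sorted on the group keys, equal keys are adjacent. Each group is therefore complete as soon as the key changes, and only the current group has to be kept. Since the query has `aggr=[]`, grouping reduces to emitting the distinct `(pk1, pk2)` pairs.

```rust
// Illustrative sketch only; not DataFusion's implementation.
// Hypothetical key type standing in for (pk1, pk2).
type Key = (i64, i64);

/// Streaming group-by over input sorted on the group keys. With no
/// aggregate expressions, each group is just its distinct key.
fn sorted_group_by<I: IntoIterator<Item = Key>>(sorted: I) -> Vec<Key> {
    let mut out = Vec::new();
    // The only state kept: the group currently being built.
    let mut current: Option<Key> = None;
    for key in sorted {
        debug_assert!(current.map_or(true, |c| c <= key), "input must be sorted");
        if current != Some(key) {
            // The key changed, so the previous group is complete: emit it.
            if let Some(done) = current.take() {
                out.push(done);
            }
            current = Some(key);
        }
    }
    // Emit the final group once the input is exhausted.
    if let Some(done) = current {
        out.push(done);
    }
    out
}

fn main() {
    // e.g. the output of a sort-preserving merge of file1..file3
    let merged = vec![(0, 9), (1, 1), (1, 2), (2, 5), (2, 5), (3, 0), (4, 4)];
    println!("{:?}", sorted_group_by(merged));
}
```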