alamb opened a new issue, #8451:
URL: https://github.com/apache/arrow-datafusion/issues/8451
### Describe the bug
We have a case where the `EnforceDistribution` rule has repartitioned a
`ParquetExec`, which parallelized the read (which is good), but that
parallelization destroyed the sort order (as it mixes parts of
different files together in the same partition). The rest of the plan relies on
the output being sorted, and since it is no longer sorted we see incorrect
results.
### To Reproduce
The input plan looks like this:
```
OutputRequirementExec
ProjectionExec: expr=[tag@1 as tag]
FilterExec: CAST(field@0 AS Utf8) !=
ProjectionExec: expr=[field@1 as field, tag@3 as tag]
DeduplicateExec: [tag@3 ASC,time@2 ASC]
FilterExec: tag@3 > foo AND time@2 > 2
ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]},
projection=[__chunk_order, field, time, tag], output_ordering=[tag@3 ASC,
time@2 ASC, __chunk_order@0 ASC], ...
```
The output of `EnforceDistribution` looks like this:
```
2023-12-06T18:40:19.827226Z TRACE datafusion::physical_planner: Optimized
physical plan by EnforceDistribution:
OutputRequirementExec
ProjectionExec: expr=[tag@1 as tag]
FilterExec: CAST(field@0 AS Utf8) !=
RepartitionExec: partitioning=RoundRobinBatch(6), input_partitions=1
ProjectionExec: expr=[field@1 as field, tag@3 as tag]
DeduplicateExec: [tag@3 ASC,time@2 ASC]
SortPreservingMergeExec: [tag@3 ASC,time@2 ASC,__chunk_order@0
ASC] <----- This needs the input to be sorted
FilterExec: tag@3 > foo AND time@2 > 2
ParquetExec: file_groups={6 groups: [[1.parquet:0..1,
2.parquet:0..16666666], [2.parquet:16666666..33333333],
[2.parquet:33333333..50000000], [2.parquet:50000000..66666667],
[2.parquet:66666667..83333334], ...]}, ... <---- this file is no longer sorted
(as it was repartitioned)
```
Specifically, the DataFusion planner parallelized the read of the parquet
files into multiple partitions and in so doing has destroyed the sort order.
(The `16666666..33333333` annotations mean "read that byte range of the file".)
This is actually reflected correctly by the `ParquetExec` (it no longer says
"output_ordering" because it is no longer sorted). However, the plan now has a
`SortPreservingMerge` added above it, which implies that the output is sorted,
and that is incorrect.
Input:
```
ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]},
projection=[__chunk_order, field, time, tag], output_ordering=[tag@3 ASC,
time@2 ASC, __chunk_order@0 ASC],....
```
Output:
```
ParquetExec: file_groups={6 groups: [[1.parquet:0..1,
2.parquet:0..16666666], [2.parquet:16666666..33333333],
[2.parquet:33333333..50000000], [2.parquet:50000000..66666667],
[2.parquet:66666667..83333334], ...]}, ...
```
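To make the effect concrete, here is a minimal, std-only sketch of why reading a whole file followed by the start of another file in one partition breaks the ordering. It is a toy model, not DataFusion code: the `FileRange` alias and the helper names are hypothetical, the files are small in-memory vectors, and row offsets stand in for the byte offsets shown in the plan.

```rust
/// Hypothetical stand-in for a plan entry such as `2.parquet:0..16666666`:
/// (file index, start offset, end offset). Offsets are row offsets here.
type FileRange = (usize, usize, usize);

/// Concatenate the ranges of one partition in order, as a scan would emit them.
fn read_partition(files: &[Vec<i32>], ranges: &[FileRange]) -> Vec<i32> {
    ranges
        .iter()
        .flat_map(|&(file, start, end)| files[file][start..end].iter().copied())
        .collect()
}

/// True if the values are in non-decreasing order.
fn is_sorted(values: &[i32]) -> bool {
    values.windows(2).all(|w| w[0] <= w[1])
}

fn main() {
    // Two files, each sorted on its own, with overlapping key ranges.
    let files = vec![vec![5, 6, 7, 8], vec![1, 2, 3, 4, 9, 10]];

    // Before: one whole file per partition.
    let before: Vec<Vec<FileRange>> = vec![vec![(0, 0, 4)], vec![(1, 0, 6)]];
    // After: the first partition reads all of file 0 and then the start of file 1.
    let after: Vec<Vec<FileRange>> =
        vec![vec![(0, 0, 4), (1, 0, 2)], vec![(1, 2, 4)], vec![(1, 4, 6)]];

    for (name, plan) in [("before", &before), ("after", &after)] {
        for (i, ranges) in plan.iter().enumerate() {
            let rows = read_partition(&files, ranges);
            println!("{name} partition {i}: {rows:?} sorted={}", is_sorted(&rows));
        }
    }
}
```

In this toy layout every "before" partition is sorted, while the first "after" partition is not, which mirrors the `[1.parquet:0..1, 2.parquet:0..16666666]` group in the plan above.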
So the things that are wrong:
1. The output of the scan is no longer sorted, but it is being merged using
`SortPreservingMerge` (which skips the re-sort that is now required)
2. It is not right to repartition the sorted input files into multiple
partitions in the first place, as that destroys the sort order. There is a
[config setting that is supposed to control this,
`datafusion.optimizer.prefer_existing_sort`](https://arrow.apache.org/datafusion/user-guide/configs.html),
and IOx sets it to true.

I am working on a reproducer in DataFusion.
### Expected behavior
The correct answer should be produced.
I think this means that either:
1. The `ParquetExec` should not be repartitioned if doing so would destroy the
sort order, or
2. The `ParquetExec` repartition code should be aware of the sort order
and not destroy it
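One way to picture the second option is a split that never places ranges from different files in the same group, so each group is a contiguous slice of a single sorted file. The sketch below is only an illustration under that assumption, not the DataFusion implementation: `split_preserving_order`, `FileRange`, and the `chunks_per_file` parameter are hypothetical names, and sizes are plain numbers rather than real file metadata.

```rust
/// Hypothetical stand-in for a scan range: (file index, start offset, end offset).
type FileRange = (usize, usize, usize);

/// Split each file into at most `chunks_per_file` contiguous ranges and put
/// every range in its own group, so no group ever mixes two files.
fn split_preserving_order(file_sizes: &[usize], chunks_per_file: usize) -> Vec<Vec<FileRange>> {
    let chunks = chunks_per_file.max(1);
    let mut groups = Vec::new();
    for (file, &size) in file_sizes.iter().enumerate() {
        // Ceiling division, with a floor of 1 so the loop always advances.
        let chunk = ((size + chunks - 1) / chunks).max(1);
        let mut start = 0;
        while start < size {
            let end = (start + chunk).min(size);
            groups.push(vec![(file, start, end)]);
            start = end;
        }
    }
    groups
}

fn main() {
    // A tiny file and a larger one, split into at most three ranges each.
    for group in split_preserving_order(&[1, 100], 3) {
        println!("{group:?}");
    }
}
```

Because each group covers one contiguous slice of one file, every partition keeps that file's order, and a merge above the scan would still see sorted inputs. The trade-off in this sketch is that the number of groups depends on the number of files rather than on a fixed target partition count.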
### Additional context
We found that setting [the
`datafusion.optimizer.repartition_file_scans` config option](https://arrow.apache.org/datafusion/user-guide/configs.html)
to false in IOx was a workaround.
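For reference, a minimal sketch of applying both settings mentioned in this issue from Rust. It assumes the `datafusion` crate is a dependency and that `SessionConfig::with_repartition_file_scans`, `SessionConfig::set_bool`, and `SessionContext::new_with_config` are available in the version in use; this is not how IOx itself configures DataFusion, which is not shown here.

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    let config = SessionConfig::new()
        // Workaround: do not split file scans into byte ranges.
        .with_repartition_file_scans(false)
        // The setting that is supposed to keep existing sort orders intact.
        .set_bool("datafusion.optimizer.prefer_existing_sort", true);

    // Queries planned through this context pick up the options above.
    let _ctx = SessionContext::new_with_config(config);
}
```

Disabling file scan repartitioning gives up the parallel read of large files, so it trades scan parallelism for correct results until the planner is fixed.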