alamb opened a new issue, #8451:
URL: https://github.com/apache/arrow-datafusion/issues/8451

   ### Describe the bug
   
   We have a case where the `EnforceDistribution` rule has repartitioned a 
`ParquetExec`, which parallelized the read (which is good), but that 
parallelization destroyed the sort order (as it mixes parts of 
different files together in the same partition). The rest of the plan relies on 
the output being sorted, and since it is no longer sorted we see incorrect 
results.
   
   ### To Reproduce
   
   The input plan looks like this:
   
   ```
   OutputRequirementExec
     ProjectionExec: expr=[tag@1 as tag]
       FilterExec: CAST(field@0 AS Utf8) !=
         ProjectionExec: expr=[field@1 as field, tag@3 as tag]
           DeduplicateExec: [tag@3 ASC,time@2 ASC]
             FilterExec: tag@3 > foo AND time@2 > 2
               ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[__chunk_order, field, time, tag], output_ordering=[tag@3 ASC, time@2 ASC, __chunk_order@0 ASC], ...
   ```
   
   The output of `EnforceDistribution` looks like this:
   ```
   2023-12-06T18:40:19.827226Z TRACE datafusion::physical_planner: Optimized physical plan by EnforceDistribution:
   OutputRequirementExec
     ProjectionExec: expr=[tag@1 as tag]
       FilterExec: CAST(field@0 AS Utf8) !=
         RepartitionExec: partitioning=RoundRobinBatch(6), input_partitions=1
           ProjectionExec: expr=[field@1 as field, tag@3 as tag]
             DeduplicateExec: [tag@3 ASC,time@2 ASC]
               SortPreservingMergeExec: [tag@3 ASC,time@2 ASC,__chunk_order@0 ASC] <----- This needs the input to be sorted
                 FilterExec: tag@3 > foo AND time@2 > 2
                   ParquetExec: file_groups={6 groups: [[1.parquet:0..1, 2.parquet:0..16666666], [2.parquet:16666666..33333333], [2.parquet:33333333..50000000], [2.parquet:50000000..66666667], [2.parquet:66666667..83333334], ...]}, ... <---- this file is no longer sorted (as it was repartitioned)
   ```
   
   
   Specifically, the DataFusion planner parallelized the read of the parquet 
files into multiple partitions and in so doing has destroyed the sort order.
   
   (An annotation such as `16666666..33333333` means "read that byte range of 
the file".)
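
To make the byte-range grouping concrete, here is a minimal sketch of an even, size-based split. It is a simplified model and not DataFusion's actual repartitioning code: `FileRange` and `split_by_size` are hypothetical names, and the file sizes (1 byte and 100000000 bytes) are assumptions that are not stated in the issue, chosen only because they reproduce the ranges shown in the plan above.

```rust
/// A byte range of one file, as printed in the plan
/// (for example `2.parquet:0..16666666`). Hypothetical type for illustration.
#[derive(Debug, Clone, PartialEq)]
struct FileRange {
    file: String,
    start: u64,
    end: u64,
}

/// Split `files` (name, size in bytes) into groups of roughly equal size.
/// Simplified model only; file order and sort order are ignored entirely.
fn split_by_size(files: &[(&str, u64)], target: u64) -> Vec<Vec<FileRange>> {
    let total: u64 = files.iter().map(|(_, size)| *size).sum();
    let chunk = (total + target - 1) / target; // ceiling division
    let mut groups: Vec<Vec<FileRange>> = vec![Vec::new()];
    let mut room = chunk; // bytes still available in the current group
    for (name, size) in files {
        let mut start = 0u64;
        while start < *size {
            if room == 0 {
                // Current group is full: open a new one.
                groups.push(Vec::new());
                room = chunk;
            }
            let end = (start + room).min(*size);
            groups.last_mut().unwrap().push(FileRange {
                file: name.to_string(),
                start,
                end,
            });
            room -= end - start;
            start = end;
        }
    }
    groups
}

fn main() {
    // Assumed sizes, see the note above.
    let groups = split_by_size(&[("1.parquet", 1), ("2.parquet", 100_000_000)], 6);
    for group in &groups {
        println!("{:?}", group);
    }
}
```

Under these assumptions the first group holds pieces of both files, which is exactly how rows from different files end up in the same partition.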
   
   This is actually reflected correctly by the `ParquetExec` (it no longer says 
"output_ordering" because it is no longer sorted). However, the plan now has a 
`SortPreservingMerge` added above it, which implies that the output is sorted, 
which is incorrect.
   
   Input:
   ```
   ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[__chunk_order, field, time, tag], output_ordering=[tag@3 ASC, time@2 ASC, __chunk_order@0 ASC],....
   ```
   
   Output:
   ```
   ParquetExec: file_groups={6 groups: [[1.parquet:0..1, 2.parquet:0..16666666], [2.parquet:16666666..33333333], [2.parquet:33333333..50000000], [2.parquet:50000000..66666667], [2.parquet:66666667..83333334], ...]}, ...
   ```
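
The reason a sort-preserving merge over unsorted input gives wrong results can be shown with a small sketch. This is a generic k-way merge written for illustration, not DataFusion's `SortPreservingMergeExec`; the function names and the sample values are made up.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// K-way merge that trusts every input to be sorted already.
/// It only ever compares the current head of each input.
fn merge_assuming_sorted(inputs: &[Vec<i32>]) -> Vec<i32> {
    let mut heap = BinaryHeap::new();
    for (i, input) in inputs.iter().enumerate() {
        if let Some(&v) = input.first() {
            // Reverse turns the max-heap into a min-heap.
            heap.push(Reverse((v, i, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, i, pos))) = heap.pop() {
        out.push(v);
        if let Some(&next) = inputs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}

fn is_ascending(values: &[i32]) -> bool {
    values.windows(2).all(|w| w[0] <= w[1])
}

fn main() {
    // Each partition is sorted: the merge result is sorted.
    let sorted = vec![vec![1, 4, 7], vec![2, 5, 8]];
    assert!(is_ascending(&merge_assuming_sorted(&sorted)));

    // The first partition concatenates pieces of two files and is not sorted:
    // the merge silently produces unsorted output.
    let broken = vec![vec![7, 1, 4], vec![2, 5, 8]];
    assert!(!is_ascending(&merge_assuming_sorted(&broken)));
}
```

The merge never detects the problem; it simply emits rows out of order, and any operator above it that relies on the ordering then computes incorrect results.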
   
   So the things that are wrong are:
   1. The output of the scan is no longer sorted, but it is being merged using 
`SortPreservingMerge`, which skips the re-sort that is now required.
   2. It is not right to repartition the sorted input files into multiple 
partitions in the first place, as that destroys the sort order. There is a 
[config setting that is supposed to control this, 
`datafusion.optimizer.prefer_existing_sort`](https://arrow.apache.org/datafusion/user-guide/configs.html),
 and IOx sets it to true.
   
   I am working on a reproducer in DataFusion.
   
   ### Expected behavior
   
   The correct answer should be produced.
   
   I think this means that either:
   1. The `ParquetExec` should not be repartitioned if doing so would destroy 
the sort order, or
   2. The `ParquetExec` repartitioning code should be aware of the existing 
sort order and not destroy it.
   
   ### Additional context
   
   We found that setting [the config option 
`datafusion.optimizer.repartition_file_scans`](https://arrow.apache.org/datafusion/user-guide/configs.html)
 to false was a workaround.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
