suremarc opened a new issue, #7490:
URL: https://github.com/apache/arrow-datafusion/issues/7490

   ### Is your feature request related to a problem or challenge?
   
   Related issue: #6672
   
   DataFusion currently cannot avoid sorts when there are more files than there 
are `target_partitions`. 
   
   When querying data from an object store, DataFusion will attempt to group 
files into file groups internally, which are executed concurrently with one 
another. However, if any file group contains multiple files, the 
`file_sort_order` of the files cannot be maintained at present. 
   
   To illustrate, suppose we had a table `trades` comprised of the following 
files in S3:
   
   ```sh
   ❯ mc ls s3/trades/year=2023/month=01 -r
   [2023-05-23 15:13:18 CDT] 1.2GiB STANDARD day=03/trades-2023-01-03.parquet
   [2023-05-23 15:12:36 CDT] 1.2GiB STANDARD day=04/trades-2023-01-04.parquet
   [2023-05-23 15:12:00 CDT] 1.1GiB STANDARD day=05/trades-2023-01-05.parquet
   [2023-05-23 15:13:21 CDT] 1.2GiB STANDARD day=06/trades-2023-01-06.parquet
   [2023-05-23 15:12:40 CDT] 1.2GiB STANDARD day=09/trades-2023-01-09.parquet
   ```
   
   and their sort order was `timestamp ASC`. If our `target_partitions` is set 
to `3`, the following query:
   ```sql
   SELECT * 
   FROM trades 
   ORDER BY timestamp ASC
   LIMIT 50000;
   ```
   
   results in the following suboptimal plan:
   
   ```
   GlobalLimitExec: skip=0, fetch=50000
     SortPreservingMergeExec: [timestamp@0 ASC NULLS LAST], fetch=50000
       SortExec: fetch=50000, expr=[timestamp@0 ASC NULLS LAST]
         ParquetExec: file_groups={3 groups: 
[[year=2023/month=01/day=05/trades-2023-01-05.parquet, 
year=2023/month=01/day=06/trades-2023-01-06.parquet], 
[year=2023/month=01/day=03/trades-2023-01-03.parquet, 
year=2023/month=01/day=09/trades-2023-01-09.parquet], 
[year=2023/month=01/day=04/trades-2023-01-04.parquet]]}, projection=[ticker, 
timestamp, participant_timestamp, trf_timestamp, sequence_number, conditions, 
id, price, size, correction, exchange, trf, tape, year, month, day]
   ```
   
   DataFusion has decided that this plan needs a sort because some file groups 
have multiple files. (The sort would be avoided if there were at most 3 files, 
one per file group.) However, in this case we know that `trades-2023-01-03.parquet` could be 
streamed before `trades-2023-01-04.parquet` in timestamp order, because every 
timestamp in `trades-2023-01-03.parquet` precedes every timestamp in 
`trades-2023-01-04.parquet`. In fact, the physical plan shown above *is* 
ordered -- but DataFusion does not know this and currently has no way *to* know 
this. 
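
   To make the non-overlap condition concrete, here is a minimal sketch. It 
is not DataFusion code: `FileRange`, `precedes`, and `is_ordered` are 
hypothetical names, and the sort column is assumed to reduce to a single `i64` 
min/max pair per file (for example, a timestamp in nanoseconds).

```rust
/// Hypothetical per-file summary (not a DataFusion type): the min and max
/// of the sort column, e.g. `timestamp`, for one file.
#[derive(Debug, Clone, PartialEq)]
pub struct FileRange {
    pub path: String,
    pub min: i64,
    pub max: i64,
}

/// True if every value in `a` sorts at or before every value in `b`,
/// so `a` can be streamed ahead of `b` without re-sorting (ASC order).
pub fn precedes(a: &FileRange, b: &FileRange) -> bool {
    a.max <= b.min
}

/// True if reading `files` front to back yields ascending output,
/// assuming each file is itself sorted on the column.
pub fn is_ordered(files: &[FileRange]) -> bool {
    files.windows(2).all(|pair| precedes(&pair[0], &pair[1]))
}
```

   The comparison uses `<=` rather than `<` because two files that share a 
boundary value can still be concatenated into non-decreasing output.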
   
   ### Describe the solution you'd like
   
   Essentially, DataFusion should be able to detect which files are 
non-overlapping, and use this to intelligently distribute files into file 
groups in such a way that still outputs data in order. Below I offer one 
possible path to doing so, which I believe should be minimally invasive. 
   
   At a minimum, 
[`PartitionedFile`](https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.PartitionedFile.html)
 should have an additional *optional* field, `statistics`, which contains a 
[`Statistics`](https://docs.rs/datafusion/latest/datafusion/common/struct.Statistics.html)
 object with the min/max statistics for that file. 
[`FileScanConfig::project`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html#method.project)
 should be changed to detect when the files within a file group are already in 
order. Lastly, a physical optimizer rule that redistributes files across file 
groups so that each group stays ordered may be necessary to take advantage of this in some cases. 
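
   One way the redistribution step could work is sketched below. This is an 
illustration under assumptions, not a proposed implementation: `FileRange` and 
`group_in_order` are hypothetical names, statistics are assumed to be a single 
`i64` min/max per file, and the greedy strategy (fill up to `target_partitions` 
groups first, then append to the smallest compatible group) is just one 
possible choice.

```rust
/// Hypothetical per-file summary (not a DataFusion type).
#[derive(Debug, Clone, PartialEq)]
pub struct FileRange {
    pub path: String,
    pub min: i64,
    pub max: i64,
}

/// Tries to split `files` into at most `target_partitions` groups so that
/// each group, read front to back, is in ascending order.
/// Returns `None` when this greedy pass finds no such grouping, in which
/// case a caller would fall back to sorting.
pub fn group_in_order(
    mut files: Vec<FileRange>,
    target_partitions: usize,
) -> Option<Vec<Vec<FileRange>>> {
    // Visit files in ascending order of their lower bound.
    files.sort_by_key(|f| (f.min, f.max));
    let mut groups: Vec<Vec<FileRange>> = Vec::new();
    for file in files {
        // Open new groups first to keep as much parallelism as possible.
        if groups.len() < target_partitions {
            groups.push(vec![file]);
            continue;
        }
        // Otherwise pick the smallest group whose last file ends at or
        // before this file begins.
        let slot = groups
            .iter()
            .enumerate()
            .filter(|(_, g)| g.last().map_or(false, |last| last.max <= file.min))
            .min_by_key(|(_, g)| g.len())
            .map(|(i, _)| i);
        match slot {
            Some(i) => groups[i].push(file),
            None => return None, // overlapping files: ordering cannot be kept
        }
    }
    Some(groups)
}
```

   For the five daily files above with `target_partitions = 3`, this sketch 
yields the groups `[03, 06]`, `[04, 09]`, `[05]`, each of which is ordered. 
It differs from the grouping in the plan above, which is also ordered.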
   
   This does not solve the issue of how to feed file-level statistics into 
DataFusion, but users may add extensions to DataFusion that do so -- for 
example a custom 
[`TableProvider`](https://docs.rs/datafusion/latest/datafusion/datasource/provider/trait.TableProvider.html)
 could do this. However, it should be feasible to integrate this feature into 
[`ListingTable`](https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html).
 In fact the `ListingTable` already fetches file-level statistics on each 
query, but discards them after rolling them up into one statistic per column. 
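
   To show why the rolled-up statistic is not enough, here is a simplified 
model of the roll-up. `Bounds` and `roll_up` are hypothetical names and do not 
mirror the actual `Statistics` type. Bounds are assumed to be plain `i64` 
values for a single column.

```rust
/// Hypothetical min/max bounds for one column of one file.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Bounds {
    pub min: i64,
    pub max: i64,
}

/// Merges per-file bounds into a single table-level bound.
/// Returns `None` for an empty file list.
pub fn roll_up(per_file: &[Bounds]) -> Option<Bounds> {
    per_file.iter().copied().reduce(|a, b| Bounds {
        min: a.min.min(b.min),
        max: a.max.max(b.max),
    })
}
```

   A set of disjoint files and a set of overlapping files can roll up to the 
same table-level bound, so the information needed to order files is lost 
unless the per-file values are kept.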
   
   ### Describe alternatives you've considered
   
   At my company, we created a custom `FileFormat` implementation that outputs 
a wrapped `ParquetExec` with the `output_ordering()` method overridden and the 
files redistributed to be in order. However, it relies on hints from 
user-provided configuration, and it does not seem to be in the 
spirit of what `FileFormat` is supposed to be. We would like to implement this 
optimization in a way that fits better with DataFusion and works out of the box 
without hints. 
   
   ### Additional context
   
   _No response_

