suremarc opened a new issue, #7490:
URL: https://github.com/apache/arrow-datafusion/issues/7490
### Is your feature request related to a problem or challenge?
Related issue: #6672
DataFusion currently cannot avoid sorts when a table has more files than
`target_partitions`.
When querying data from an object store, DataFusion will attempt to group
files into file groups internally, which are executed concurrently with one
another. However, if any file group contains multiple files, the
`file_sort_order` of the files cannot be maintained at present.
To illustrate, suppose we had a table `trades` composed of the following
files in S3:
```sh
❯ mc ls s3/trades/year=2023/month=01 -r
[2023-05-23 15:13:18 CDT] 1.2GiB STANDARD day=03/trades-2023-01-03.parquet
[2023-05-23 15:12:36 CDT] 1.2GiB STANDARD day=04/trades-2023-01-04.parquet
[2023-05-23 15:12:00 CDT] 1.1GiB STANDARD day=05/trades-2023-01-05.parquet
[2023-05-23 15:13:21 CDT] 1.2GiB STANDARD day=06/trades-2023-01-06.parquet
[2023-05-23 15:12:40 CDT] 1.2GiB STANDARD day=09/trades-2023-01-09.parquet
```
and their sort order was `timestamp ASC`. If our `target_partitions` is set
to `3`, the following query:
```sql
SELECT *
FROM trades
ORDER BY timestamp ASC
LIMIT 50000;
```
results in the following suboptimal plan:
```
GlobalLimitExec: skip=0, fetch=50000
  SortPreservingMergeExec: [timestamp@0 ASC NULLS LAST], fetch=50000
    SortExec: fetch=50000, expr=[timestamp@0 ASC NULLS LAST]
      ParquetExec: file_groups={3 groups:
        [[year=2023/month=01/day=05/trades-2023-01-05.parquet,
        year=2023/month=01/day=06/trades-2023-01-06.parquet],
        [year=2023/month=01/day=03/trades-2023-01-03.parquet,
        year=2023/month=01/day=09/trades-2023-01-09.parquet],
        [year=2023/month=01/day=04/trades-2023-01-04.parquet]]}, projection=[ticker,
        timestamp, participant_timestamp, trf_timestamp, sequence_number, conditions,
        id, price, size, correction, exchange, trf, tape, year, month, day]
```
DataFusion has decided that this plan needs a sort because some file groups
contain multiple files. (The sort could be avoided if there were at most 3
files, so that each file group held a single file.) However, in this case we
know that `trades-2023-01-03.parquet` could be
streamed before `trades-2023-01-04.parquet` in timestamp order, because every
timestamp in `trades-2023-01-03.parquet` precedes every timestamp in
`trades-2023-01-04.parquet`. In fact, the physical plan shown above *is*
ordered -- but DataFusion does not know this and currently has no way *to* know
this.
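To make the non-overlap condition concrete, here is a minimal sketch of the check DataFusion would need. `FileRange` and `group_is_ordered` are hypothetical names introduced for illustration, not DataFusion APIs, and the sort column is assumed to be reducible to an `i64` (for example, a timestamp in nanoseconds).

```rust
/// Hypothetical per-file min/max of the sort column (e.g. `timestamp` as
/// nanoseconds since the epoch). This is NOT a DataFusion type.
#[derive(Debug, Clone, PartialEq)]
struct FileRange {
    path: String,
    min: i64,
    max: i64,
}

/// Returns true if reading the files back to back, in the order given,
/// yields ascending output: every file must end no later than the next
/// file begins. Empty and single-file groups are trivially ordered.
fn group_is_ordered(group: &[FileRange]) -> bool {
    group.windows(2).all(|pair| pair[0].max <= pair[1].min)
}
```

Under this check, a group containing the `day=05` file followed by the `day=06` file would count as ordered, as long as the file-level statistics confirm that the first file's maximum timestamp does not exceed the second file's minimum.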
### Describe the solution you'd like
Essentially, DataFusion should be able to detect which files are
non-overlapping, and use this to intelligently distribute files into file
groups in such a way that still outputs data in order. Below I offer one
possible path to doing so, which I believe should be minimally invasive.
At a minimum,
[`PartitionedFile`](https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.PartitionedFile.html)
should have an additional *optional* field, `statistics`, which contains a
[`Statistics`](https://docs.rs/datafusion/latest/datafusion/common/struct.Statistics.html)
object with the min/max statistics for that file.
[`FileScanConfig::project`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html#method.project)
should be changed to detect when the files within a file group are already in
sorted order. Lastly, a physical optimizer rule that redistributes files so
that each file group is ordered may be necessary to take advantage of this in
some cases.
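The redistribution step could look roughly like the sketch below. It is only an illustration under stated assumptions: `FileRange` and `distribute_ordered` are hypothetical names (not DataFusion APIs), the sort column is assumed to fit in an `i64`, and the greedy strategy shown is one possible choice rather than a proposal for the actual optimizer rule.

```rust
/// Hypothetical per-file min/max of the sort column. NOT a DataFusion type.
#[derive(Debug, Clone, PartialEq)]
struct FileRange {
    path: String,
    min: i64,
    max: i64,
}

/// Greedily assigns files to at most `target_partitions` groups so that
/// every group can be streamed in ascending order without a sort.
/// Returns `None` if some file overlaps the tail of every group, in which
/// case the caller would have to fall back to sorting.
fn distribute_ordered(
    mut files: Vec<FileRange>,
    target_partitions: usize,
) -> Option<Vec<Vec<FileRange>>> {
    // Visit files in order of their minimum value.
    files.sort_by_key(|f| (f.min, f.max));
    let mut groups: Vec<Vec<FileRange>> = vec![Vec::new(); target_partitions];
    for file in files {
        // Among groups whose last file ends before this one starts (or that
        // are still empty), pick the one with the fewest files to keep the
        // groups balanced.
        let slot = groups
            .iter_mut()
            .filter(|g| g.last().map_or(true, |last| last.max <= file.min))
            .min_by_key(|g| g.len())?;
        slot.push(file);
    }
    // Drop groups that received no files.
    groups.retain(|g| !g.is_empty());
    Some(groups)
}
```

For five non-overlapping daily files and `target_partitions = 3`, this yields three groups of two, two, and one files, each internally ordered, so a `SortPreservingMergeExec` alone would suffice. Balancing by file count is a simplification; a real implementation would more likely balance by file size or row count.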
This does not solve the issue of how to feed file-level statistics into
DataFusion, but users may add extensions to DataFusion that do so -- for
example a custom
[`TableProvider`](https://docs.rs/datafusion/latest/datafusion/datasource/provider/trait.TableProvider.html)
could do this. However, it should be feasible to integrate this feature into
[`ListingTable`](https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html).
In fact, `ListingTable` already fetches file-level statistics on each
query, but discards them after rolling them up into one statistic per column.
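A toy sketch of why the roll-up loses the information needed here: once per-file ranges are merged into a single range per column, overlapping and non-overlapping layouts become indistinguishable. `roll_up` is a hypothetical helper written for this illustration and is not taken from `ListingTable`.

```rust
/// Merges per-file (min, max) pairs for one column into a single
/// table-level (min, max). Returns `None` when there are no files.
/// Hypothetical helper, not DataFusion code.
fn roll_up(per_file: &[(i64, i64)]) -> Option<(i64, i64)> {
    let min = per_file.iter().map(|&(lo, _)| lo).min()?;
    let max = per_file.iter().map(|&(_, hi)| hi).max()?;
    Some((min, max))
}
```

Two disjoint files `(30, 39)` and `(40, 49)` roll up to `(30, 49)`, exactly the same result as two fully overlapping files `(30, 49)` and `(30, 49)`, so the per-file statistics would have to be retained to detect which files can be streamed in sequence.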
### Describe alternatives you've considered
At my company, we created a custom `FileFormat` implementation that outputs
a wrapped `ParquetExec` with the `output_ordering()` method overridden and the
files redistributed to be in order. However, it relies on hints from
user-provided configuration, and it does not seem to be in the spirit of what
`FileFormat` is supposed to be. We would like to implement this optimization in
a way that fits better with DataFusion and works out of the box without hints.
### Additional context
_No response_