suremarc commented on issue #15191: URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2722432514
Thanks for the writeup. The core idea makes sense, but I have a couple of comments on the design. In particular, I think we can do better in a few ways.

# 1. Some but not all partitions overlap

As I understand it, the current implementation gives up if any two partitions overlap. This is fine if your target use case involves queries with no overlapping partitions, but we can do better.

Basically, if we take the proposed algorithm and add another step at the end where we apply "first fit", then instead of producing only one lexical ordering or failing fast, we can produce multiple groups of "chained" input partitions, each ordered lexically by the sort key within itself, and we can optimize the query to use the minimum number of chains possible.

Let me give an example:

```sql
SELECT * FROM recent_table_1 WHERE time > now() - INTERVAL 1 DAY
UNION ALL
SELECT * FROM recent_table_2 WHERE time > now() - INTERVAL 1 DAY
UNION ALL
SELECT * FROM historic_table WHERE time < now() - INTERVAL 1 DAY
ORDER BY time ASC
```

Incidentally, this is the use case I am targeting. Anyway, this query would result in at least 3 partitions, two of which overlap in `time`. If we could generalize `ProgressiveEval` to have multiple partition groupings, we could do something like this:

```sql
SortPreservingMergeExec: time ASC
  ProgressiveEval: partitions=[2, 0], [1]
    UnionExec: partitions=[0, 1, 2]
      TableExec: recent_table_1
      TableExec: recent_table_2
      TableExec: historic_table
```

This would concatenate partitions 2 and 0, while partition 1 remains unchanged. A final `SortPreservingMergeExec` is then required to merge these into one sorted stream.
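To make the "first fit" step concrete, here is a minimal sketch in Python. It is not the DataFusion or InfluxDB implementation: `PartitionRange`, `first_fit_chains`, and the integer key values are hypothetical stand-ins for the min/max statistics of each input partition, and treating touching ranges as overlapping is a conservative assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PartitionRange:
    """Hypothetical summary of one input partition: its index and the
    min/max of the sort key (e.g. `time`) taken from statistics."""
    index: int
    min_key: int
    max_key: int


def first_fit_chains(partitions):
    """Group partitions into chains whose key ranges do not overlap.

    Partitions are visited in order of min_key. Each one is appended to the
    first chain whose last partition ends strictly before it starts; if no
    such chain exists, it starts a new chain.
    """
    chains = []
    for part in sorted(partitions, key=lambda p: (p.min_key, p.max_key)):
        for chain in chains:
            if chain[-1].max_key < part.min_key:
                chain.append(part)
                break
        else:
            # No existing chain can take this partition.
            chains.append([part])
    return [[p.index for p in chain] for chain in chains]


# Made-up ranges mirroring the UNION ALL example: partitions 0 and 1 are the
# two "recent" tables (overlapping), partition 2 is the historic table.
example = [
    PartitionRange(index=0, min_key=100, max_key=200),
    PartitionRange(index=1, min_key=110, max_key=190),
    PartitionRange(index=2, min_key=0, max_key=99),
]
print(first_fit_chains(example))  # [[2, 0], [1]]
```

With these made-up ranges the result matches the `partitions=[2, 0], [1]` grouping in the plan above: two chains, so the final merge only has two sorted inputs instead of three.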
The "first fit" algorithm has actually already been implemented in [`FileScanConfig::split_groups_by_statistics`](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_scan_config.rs#L569), which uses the [`MinMaxStatistics`](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/statistics.rs#L64) helper to analyze min/max values. However, it was written to be used in `ParquetExec` (now deprecated and replaced with `DataSourceExec`, I believe).

I believe this change could be retrofitted onto `ProgressiveEval` without much additional complexity. The analysis is already implemented; we just need to use it. Or we can take the implementation from InfluxDB and just add the final "first fit" step.

# 2. Scanning non-overlapping Parquet files

I see that one of the target use cases for `ProgressiveEval` is #6672. I am a little curious to see the implementation, because the way I see it, `ProgressiveEval` will solve some but not all instances of this problem.

Here is the issue I am worried about. When you use `ListingTable`, you have a config option called `target_partitions`. By default it is set to the number of available CPU cores on the system. If the number of files exceeds `target_partitions`, it will start to merge files into the same partitions with no guarantees on ordering. Let me demonstrate using `datafusion-cli`:

```sql
> SET datafusion.execution.target_partitions=2;
0 row(s) fetched.
Elapsed 0.000 seconds.

> CREATE EXTERNAL TABLE t1 (id INT, date DATE) STORED AS PARQUET LOCATION './data/' PARTITIONED BY (date) WITH ORDER (id ASC);
0 row(s) fetched.
Elapsed 0.002 seconds.

> INSERT INTO t1 VALUES (4, '2025-03-01'), (3, '2025-3-02'), (2, '2025-03-03'), (1, '2025-03-04');
+-------+
| count |
+-------+
| 4     |
+-------+
1 row(s) fetched.
Elapsed 0.004 seconds.

> EXPLAIN SELECT * FROM t1 ORDER BY id ASC;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: t1.id ASC NULLS LAST |
|               |   TableScan: t1 projection=[id, date] |
| physical_plan | SortPreservingMergeExec: [id@0 ASC NULLS LAST] |
|               |   SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] |
|               |     DataSourceExec: file_groups={2 groups: [[./data/date=2025-03-01/9nxILoicy2uUAt7r.parquet, ./data/date=2025-03-02/9nxILoicy2uUAt7r.parquet], [./data/date=2025-03-03/9nxILoicy2uUAt7r.parquet, ./data/date=2025-03-04/9nxILoicy2uUAt7r.parquet]]}, projection=[id, date], file_type=parquet |
|               | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.002 seconds.

> EXPLAIN SELECT * FROM t1 WHERE date > '2025-03-02' ORDER BY id ASC;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: t1.id ASC NULLS LAST |
|               |   TableScan: t1 projection=[id, date], full_filters=[t1.date > Date32("2025-03-02")] |
| physical_plan | SortPreservingMergeExec: [id@0 ASC NULLS LAST] |
|               |   DataSourceExec: file_groups={2 groups: [[./data/date=2025-03-03/9nxILoicy2uUAt7r.parquet], [./data/date=2025-03-04/9nxILoicy2uUAt7r.parquet]]}, projection=[id, date], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
|               | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.001 seconds.
```

When there are more than 2 files in the scan, files get merged into the same partition in no particular order. In particular, any ordering on `id ASC` is lost: our partitions end up having ids of `[4, 3]` and `[2, 1]`. If `ProgressiveEval` were to run on this plan, it would see that the input partitions are not ordered and give up.

IMO, for this use case, `ProgressiveEval` is trying to solve a problem it does not have total control over. The files are not visible above the `DataSourceExec`, and it has no ability to reorder the files, even though we know we could avoid a sort here.
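As a rough illustration of why the sort is avoidable in principle, the sketch below models each file as a hypothetical `(min_id, max_id)` pair, in the order the files appear in the first `EXPLAIN` output. The helper names are made up for this example and are not DataFusion APIs; the point is only that the ordering is recoverable from per-file statistics at the scan level.

```python
def is_ascending(group):
    """Return True if reading the files in `group` back to back yields
    ascending ids, i.e. every file starts after the previous one ends."""
    return all(
        prev_max < cur_min
        for (_, prev_max), (cur_min, _) in zip(group, group[1:])
    )


def reorder_by_min(group):
    """Sort the files of one group by their minimum id."""
    return sorted(group, key=lambda f: f[0])


# (min_id, max_id) per file; each file here holds a single row.
file_groups = [[(4, 4), (3, 3)], [(2, 2), (1, 1)]]

print([is_ascending(g) for g in file_groups])  # [False, False]

fixed = [reorder_by_min(g) for g in file_groups]
print([is_ascending(g) for g in fixed])  # [True, True]
```

Reordering within a group is enough here only because the files in each group do not overlap. When they do, the files have to be regrouped rather than just reordered.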
There is also the potential issue of `RepartitionExec` and other nodes sitting in between `ProgressiveEval` and `DataSourceExec`.

I don't mean to toot my own horn too much, but in fact this exact use case is what [`FileScanConfig::split_groups_by_statistics`](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_scan_config.rs#L569) was written to solve. We can solve the problem locally at the `DataSourceExec`, which I think is the right place to do it. It's still gated behind a feature flag; I have not been able to dedicate the time to set up benchmarks for `ListingTable`, which I think is required to take this feature out of experimental status and ship it.

# Conclusion

Basically, I think the design as-is is good enough to include in DataFusion, but I would like to see it generalized a bit, and I also think it may not completely solve #6672. That said, I think it will solve other problems, including optimizing queries with non-overlapping unions in them.

I apologize if I come off as a bit overbearing 😅 but this issue is near and dear to my heart. Eliminating sorts has been one of the most important things in my team's project, and it sounds like InfluxDB has been dealing with the same issue.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.