suremarc commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2722432514
Thanks for the writeup. The core idea makes sense, but I have a couple of
comments on the design; in particular, I think we can do better in a few ways.
# 1. Some but not all partitions overlap
As I understand it, the current implementation gives up if any two
partitions are overlapping. This is fine if your target use case involves
queries with no overlapping partitions, but we can do better.
Basically, if we take the proposed algorithm and add a final "first fit" step,
then instead of producing a single lexical ordering or failing fast, we can
produce multiple groups of "chained" input partitions, each internally ordered
by the sort key. We can then optimize the query to use the minimum possible
number of chains.
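To make the "first fit" step concrete, here is a minimal Rust sketch. It assumes a single ascending `i64` sort key and exact min/max statistics for every partition. `PartitionStats` and `first_fit_chains` are hypothetical names used only for illustration. They are not DataFusion APIs, and this is not the code of `split_groups_by_statistics`. The sketch sorts partitions by their minimum, then appends each one to the first chain whose last partition ends strictly before it starts. If no chain fits, the partition opens a new chain. For a single key, visiting partitions in order of their minimum makes this the standard greedy interval-partitioning algorithm, so it uses as few chains as possible.

```rust
/// Hypothetical min/max statistics of one input partition for a single
/// ascending `i64` sort key (an assumption made for this sketch).
#[derive(Debug, Clone, Copy)]
pub struct PartitionStats {
    /// Position of the partition in the input, e.g. its index in `UnionExec`.
    pub index: usize,
    pub min: i64,
    pub max: i64,
}

/// Groups partitions into chains whose members can be concatenated in order
/// without re-sorting, using a greedy "first fit" pass.
pub fn first_fit_chains(parts: &[PartitionStats]) -> Vec<Vec<usize>> {
    // Visit partitions in order of their minimum value (ties broken by max).
    let mut sorted = parts.to_vec();
    sorted.sort_by_key(|p| (p.min, p.max));

    // Each chain stores its member indices and the max of its last member.
    let mut chains: Vec<(Vec<usize>, i64)> = Vec::new();
    for p in sorted {
        // First chain whose tail ends strictly before `p` starts. Using `<`
        // (not `<=`) conservatively treats touching ranges as overlapping.
        let slot = chains.iter().position(|(_, tail_max)| *tail_max < p.min);
        match slot {
            Some(i) => {
                chains[i].0.push(p.index);
                chains[i].1 = p.max;
            }
            // No existing chain fits: start a new one.
            None => chains.push((vec![p.index], p.max)),
        }
    }
    chains.into_iter().map(|(members, _)| members).collect()
}
```

Consider hypothetical statistics for the union example below: `historic_table` (partition 2) ends before `recent_table_1` (partition 0) starts, and the two recent tables overlap. With those inputs, the function returns `[[2, 0], [1]]`.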
Let me give an example:
```sql
SELECT * FROM recent_table_1
WHERE time > now() - INTERVAL '1 day'
UNION ALL
SELECT * FROM recent_table_2
WHERE time > now() - INTERVAL '1 day'
UNION ALL
SELECT * FROM historic_table
WHERE time < now() - INTERVAL '1 day'
ORDER BY time ASC
```
Incidentally, this is the use case I am targeting. Anyway, this query would
produce at least 3 partitions, two of which overlap in `time`. If we could
generalize `ProgressiveEval` to support multiple partition groupings, we
could do something like this:
```text
SortPreservingMergeExec: time ASC
  ProgressiveEval: partitions=[2, 0], [1]
    UnionExec: partitions=[0, 1, 2]
      TableExec: recent_table_1
      TableExec: recent_table_2
      TableExec: historic_table
```
This would concatenate partitions 2 and 0 (in that order) and leave partition 1
unchanged. A final `SortPreservingMergeExec` is then required to merge the
resulting streams into one sorted stream.
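Here is a rough model of that runtime behaviour, with in-memory `Vec<i64>` partitions standing in for record batch streams. Each chain is concatenated in order, and the resulting sorted runs are combined with a k-way merge over a min-heap. `concat_chains` and `k_way_merge` are hypothetical helpers for illustration only. The real `SortPreservingMergeExec` merges sorted record batch streams, not vectors.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Concatenates the partitions of each chain in chain order, producing one
/// sorted run per chain. `partitions` holds each (already sorted) partition's
/// values; `chains` holds partition indices, e.g. `[[2, 0], [1]]`.
pub fn concat_chains(partitions: &[Vec<i64>], chains: &[Vec<usize>]) -> Vec<Vec<i64>> {
    chains
        .iter()
        .map(|chain| {
            chain
                .iter()
                .flat_map(|&i| partitions[i].iter().copied())
                .collect::<Vec<i64>>()
        })
        .collect()
}

/// Merges sorted runs into a single sorted output using a min-heap keyed on
/// each run's current head (a k-way merge).
pub fn k_way_merge(runs: &[Vec<i64>]) -> Vec<i64> {
    // Heap entries: (value, run index, position within run), wrapped in
    // `Reverse` so that `BinaryHeap` (a max-heap) pops the smallest value.
    let mut heap = BinaryHeap::new();
    for (r, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, r, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum());
    while let Some(Reverse((v, r, i))) = heap.pop() {
        out.push(v);
        // Refill from the run we just consumed, if it has more values.
        if let Some(&next) = runs[r].get(i + 1) {
            heap.push(Reverse((next, r, i + 1)));
        }
    }
    out
}
```

Concatenation only preserves order when the chained partitions do not overlap, so this model relies on chains produced by a step like first fit.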
The "first fit" algorithm has actually already been implemented in
[`FileScanConfig::split_groups_by_statistics`](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_scan_config.rs#L569),
which uses the
[`MinMaxStatistics`](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/statistics.rs#L64)
helper to analyze min/max values; however, it was written for use in
`ParquetExec` (now deprecated and, I believe, replaced by `DataSourceExec`).
I believe this change could be retrofitted onto `ProgressiveEval` without
much additional complexity. The analysis is already implemented; we just
need to use it. Alternatively, we can take the implementation from InfluxDB
and just add the final "first fit" step.
# 2. Scanning non-overlapping Parquet files
I see that one of the target use cases for `ProgressiveEval` is #6672. I am
a little curious to see the implementation, because the way I see it,
`ProgressiveEval` will solve some but not all instances of this problem.
Here is the issue I am worried about. When you use `ListingTable`, the
`target_partitions` config option comes into play. By default, it is set to
the number of available CPU cores on the system. If the number of files
exceeds `target_partitions`, it starts merging files into the same partitions
with no guarantee of ordering. Let me demonstrate using `datafusion-cli`:
```sql
> SET datafusion.execution.target_partitions=2;
0 row(s) fetched.
Elapsed 0.000 seconds.
> CREATE EXTERNAL TABLE t1 (id INT, date DATE) STORED AS PARQUET LOCATION './data/' PARTITIONED BY (date) WITH ORDER (id ASC);
0 row(s) fetched.
Elapsed 0.002 seconds.
> INSERT INTO t1 VALUES (4, '2025-03-01'), (3, '2025-3-02'), (2, '2025-03-03'), (1, '2025-03-04');
+-------+
| count |
+-------+
| 4     |
+-------+
1 row(s) fetched.
Elapsed 0.004 seconds.
> EXPLAIN SELECT * FROM t1 ORDER BY id ASC;
+---------------+--------------------------------------------------------------------------+
| plan_type     | plan                                                                     |
+---------------+--------------------------------------------------------------------------+
| logical_plan  | Sort: t1.id ASC NULLS LAST                                               |
|               |   TableScan: t1 projection=[id, date]                                    |
| physical_plan | SortPreservingMergeExec: [id@0 ASC NULLS LAST]                           |
|               |   SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]     |
|               |     DataSourceExec: file_groups={2 groups:                               |
|               |       [[./data/date=2025-03-01/9nxILoicy2uUAt7r.parquet,                 |
|               |         ./data/date=2025-03-02/9nxILoicy2uUAt7r.parquet],                |
|               |        [./data/date=2025-03-03/9nxILoicy2uUAt7r.parquet,                 |
|               |         ./data/date=2025-03-04/9nxILoicy2uUAt7r.parquet]]},              |
|               |       projection=[id, date], file_type=parquet                           |
|               |                                                                          |
+---------------+--------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.002 seconds.
> EXPLAIN SELECT * FROM t1 WHERE date > '2025-03-02' ORDER BY id ASC;
+---------------+--------------------------------------------------------------------------+
| plan_type     | plan                                                                     |
+---------------+--------------------------------------------------------------------------+
| logical_plan  | Sort: t1.id ASC NULLS LAST                                               |
|               |   TableScan: t1 projection=[id, date],                                   |
|               |     full_filters=[t1.date > Date32("2025-03-02")]                        |
| physical_plan | SortPreservingMergeExec: [id@0 ASC NULLS LAST]                           |
|               |   DataSourceExec: file_groups={2 groups:                                 |
|               |     [[./data/date=2025-03-03/9nxILoicy2uUAt7r.parquet],                  |
|               |      [./data/date=2025-03-04/9nxILoicy2uUAt7r.parquet]]},                |
|               |     projection=[id, date], output_ordering=[id@0 ASC NULLS LAST],        |
|               |     file_type=parquet                                                    |
|               |                                                                          |
+---------------+--------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.001 seconds.
```
When the scan covers more files than `target_partitions` (here, 4 files and 2
partitions), files are packed into the same partition with no regard to the
sort order. In particular, the `id ASC` ordering is lost: our partitions end
up with ids of `[4, 3]` and `[2, 1]`, which is why the first plan needs a
`SortExec`. If `ProgressiveEval` were to run on this plan, it would see that
the input partitions are not ordered and give up.
IMO, for this use case, `ProgressiveEval` is trying to solve a problem it
does not fully control. The files are not visible above the
`DataSourceExec`, and `ProgressiveEval` cannot reorder them, even though we
know the sort could be avoided here. There is also the potential issue of
`RepartitionExec` and other nodes sitting between `ProgressiveEval` and
`DataSourceExec`.
I don't mean to toot my own horn too much, but this exact use case
is what
[`FileScanConfig::split_groups_by_statistics`](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_scan_config.rs#L569)
was written to solve. We can solve the problem locally at the
`DataSourceExec`, which I think is the right place to do it. It is still gated
behind a feature flag: I have not been able to dedicate the time to set up
benchmarks for `ListingTable`, which I think are required before this feature
can graduate from experimental and ship.
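Here is a simplified model of the scan-level idea. It assumes one ascending `i64` sort key and per-file min/max statistics. In the `t1` example above, each file holds a single `id`, so min equals max. File paths are abbreviated. `FileStats`, `group_is_ordered`, and `reorder_groups` are hypothetical names for illustration. This is not how `split_groups_by_statistics` works: per the description above, that function regroups files with first fit. This sketch only sorts files within the existing groups.

```rust
/// Hypothetical per-file statistics for a single ascending `i64` sort key.
#[derive(Debug, Clone)]
pub struct FileStats {
    pub path: &'static str,
    pub min: i64,
    pub max: i64,
}

/// A file group can be streamed in sort order without a `SortExec` only if
/// each file starts strictly after the previous file ends.
pub fn group_is_ordered(group: &[FileStats]) -> bool {
    group.windows(2).all(|w| w[0].max < w[1].min)
}

/// Sorts the files inside each existing group by their minimum value. This
/// restores order only when the files in a group do not overlap; otherwise
/// the files would have to be regrouped (e.g. with first fit).
pub fn reorder_groups(groups: &mut [Vec<FileStats>]) {
    for group in groups.iter_mut() {
        group.sort_by_key(|f| f.min);
    }
}
```

In the `t1` example, this would turn the groups `[4, 3]` and `[2, 1]` into `[3, 4]` and `[1, 2]`. A `SortPreservingMergeExec` over the two groups would then be enough, and the `SortExec` could be dropped.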
# Conclusion
Overall, I think the design as-is is good enough to include in DataFusion,
though I would like to see it generalized a bit, and I also think it may not
completely solve #6672. That said, I think it will solve other problems,
including optimizing queries that contain non-overlapping unions.
I apologize if I come off as a bit overbearing 😅, but this issue is near and
dear to my heart. Eliminating sorts has been one of the most important things
in my team's project, and it sounds like InfluxDB has been dealing with the
same issue.