suremarc commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2722432514

   Thanks for the writeup. The core idea makes sense, but I have a couple of comments on the design; in particular, I think we can do better in a few ways.
   
   # 1. Some but not all partitions overlap
   
   As I understand it, the current implementation gives up if any two partitions overlap. This is fine if your target use case involves queries with no overlapping partitions, but we can do better.
   If we take the proposed algorithm and add a final step that applies "first fit", then instead of producing only one lexical ordering or failing fast, we can produce multiple groups of "chained" input partitions, each lexically ordered by the sort key within itself, and optimize the query to use the minimum possible number of chains.
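   To make the "first fit" step concrete, here is a minimal Rust sketch. It assumes each partition has already been summarized by the min and max of a single integer sort key. `PartitionStats` and `first_fit_chains` are hypothetical names, not DataFusion APIs, and the code is not taken from DataFusion or InfluxDB. Visiting partitions in ascending order of their min before applying first fit is what lets this greedy pass use the minimum number of chains (the classic interval-partitioning argument).

```rust
/// Min/max of the sort key for one input partition (hypothetical type; in
/// DataFusion this information would come from statistics).
#[derive(Debug, Clone, Copy)]
struct PartitionStats {
    index: usize,
    min: i64,
    max: i64,
}

/// Split partitions into "chains" whose key ranges do not overlap, so each chain
/// can be read back-to-back and still be sorted.
fn first_fit_chains(partitions: &[PartitionStats]) -> Vec<Vec<usize>> {
    // Visit partitions in ascending order of their min (stable sort, so ties
    // keep their input order).
    let mut sorted = partitions.to_vec();
    sorted.sort_by_key(|p| (p.min, p.max));

    // Each chain stores its member indices and the max of its last member.
    let mut chains: Vec<(Vec<usize>, i64)> = Vec::new();
    for p in sorted {
        // First fit: the first chain whose last max is <= this partition's min.
        // Equal keys are fine, since non-decreasing output is still sorted.
        let slot = chains.iter().position(|(_, last_max)| *last_max <= p.min);
        match slot {
            Some(i) => {
                chains[i].0.push(p.index);
                chains[i].1 = p.max;
            }
            None => chains.push((vec![p.index], p.max)),
        }
    }
    chains.into_iter().map(|(members, _)| members).collect()
}
```

For three partitions where two cover the same range and the third lies strictly below them, this returns two chains: the lower partition followed by one of the overlapping partitions, and the remaining partition on its own.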
   
   Let me give an example:
   ```sql
   SELECT * FROM recent_table_1
   WHERE time > now() - INTERVAL '1 day'
   UNION ALL
   SELECT * FROM recent_table_2
   WHERE time > now() - INTERVAL '1 day'
   UNION ALL
   SELECT * FROM historic_table
   WHERE time < now() - INTERVAL '1 day'
   ORDER BY time ASC
   ```
   
   Incidentally, this is the use case I am targeting. This query would result in at least 3 partitions, two of which overlap in `time`. If we could generalize `ProgressiveEval` to support multiple partition groups, we could produce a plan like this:
   ```text
   SortPreservingMergeExec: time ASC
     ProgressiveEval: partitions=[2, 0], [1]
       UnionExec: partitions=[0, 1, 2]
         TableExec: recent_table_1
         TableExec: recent_table_2
         TableExec: historic_table
   ```
   
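   This would concatenate partitions 2 and 0 (in that order, since `historic_table` only holds rows older than those in `recent_table_1`), while partition 1 passes through unchanged. A final `SortPreservingMergeExec` is then required to merge the two resulting streams into one sorted stream.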
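   The semantics of that plan can be modeled in a few lines of Rust, as a sketch under simplifying assumptions: each partition is an already-sorted `Vec<i64>` rather than a stream of record batches, and `concat_then_merge` is a hypothetical helper, not an existing operator. Step 1 concatenates the partitions of each chain (the role the generalized `ProgressiveEval` would play), and step 2 performs a k-way merge with a min-heap (the role of `SortPreservingMergeExec`).

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Model of the proposed plan: concatenate the partitions of each chain in
/// order, then k-way merge the resulting streams. Partitions are assumed to be
/// individually sorted.
fn concat_then_merge(partitions: &[Vec<i64>], chains: &[Vec<usize>]) -> Vec<i64> {
    // Step 1 (generalized ProgressiveEval): one stream per chain.
    let streams: Vec<Vec<i64>> = chains
        .iter()
        .map(|chain| {
            chain
                .iter()
                .flat_map(|&i| partitions[i].iter().copied())
                .collect::<Vec<i64>>()
        })
        .collect();

    // Step 2 (SortPreservingMergeExec): min-heap of (value, stream, position).
    let mut heap = BinaryHeap::new();
    for (s, stream) in streams.iter().enumerate() {
        if let Some(&v) = stream.first() {
            heap.push(Reverse((v, s, 0usize)));
        }
    }
    let mut merged = Vec::with_capacity(streams.iter().map(Vec::len).sum());
    while let Some(Reverse((v, s, pos))) = heap.pop() {
        merged.push(v);
        if let Some(&next) = streams[s].get(pos + 1) {
            heap.push(Reverse((next, s, pos + 1)));
        }
    }
    merged
}
```

With `historic_table` as partition 2 and chains `[2, 0]` and `[1]`, the merge only has to interleave two streams rather than three.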
   
   The "first fit" algorithm has actually already been implemented in [`FileScanConfig::split_groups_by_statistics`](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_scan_config.rs#L569), which uses the [`MinMaxStatistics`](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/statistics.rs#L64) helper to analyze min/max values. However, it was written for `ParquetExec` (now deprecated and, I believe, replaced by `DataSourceExec`).
   
   I believe this change could be retrofitted onto `ProgressiveEval` without much additional complexity: the analysis is already implemented, and we just need to use it. Alternatively, we could take the implementation from InfluxDB and just add the final "first fit" step.
   
   # 2. Scanning non-overlapping Parquet files
   
   I see that one of the target use cases for `ProgressiveEval` is #6672. I am a little curious to see the implementation, because as I see it, `ProgressiveEval` will solve some, but not all, instances of this problem.
   
   Here is the issue I am worried about. When you use `ListingTable`, there is a config option called `target_partitions` (`datafusion.execution.target_partitions`), which by default is set to the number of CPU cores available on the system. If the number of files exceeds `target_partitions`, multiple files are merged into the same partition with no guarantee on ordering. Let me demonstrate using `datafusion-cli`:
   
   ```sql
   > SET datafusion.execution.target_partitions=2;
   0 row(s) fetched. 
   Elapsed 0.000 seconds.
   
   > CREATE EXTERNAL TABLE t1 (id INT, date DATE) STORED AS PARQUET LOCATION 
'./data/' PARTITIONED BY (date) WITH ORDER (id ASC);
   0 row(s) fetched. 
   Elapsed 0.002 seconds.
   
   > INSERT INTO t1 VALUES (4, '2025-03-01'), (3, '2025-3-02'), (2, 
'2025-03-03'), (1, '2025-03-04');
   +-------+
   | count |
   +-------+
   | 4     |
   +-------+
   1 row(s) fetched. 
   Elapsed 0.004 seconds.
   
   > EXPLAIN SELECT * FROM t1 ORDER BY id ASC;
   
   +---------------+--------------------------------------------------------------------
   | plan_type     | plan
   +---------------+--------------------------------------------------------------------
   | logical_plan  | Sort: t1.id ASC NULLS LAST
   |               |   TableScan: t1 projection=[id, date]
   | physical_plan | SortPreservingMergeExec: [id@0 ASC NULLS LAST]
   |               |   SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
   |               |     DataSourceExec: file_groups={2 groups: [[./data/date=2025-03-01/9nxILoicy2uUAt7r.parquet, ./data/date=2025-03-02/9nxILoicy2uUAt7r.parquet], [./data/date=2025-03-03/9nxILoicy2uUAt7r.parquet, ./data/date=2025-03-04/9nxILoicy2uUAt7r.parquet]]}, projection=[id, date], file_type=parquet
   |               |
   +---------------+--------------------------------------------------------------------
   2 row(s) fetched. 
   Elapsed 0.002 seconds.
   
   > EXPLAIN SELECT * FROM t1 WHERE date > '2025-03-02' ORDER BY id ASC;
   
   +---------------+--------------------------------------------------------------------
   | plan_type     | plan
   +---------------+--------------------------------------------------------------------
   | logical_plan  | Sort: t1.id ASC NULLS LAST
   |               |   TableScan: t1 projection=[id, date], full_filters=[t1.date > Date32("2025-03-02")]
   | physical_plan | SortPreservingMergeExec: [id@0 ASC NULLS LAST]
   |               |   DataSourceExec: file_groups={2 groups: [[./data/date=2025-03-03/9nxILoicy2uUAt7r.parquet], [./data/date=2025-03-04/9nxILoicy2uUAt7r.parquet]]}, projection=[id, date], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
   |               |
   +---------------+--------------------------------------------------------------------
   2 row(s) fetched. 
   Elapsed 0.001 seconds.
   ```
   
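   When the scan has more files than `target_partitions` (here, more than 2), files are merged into partitions in no particular order. In particular, any ordering on `id ASC` is lost: our partitions end up with ids `[4, 3]` and `[2, 1]`, which is why the first plan needs a `SortExec`. In the second query the filter leaves only 2 files, one per partition, so the scan keeps `output_ordering=[id@0 ASC NULLS LAST]` and no sort is needed. If `ProgressiveEval` were to run on the first plan, it would see that the input partitions are not ordered and give up.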
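   A rough Rust sketch of why the first plan defeats `ProgressiveEval`: concatenating a file group only produces sorted output if each file's max is at most the next file's min. `FileStats`, `group_is_ordered`, and `sort_files_within_groups` are hypothetical names. The sketch assumes each file is individually sorted on `id` and that the files inside a group do not overlap; if they did, the files would have to be regrouped (for example with first fit) rather than just reordered.

```rust
/// Min/max of `id` in one Parquet file (hypothetical type; a scan could read
/// these values from Parquet statistics).
#[derive(Debug, Clone, Copy, PartialEq)]
struct FileStats {
    min: i64,
    max: i64,
}

/// Concatenating a group's files yields sorted output only if each file ends at
/// or before the point where the next one starts (files assumed sorted inside).
fn group_is_ordered(group: &[FileStats]) -> bool {
    group.windows(2).all(|w| w[0].max <= w[1].min)
}

/// Reorder the files inside each group by their min value. This requires access
/// to the individual files, so it has to happen in the scan.
fn sort_files_within_groups(groups: &mut [Vec<FileStats>]) {
    for group in groups.iter_mut() {
        group.sort_by_key(|f| f.min);
    }
}
```

For the first EXPLAIN above, the groups hold ids `[4, 3]` and `[2, 1]`, so the check fails; after sorting within each group they become `[3, 4]` and `[1, 2]`, and both pass.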
   
   IMO, for this use case, `ProgressiveEval` is trying to solve a problem it does not have full control over. The individual files are not visible above the `DataSourceExec`, so `ProgressiveEval` has no way to reorder them, even though we know a sort could be avoided here. There is also the potential issue of `RepartitionExec` and other nodes sitting between `ProgressiveEval` and `DataSourceExec`.
   
   I don't mean to toot my own horn too much, but this exact use case is what [`FileScanConfig::split_groups_by_statistics`](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_scan_config.rs#L569) was written to solve. We can solve the problem locally at the `DataSourceExec`, which I think is the right place to do it. It is still gated behind a feature flag: I have not been able to dedicate the time to set up benchmarks for `ListingTable`, which I think are required before this feature can graduate from experimental and ship.
   
   # Conclusion
   
   Overall, I think the design as-is is good enough to include in DataFusion, although I would like to see it generalized a bit, and I also think it may not completely solve #6672. That said, I think it will solve other problems, including optimizing queries that contain non-overlapping unions.
   
   I apologize if I come off as a bit overbearing 😅, but this issue is near and dear to my heart. Eliminating sorts has been one of the most important goals in my team's project, and it sounds like InfluxDB has been dealing with the same issue.


-- 
This is an automated message from the Apache Git Service.

