alamb commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2730417397

   > Incidentally this is the use case I am targeting. Anyway, this query would 
result in at least 3 partitions, two of which are overlapping in time. If we 
could generalize ProgressiveEval to have multiple partition groupings, we could 
do something like this:
   
   This is a neat idea -- basically a cascade of progressive evals to avoid 
some (but not all) merging
   
   
   > I see that one of the target use cases for ProgressiveEval is 
https://github.com/apache/datafusion/issues/6672. I am a little curious to see 
the implementation, because the way I see it, ProgressiveEval will solve some 
but not all instances of this problem.
   
   Our progressive eval implementation is here 
https://github.com/influxdata/influxdb3_core/blob/26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f/iox_query/src/provider/progressive_eval.rs
   
   (there is a PR to add it to Datafusion here: 
https://github.com/apache/datafusion/pull/10490)
   
   
   > Here is the issue I am worried about. When you use ListingTable you have a 
config option called target_partitions. By default it is set to the number of 
available cpu cores on the system. If the number of files exceeds 
target_partitions then it will start to merge files into the same partitions 
with no guarantees on ordering. Let me demonstrate using datafusion-cli:
   
   > If the number of files exceeds target_partitions then it will start to 
merge files into the same partitions with no guarantees on ordering. 
   
   I think it uses 
[`FileGroupPartitioner`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileGroupPartitioner.html)
 that maintains the same ordering... But maybe it is a different ordering than 
you have in mind
   
   You can define an order using `WITH ORDER` like this:
   
   ```sql
   DataFusion CLI v46.0.1
   > copy (values (4, '2025-03-01')) to '/tmp/test/1.parquet';
   +-------+
   | count |
   +-------+
   | 1     |
   +-------+
   1 row(s) fetched.
   Elapsed 0.004 seconds.
   
   > copy (values (3, '2025-03-02')) to '/tmp/test/2.parquet';
   +-------+
   | count |
   +-------+
   | 1     |
   +-------+
   1 row(s) fetched.
   Elapsed 0.004 seconds.
   
   > create external table test stored as parquet location '/tmp/test' with 
order (column2 ASC);
   0 row(s) fetched.
   Elapsed 0.006 seconds.
   ```
   
   Then the sort is not needed:
   ```sql
   > explain select * from test order by column2 asc;
   
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                                                                                
                                                                           |
   
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Sort: test.column2 ASC NULLS LAST                          
                                                                                
                                                                                
                                                                           |
   |               |   TableScan: test projection=[column1, column2]            
                                                                                
                                                                                
                                                                           |
   | physical_plan | SortPreservingMergeExec: [column2@1 ASC NULLS LAST]        
                                                                                
                                                                                
                                                                           |
   |               |   DataSourceExec: file_groups={16 groups: 
[[tmp/test/1.parquet:0..107], [tmp/test/2.parquet:0..107], 
[tmp/test/1.parquet:107..214], [tmp/test/2.parquet:107..214], 
[tmp/test/1.parquet:214..321], ...]}, projection=[column1, column2], 
output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet |
   |               |                                                            
                                                                                
                                                                                
                                                                           |
   
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   2 row(s) fetched.
   Elapsed 0.002 seconds.
   ```
   
   Though datafusion will put it when needed
   ```sql
   > explain select * from test order by column2 desc;
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                                                                                
                                                                             |
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Sort: test.column2 DESC NULLS FIRST                        
                                                                                
                                                                                
                                                                             |
   |               |   TableScan: test projection=[column1, column2]            
                                                                                
                                                                                
                                                                             |
   | physical_plan | SortPreservingMergeExec: [column2@1 DESC]                  
                                                                                
                                                                                
                                                                             |
   |               |   SortExec: expr=[column2@1 DESC], 
preserve_partitioning=[true]                                                    
                                                                                
                                                                                
                     |
   |               |     DataSourceExec: file_groups={16 groups: 
[[tmp/test/1.parquet:0..107], [tmp/test/2.parquet:0..107], 
[tmp/test/1.parquet:107..214], [tmp/test/2.parquet:107..214], 
[tmp/test/1.parquet:214..321], ...]}, projection=[column1, column2], 
output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet |
   |               |                                                            
                                                                                
                                                                                
                                                                             |
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   2 row(s) fetched.
   Elapsed 0.012 seconds.
   ```
   
   > I apologize if I come off as a bit overbearing 😅 but this issue is near 
and dear to my heart. Eliminating sorts has been one of the most important 
things in my team's project, and it sounds like InfluxDB has been dealing with 
the same issue.
   
   
   
   Not at all! Yes we have done a lot of this (as have @berkaysynnada 
@ozankabak and @akurmustafa at Synnada). It is a very important optimization
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to