suremarc commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2730732469

> I think it uses [FileGroupPartitioner](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileGroupPartitioner.html) that maintains the same ordering... But maybe it is a different ordering than you have in mind
   
AFAICT `FileGroupPartitioner` does maintain the ordering it is given, yes. But the initial order of the files, as determined by `ListingTable`, is ultimately just a sort by object store path (see [here](https://docs.rs/datafusion-catalog-listing/46.0.0/src/datafusion_catalog_listing/helpers.rs.html#133-136)), which may not match the table order. That is the case in my (somewhat contrived) example above, where the table order is `id ASC` but `id` decreases as `date` increases.
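Here is a minimal Rust sketch that makes the path-order point concrete. It assumes the listing step orders files with a plain lexicographic string sort; the helper `sort_by_path` and the example paths are hypothetical. The result follows the partition directories rather than the rows inside the files. Such a sort can even disagree with numeric order: `shard=10` sorts before `shard=2`.

```rust
// Minimal sketch, assuming listing order is a plain lexicographic sort of
// object store paths. `sort_by_path` and the paths are hypothetical.

/// Sort paths by byte-wise string comparison, like `Vec<String>::sort`.
fn sort_by_path(mut paths: Vec<String>) -> Vec<String> {
    paths.sort();
    paths
}

fn main() {
    let paths: Vec<String> = [
        "tmp/data/date=2025-03-02/shard=2/a.parquet",
        "tmp/data/date=2025-03-01/shard=2/a.parquet",
        "tmp/data/date=2025-03-01/shard=10/a.parquet",
        "tmp/data/date=2025-03-02/shard=10/a.parquet",
    ]
    .iter()
    .map(|s| s.to_string())
    .collect();

    // Ordered by `date`, then by `shard` compared as text ("10" < "2"),
    // independent of how the rows inside each file are ordered.
    for p in sort_by_path(paths) {
        println!("{p}");
    }
}
```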
   
For a more realistic example where the table order and the object store path order don't match, consider a horizontally partitioned table, something like this:
   
```sql
DataFusion CLI v46.0.0
> SET datafusion.execution.target_partitions=2;
0 row(s) fetched.
Elapsed 0.001 seconds.

> CREATE EXTERNAL TABLE t1 (time TIMESTAMP, date DATE, shard INT) STORED AS PARQUET LOCATION '/tmp/data/' PARTITIONED BY (date, shard) WITH ORDER (time ASC);
0 row(s) fetched.
Elapsed 0.003 seconds.

> INSERT INTO t1 VALUES
('2025-03-01 00:00:01', '2025-03-01', 0),
('2025-03-01 00:00:00', '2025-03-01', 1),
('2025-03-02 00:00:00', '2025-03-02', 0),
('2025-03-02 00:00:02', '2025-03-02', 1);
+-------+
| count |
+-------+
| 4     |
+-------+
1 row(s) fetched.
Elapsed 0.011 seconds.

> EXPLAIN SELECT * FROM t1 ORDER BY time ASC;
+---------------+------------------------------------------------------------------------+
| plan_type     | plan                                                                   |
+---------------+------------------------------------------------------------------------+
| logical_plan  | Sort: t1.time ASC NULLS LAST                                           |
|               |   TableScan: t1 projection=[time, date, shard]                         |
| physical_plan | SortPreservingMergeExec: [time@0 ASC NULLS LAST]                       |
|               |   SortExec: expr=[time@0 ASC NULLS LAST], preserve_partitioning=[true] |
|               |     DataSourceExec: file_groups={2 groups: [                           |
|               |       [tmp/data/date=2025-03-01/shard=0/8eTZY2WyyhnV7Klv.parquet,      |
|               |        tmp/data/date=2025-03-01/shard=1/8eTZY2WyyhnV7Klv.parquet],     |
|               |       [tmp/data/date=2025-03-02/shard=0/8eTZY2WyyhnV7Klv.parquet,      |
|               |        tmp/data/date=2025-03-02/shard=1/8eTZY2WyyhnV7Klv.parquet]]},   |
|               |       projection=[time, date, shard], file_type=parquet                |
|               |                                                                        |
+---------------+------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.001 seconds.
```
   
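Sorting by the object store paths gives us a lexsort by `date` and then `shard`. Each file group is therefore ordered by its partition columns rather than by `time`, so the ordering within partitions is lost. In the first group, for example, the `shard=0` file (`00:00:01`) is read before the `shard=1` file (`00:00:00`), which is why the plan needs a `SortExec`. `ProgressiveEval` will not be able to fix this on its own unless it can also manipulate `DataSourceExec`'s files.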
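Per-file statistics are enough to detect this loss of ordering. The minimal Rust sketch below makes three assumptions: each file is internally sorted by `time`, its min/max `time` are known (for example from Parquet statistics), and times are given in seconds after `2025-03-01 00:00:00`. `FileStats`, `group_is_ordered`, and `sort_group_by_min` are hypothetical helpers, not DataFusion APIs. The sketch uses the first file group from the plan above.

```rust
// Minimal sketch, assuming each file is internally sorted by `time` and its
// min/max `time` are known (e.g. from Parquet statistics). `FileStats`,
// `group_is_ordered` and `sort_group_by_min` are hypothetical helpers.

#[derive(Debug, Clone)]
struct FileStats {
    path: &'static str,
    min_time: i64, // seconds after 2025-03-01 00:00:00
    max_time: i64,
}

/// Reading a group's files back to back yields output sorted by `time`
/// only if each file ends no later than the next file starts.
fn group_is_ordered(group: &[FileStats]) -> bool {
    group.windows(2).all(|w| w[0].max_time <= w[1].min_time)
}

/// Reorder a group's files by their minimum `time`.
fn sort_group_by_min(mut group: Vec<FileStats>) -> Vec<FileStats> {
    group.sort_by_key(|f| f.min_time);
    group
}

fn main() {
    // First file group from the EXPLAIN above, in object store path order.
    let group = vec![
        FileStats { path: "date=2025-03-01/shard=0", min_time: 1, max_time: 1 },
        FileStats { path: "date=2025-03-01/shard=1", min_time: 0, max_time: 0 },
    ];
    println!("path order sorted by time: {}", group_is_ordered(&group)); // false

    let reordered = sort_group_by_min(group);
    println!("min-time order sorted by time: {}", group_is_ordered(&reordered)); // true
    for f in &reordered {
        println!("  {} [{}, {}]", f.path, f.min_time, f.max_time);
    }
}
```

Reordering inside a group only restores the order when the files' `time` ranges do not overlap. Overlapping files would still need a merge or a sort.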
   
> Then the sort is not needed:
>
> ```sql
> explain select * from test order by column2 asc;
> +---------------+-------------------------------------------------------------------+
> | plan_type     | plan                                                              |
> +---------------+-------------------------------------------------------------------+
> | logical_plan  | Sort: test.column2 ASC NULLS LAST                                 |
> |               |   TableScan: test projection=[column1, column2]                   |
> | physical_plan | SortPreservingMergeExec: [column2@1 ASC NULLS LAST]               |
> |               |   DataSourceExec: file_groups={16 groups: [                       |
> |               |     [tmp/test/1.parquet:0..107], [tmp/test/2.parquet:0..107],     |
> |               |     [tmp/test/1.parquet:107..214], [tmp/test/2.parquet:107..214], |
> |               |     [tmp/test/1.parquet:214..321], ...]},                         |
> |               |     projection=[column1, column2],                                |
> |               |     output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet |
> |               |                                                                   |
> +---------------+-------------------------------------------------------------------+
> 2 row(s) fetched.
> Elapsed 0.002 seconds.
> ```
   
The only reason the sort is not needed here is that there are fewer files than `target_partitions`. As a result, every file group covers a single file (or a byte range within one). This will stop working if we increase the number of files or reduce `target_partitions`. If we set `target_partitions` to 1, then it requires a sort:
   
```sql
> SET datafusion.execution.target_partitions=1;
0 row(s) fetched.
Elapsed 0.000 seconds.

> EXPLAIN SELECT * FROM test ORDER BY column2 ASC;
+---------------+--------------------------------------------------------------------------+
| plan_type     | plan                                                                     |
+---------------+--------------------------------------------------------------------------+
| logical_plan  | Sort: test.column2 ASC NULLS LAST                                        |
|               |   TableScan: test projection=[column1, column2]                          |
| physical_plan | SortExec: expr=[column2@1 ASC NULLS LAST], preserve_partitioning=[false] |
|               |   DataSourceExec: file_groups={1 group:                                  |
|               |     [[tmp/test/1.parquet, tmp/test/2.parquet]]},                         |
|               |     projection=[column1, column2], file_type=parquet                     |
|               |                                                                          |
+---------------+--------------------------------------------------------------------------+
```
   
`target_partitions` defaults to the number of CPU cores, so it is typically around 16. The problem appears for any table with more files than that, which describes most tables I have worked with.
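The dependence on `target_partitions` follows from the pigeonhole principle. With `n` files and `k` groups, some group must hold at least two files once `n > k`, and from then on the order of files inside a group matters. The minimal Rust sketch below assumes files are dealt into contiguous chunks of `ceil(n / k)` files. `split_into_groups` is a hypothetical helper, and it ignores the byte-range splitting visible in the 16-group plan above.

```rust
// Minimal sketch: split `files` into at most `target_partitions` contiguous
// chunks of ceil(n / k) files each. `split_into_groups` is hypothetical and
// ignores splitting a single file into byte ranges.
fn split_into_groups<T: Clone>(files: &[T], target_partitions: usize) -> Vec<Vec<T>> {
    if files.is_empty() || target_partitions == 0 {
        return Vec::new();
    }
    // Ceiling division: the number of files per group.
    let chunk_size = (files.len() + target_partitions - 1) / target_partitions;
    files.chunks(chunk_size).map(|chunk| chunk.to_vec()).collect()
}

fn main() {
    let two_files = ["1.parquet", "2.parquet"];
    for k in [16, 1] {
        let groups = split_into_groups(&two_files, k);
        let largest = groups.iter().map(Vec::len).max().unwrap_or(0);
        println!("target_partitions={k}: {} group(s), largest has {largest} file(s)", groups.len());
    }

    // More files than partitions: some group must contain several files.
    let many: Vec<String> = (0..20).map(|i| format!("{i}.parquet")).collect();
    let groups = split_into_groups(&many, 16);
    let largest = groups.iter().map(Vec::len).max().unwrap_or(0);
    println!("20 files, target_partitions=16: largest group has {largest} file(s)");
}
```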

