suremarc commented on issue #15191: URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2730732469
> I think it uses [FileGroupPartitioner](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileGroupPartitioner.html) that maintains the same ordering... But maybe it is a different ordering than you have in mind AFAICT `FileGroupPartitioner` is able to maintain the same ordering as what it is given, yes. But the initial ordering for the partitions as determined by `ListingTable` is ultimately just sorting by the object store paths (see [here](https://docs.rs/datafusion-catalog-listing/46.0.0/src/datafusion_catalog_listing/helpers.rs.html#133-136)), which may not match the table order. This is the case in my (somewhat contrived) example above, where the table order is `id ASC` but `id` decreases as `date` increases. For a more realistic example where the table order and object store path order don't match, consider a horizontally partitioned table, something like this: ```sql DataFusion CLI v46.0.0 > SET datafusion.execution.target_partitions=2; 0 row(s) fetched. Elapsed 0.001 seconds. > CREATE EXTERNAL TABLE t1 (time TIMESTAMP, date DATE, shard INT) STORED AS PARQUET LOCATION '/tmp/data/' PARTITIONED BY (date, shard) WITH ORDER (time ASC); 0 row(s) fetched. Elapsed 0.003 seconds. > INSERT INTO t1 VALUES ('2025-03-01 00:00:01', '2025-03-01', 0), ('2025-03-01 00:00:00', '2025-03-01', 1), ('2025-03-02 00:00:00', '2025-03-02', 0), ('2025-03-02 00:00:02', '2025-03-02', 1); +-------+ | count | +-------+ | 4 | +-------+ 1 row(s) fetched. Elapsed 0.011 seconds. 
> EXPLAIN SELECT * FROM t1 ORDER BY time ASC; +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | logical_plan | Sort: t1.time ASC NULLS LAST | | | TableScan: t1 projection=[time, date, shard] | | physical_plan | SortPreservingMergeExec: [time@0 ASC NULLS LAST] | | | SortExec: expr=[time@0 ASC NULLS LAST], preserve_partitioning=[true] | | | DataSourceExec: file_groups={2 groups: [[tmp/data/date=2025-03-01/shard=0/8eTZY2WyyhnV7Klv.parquet, tmp/data/date=2025-03-01/shard=1/8eTZY2WyyhnV7Klv.parquet], [tmp/data/date=2025-03-02/shard=0/8eTZY2WyyhnV7Klv.parquet, tmp/data/date=2025-03-02/shard=1/8eTZY2WyyhnV7Klv.parquet]]}, projection=[time, date, shard], file_type=parquet | | | | +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 2 row(s) fetched. Elapsed 0.001 seconds. ``` Sorting by the object store paths gives us a lexsort by `date` and then `shard`, which leads to a loss of ordering within partitions. 
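To make the failure mode concrete, here is a small self-contained model of what happens above. It is a simplified sketch, not DataFusion's actual code: it assumes the files are sorted by path and then chunked contiguously into `target_partitions` groups (which matches the `file_groups` shown in the plan), and it checks each group using hypothetical per-file min/max statistics for `time`. `FileMeta`, `group_by_path`, and `group_is_ordered` are illustrative names, and the paths are shortened.

```rust
// Simplified model (not DataFusion's actual code) of ListingTable's file
// grouping: sort files by path, then chunk them contiguously into groups.

/// Hypothetical per-file metadata: the path plus the min/max of the sort
/// column `time`, in seconds since 2025-03-01 00:00:00, as Parquet column
/// statistics would report them.
#[derive(Debug, Clone)]
struct FileMeta {
    path: &'static str,
    min_time: i64,
    max_time: i64,
}

/// Sort files by path, then split them into at most `n` (must be >= 1)
/// contiguous chunks of ceil(len / n) files each.
fn group_by_path(mut files: Vec<FileMeta>, n: usize) -> Vec<Vec<FileMeta>> {
    files.sort_by(|a, b| a.path.cmp(b.path));
    let chunk_size = files.len().div_ceil(n).max(1);
    files.chunks(chunk_size).map(|c| c.to_vec()).collect()
}

/// A group can be read in `time` order without a sort only if each file's
/// range starts at or after the previous file's range ends.
fn group_is_ordered(group: &[FileMeta]) -> bool {
    group.windows(2).all(|w| w[0].max_time <= w[1].min_time)
}

fn main() {
    const DAY: i64 = 86_400;
    // One file per (date, shard) directory, listed in arbitrary order;
    // each holds the single row inserted in the session above.
    let files = vec![
        FileMeta { path: "date=2025-03-02/shard=1/a.parquet", min_time: DAY + 2, max_time: DAY + 2 },
        FileMeta { path: "date=2025-03-01/shard=0/a.parquet", min_time: 1, max_time: 1 },
        FileMeta { path: "date=2025-03-02/shard=0/a.parquet", min_time: DAY, max_time: DAY },
        FileMeta { path: "date=2025-03-01/shard=1/a.parquet", min_time: 0, max_time: 0 },
    ];
    // target_partitions = 2, as in the session above.
    for (i, group) in group_by_path(files, 2).iter().enumerate() {
        let paths: Vec<&str> = group.iter().map(|f| f.path).collect();
        println!("group {i}: {paths:?} ordered by time: {}", group_is_ordered(group));
    }
    // Prints:
    // group 0: ["date=2025-03-01/shard=0/a.parquet", "date=2025-03-01/shard=1/a.parquet"] ordered by time: false
    // group 1: ["date=2025-03-02/shard=0/a.parquet", "date=2025-03-02/shard=1/a.parquet"] ordered by time: true
}
```

Only the second group happens to come out in `time` order; the first one does not, because path order puts `shard=0` before `shard=1` regardless of their timestamps.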
`ProgressiveEval` will not be able to fix this on its own unless it is able to manipulate `DataSourceExec`'s files.

> Then the sort is not needed:
>
> ```sql
> explain select * from test order by column2 asc;
> +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
> | plan_type     | plan |
> +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
> | logical_plan  | Sort: test.column2 ASC NULLS LAST |
> |               |   TableScan: test projection=[column1, column2] |
> | physical_plan | SortPreservingMergeExec: [column2@1 ASC NULLS LAST] |
> |               |   DataSourceExec: file_groups={16 groups: [[tmp/test/1.parquet:0..107], [tmp/test/2.parquet:0..107], [tmp/test/1.parquet:107..214], [tmp/test/2.parquet:107..214], [tmp/test/1.parquet:214..321], ...]}, projection=[column1, column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet |
> |               |  |
> +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
> 2 row(s) fetched.
> Elapsed 0.002 seconds.
> ```

The only reason the sort is not needed here is that there are fewer files than `target_partitions`, so each group holds a single file or a single byte range of one (e.g. `tmp/test/1.parquet:0..107`) and no group ever has to preserve ordering across files. This will not work if we increase the number of files or reduce `target_partitions`.
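The dependence on file count versus `target_partitions` is essentially a pigeonhole argument. The sketch below is a simplified model under stated assumptions: whole files are spread contiguously over `target_partitions` groups, and byte-range splitting of individual files (as seen in the 16-group plan above) is ignored. `max_files_per_group` and `needs_cross_file_order` are hypothetical helpers, not DataFusion APIs.

```rust
// Simplified model: when whole files are spread over `target_partitions`
// groups, at least one group must hold more than one file as soon as
// n_files > target_partitions (pigeonhole principle).

/// Largest number of files any single group receives under contiguous
/// chunking (assumes `target_partitions >= 1`; ignores byte-range splits).
fn max_files_per_group(n_files: usize, target_partitions: usize) -> usize {
    n_files.div_ceil(target_partitions)
}

/// Hypothetical helper: true when some group has to stream several files
/// back to back, so a declared ordering only survives if those files happen
/// to be in sort-key order within the group.
fn needs_cross_file_order(n_files: usize, target_partitions: usize) -> bool {
    max_files_per_group(n_files, target_partitions) > 1
}

fn main() {
    for (n_files, target_partitions) in [(2, 16), (2, 1), (16, 16), (17, 16)] {
        println!(
            "{n_files} files, target_partitions={target_partitions}: max {} per group, needs cross-file order: {}",
            max_files_per_group(n_files, target_partitions),
            needs_cross_file_order(n_files, target_partitions),
        );
    }
}
```

With 2 files and 16 partitions every group gets at most one file, but dropping to 1 partition, or going past 16 files, forces some group to concatenate files, and then the path-based order decides whether the declared ordering holds.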
If we set `target_partitions` to 1, then it requires a sort:

```sql
> SET datafusion.execution.target_partitions=1;
0 row(s) fetched.
Elapsed 0.000 seconds.

> EXPLAIN SELECT * FROM test ORDER BY column2 ASC;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: test.column2 ASC NULLS LAST |
|               |   TableScan: test projection=[column1, column2] |
| physical_plan | SortExec: expr=[column2@1 ASC NULLS LAST], preserve_partitioning=[false] |
|               |   DataSourceExec: file_groups={1 group: [[tmp/test/1.parquet, tmp/test/2.parquet]]}, projection=[column1, column2], file_type=parquet |
|               |  |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+
```

`target_partitions` is usually the number of CPU cores, so typically 16 or so, but the problem will still appear for tables with more than 16 files (which is most tables I have worked with).