AdamGS opened a new issue, #18513:
URL: https://github.com/apache/datafusion/issues/18513

   ### Describe the bug
   
   When pushing down a filter into a `FileSource` (more specifically - when 
updating the node in `FileSource::try_pushdown_filters`, even if there's no 
filter), it seems like minor changes can result in very different behaviors 
when interacting with output ordering, sometimes repartitioning even very small 
files into many partitions.
   
   |           | Ordering                                          | No 
Ordering                                                                    |
   
|-----------|---------------------------------------------------|--------------------------------------------------------------------------------|
   | Filter    | DataScanExec is partitiond to `target_partitions` | 
`DataSourceExec`  isn't repartitioned, but gets a  `RepartitionedExec`  on top |
   | No Filter | Not partitioned, one partition per file           | No 
partitioning at all, just `DataSourceExec`                                  |
   
   I've run into this bug while working on the Vortex `FileSource`, which had a 
code path where it will return an updated node with no actual change (for 
example - when the filters array is empty), which will result in an actual 
change in the plan (For Parquet - that code path doesn't exist but as far as I 
can tell that's the main difference)
   I'm not 100% sure this is a bug, but it seems like repartitioning small 
files is potentially pretty wasteful, this also seems somewhat related to 
#4967, which is why I'm filing it.
   
   ### To Reproduce
   
   ## Ordering and filter
   ```sql
   -- create table
   CREATE EXTERNAL TABLE my_tbl 
   (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
   STORED AS PARQUET  \
   WITH ORDER (c1 ASC) \
   LOCATION './my_tbl/';
   
   -- insert some data
   INSERT INTO my_tbl VALUES \
   ('air', 10), ('alabama', 20), ('balloon', 30),\
    ('kangaroo', 11), ('zebra', 21);
   
   --- explain query
   EXPLAIN SELECT * FROM my_tbl  WHERE c2 > 10;
   
   
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                                                     |
   
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | TableScan: my_tbl projection=[c1, c2]                      
                                                                                
                                                     |
   | physical_plan | DataSourceExec: file_groups={1 group: 
[[Users/adamgs/code/repartition-repro/my_tbl/VSb9EmxEdPYbb6EJ_0.parquet]]}, 
projection=[c1, c2], output_ordering=[c1@0 ASC NULLS LAST], file_type=parquet |
   |               |                                                            
                                                                                
                                                     |
   
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   
   ```
   ## Ordering, no filter
   ```sql
   -- create table
   CREATE EXTERNAL TABLE my_tbl 
   (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
   WITH ORDER (c1 ASC) \
   STORED AS PARQUET  \
   LOCATION './my_tbl/';
   
   -- insert some data
   INSERT INTO my_tbl VALUES \
   ('air', 10), ('alabama', 20), ('balloon', 30),\
    ('kangaroo', 11), ('zebra', 21);
   
   --- explain query
   EXPLAIN SELECT * FROM my_tbl;
   
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                                                     |
   
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | TableScan: my_tbl projection=[c1, c2]                      
                                                                                
                                                     |
   | physical_plan | DataSourceExec: file_groups={1 group: 
[[Users/adamgs/code/repartition-repro/my_tbl/9rqWvTvx7fWJYwEY_0.parquet]]}, 
projection=[c1, c2], output_ordering=[c1@0 ASC NULLS LAST], file_type=parquet |
   |               |                                                            
                                                                                
                                                     |
   
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   
   ```
   
   
   ## No ordering, with filter
   ```sql
   -- create table
   CREATE EXTERNAL TABLE my_tbl 
   (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
   STORED AS PARQUET  \
   LOCATION './my_tbl/';
   
   -- insert some data
   INSERT INTO my_tbl VALUES \
   ('air', 10), ('alabama', 20), ('balloon', 30),\
    ('kangaroo', 11), ('zebra', 21);
   
   --- explain query
   EXPLAIN SELECT * FROM my_tbl  WHERE c2 > 10;
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                                                                                
                                                     |
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Filter: my_tbl.c2 > Int32(10)                              
                                                                                
                                                                                
                                                     |
   |               |   TableScan: my_tbl projection=[c1, c2], 
partial_filters=[my_tbl.c2 > Int32(10)]                                         
                                                                                
                                                                       |
   | physical_plan | CoalesceBatchesExec: target_batch_size=8192                
                                                                                
                                                                                
                                                     |
   |               |   FilterExec: c2@1 > 10                                    
                                                                                
                                                                                
                                                     |
   |               |     RepartitionExec: partitioning=RoundRobinBatch(14), 
input_partitions=1                                                              
                                                                                
                                                         |
   |               |       DataSourceExec: file_groups={1 group: 
[[Users/adamgs/code/repartition-repro/my_tbl/uhkDHvDvl1Q3yo5v_0.parquet]]}, 
projection=[c1, c2], file_type=parquet, predicate=c2@1 > 10, 
pruning_predicate=c2_null_count@1 != row_count@2 AND c2_max@0 > 10, 
required_guarantees=[] |
   |               |                                                            
                                                                                
                                                                                
                                                     |
   
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   
   ```
   
   ## No ordering, no filter
   ```sql
   -- create table
   CREATE EXTERNAL TABLE my_tbl 
   (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
   STORED AS PARQUET  \
   LOCATION './my_tbl/';
   
   -- insert some data
   INSERT INTO my_tbl VALUES \
   ('air', 10), ('alabama', 20), ('balloon', 30),\
    ('kangaroo', 11), ('zebra', 21);
   
   --- explain query
   EXPLAIN SELECT * FROM my_tbl;
   
   
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
              |
   
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | TableScan: my_tbl projection=[c1, c2]                      
                                                                                
              |
   | physical_plan | DataSourceExec: file_groups={1 group: 
[[Users/adamgs/code/repartition-repro/my_tbl/sQrcFZmANMuthBwu_0.parquet]]}, 
projection=[c1, c2], file_type=parquet |
   |               |                                                            
                                                                                
              |
   
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------+
   ```
   
   ### Expected behavior
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to