AdamGS opened a new issue, #18513:
URL: https://github.com/apache/datafusion/issues/18513
### Describe the bug
When pushing down a filter into a `FileSource` (more specifically - when
updating the node in `FileSource::try_pushdown_filters`, even if there's no
filter), it seems like minor changes can result in very different behaviors
when interacting with output ordering, sometimes repartitioning even very small
files into many partitions.
|           | Ordering | No Ordering |
|-----------|----------|-------------|
| Filter    | `DataSourceExec` is partitioned to `target_partitions` | `DataSourceExec` isn't repartitioned, but gets a `RepartitionExec` on top |
| No Filter | Not partitioned, one partition per file | No partitioning at all, just `DataSourceExec` |
I ran into this while working on the Vortex `FileSource`, which had a code
path that returned an updated node with no actual change (for example, when
the filters array is empty). That no-op update still results in an actual
change in the plan. For Parquet, that code path doesn't exist, and as far as I
can tell that's the main difference.
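To make the no-op update concrete, here is a minimal sketch of a guard that reports "no change" when there is nothing to push down. Everything in it is a hypothetical stand-in (`ToySource`, `Pushdown`, and the `String` filter type); it is not DataFusion's actual `FileSource` trait or its pushdown result type, and it is not necessarily how Vortex or Parquet handle this.

```rust
/// Hypothetical stand-in for a physical filter expression.
type Filter = String;

/// Simplified stand-in for a pushdown result. `updated_node == None`
/// means "nothing changed, keep the existing source as-is".
#[derive(Debug)]
struct Pushdown<T> {
    /// One flag per input filter: was it absorbed by the source?
    pushed: Vec<bool>,
    /// Replacement node, set only when the source really changed.
    updated_node: Option<T>,
}

/// Toy file source that just remembers the predicates pushed into it.
#[derive(Debug, Clone, PartialEq)]
struct ToySource {
    predicates: Vec<Filter>,
}

impl ToySource {
    /// Assumption for illustration: every filter can be absorbed.
    fn try_pushdown_filters(&self, filters: Vec<Filter>) -> Pushdown<ToySource> {
        // Guard: with no filters there is nothing to update, so report
        // "no change" instead of returning a clone of `self`.
        if filters.is_empty() {
            return Pushdown { pushed: Vec::new(), updated_node: None };
        }
        let pushed = vec![true; filters.len()];
        let mut predicates = self.predicates.clone();
        predicates.extend(filters);
        Pushdown {
            pushed,
            updated_node: Some(ToySource { predicates }),
        }
    }
}
```

With this guard, calling `try_pushdown_filters(vec![])` yields `updated_node: None`, so a caller that treats `Some(..)` as "the plan changed" would not see a change.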
I'm not 100% sure this is a bug, but repartitioning small files seems
potentially pretty wasteful. It also seems somewhat related to #4967, which is
why I'm filing it.
### To Reproduce
## Ordering and filter
```sql
-- create table
CREATE EXTERNAL TABLE my_tbl
(c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
STORED AS PARQUET \
WITH ORDER (c1 ASC) \
LOCATION './my_tbl/';
-- insert some data
INSERT INTO my_tbl VALUES \
('air', 10), ('alabama', 20), ('balloon', 30),\
('kangaroo', 11), ('zebra', 21);
--- explain query
EXPLAIN SELECT * FROM my_tbl WHERE c2 > 10;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | TableScan: my_tbl projection=[c1, c2] |
| physical_plan | DataSourceExec: file_groups={1 group: [[Users/adamgs/code/repartition-repro/my_tbl/VSb9EmxEdPYbb6EJ_0.parquet]]}, projection=[c1, c2], output_ordering=[c1@0 ASC NULLS LAST], file_type=parquet |
|               | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
## Ordering, no filter
```sql
-- create table
CREATE EXTERNAL TABLE my_tbl
(c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
WITH ORDER (c1 ASC) \
STORED AS PARQUET \
LOCATION './my_tbl/';
-- insert some data
INSERT INTO my_tbl VALUES \
('air', 10), ('alabama', 20), ('balloon', 30),\
('kangaroo', 11), ('zebra', 21);
--- explain query
EXPLAIN SELECT * FROM my_tbl;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | TableScan: my_tbl projection=[c1, c2] |
| physical_plan | DataSourceExec: file_groups={1 group: [[Users/adamgs/code/repartition-repro/my_tbl/9rqWvTvx7fWJYwEY_0.parquet]]}, projection=[c1, c2], output_ordering=[c1@0 ASC NULLS LAST], file_type=parquet |
|               | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
## No ordering, with filter
```sql
-- create table
CREATE EXTERNAL TABLE my_tbl
(c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
STORED AS PARQUET \
LOCATION './my_tbl/';
-- insert some data
INSERT INTO my_tbl VALUES \
('air', 10), ('alabama', 20), ('balloon', 30),\
('kangaroo', 11), ('zebra', 21);
--- explain query
EXPLAIN SELECT * FROM my_tbl WHERE c2 > 10;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Filter: my_tbl.c2 > Int32(10) |
|               |   TableScan: my_tbl projection=[c1, c2], partial_filters=[my_tbl.c2 > Int32(10)] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
|               |   FilterExec: c2@1 > 10 |
|               |     RepartitionExec: partitioning=RoundRobinBatch(14), input_partitions=1 |
|               |       DataSourceExec: file_groups={1 group: [[Users/adamgs/code/repartition-repro/my_tbl/uhkDHvDvl1Q3yo5v_0.parquet]]}, projection=[c1, c2], file_type=parquet, predicate=c2@1 > 10, pruning_predicate=c2_null_count@1 != row_count@2 AND c2_max@0 > 10, required_guarantees=[] |
|               | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
## No ordering, no filter
```sql
-- create table
CREATE EXTERNAL TABLE my_tbl
(c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
STORED AS PARQUET \
LOCATION './my_tbl/';
-- insert some data
INSERT INTO my_tbl VALUES \
('air', 10), ('alabama', 20), ('balloon', 30),\
('kangaroo', 11), ('zebra', 21);
--- explain query
EXPLAIN SELECT * FROM my_tbl;
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | TableScan: my_tbl projection=[c1, c2] |
| physical_plan | DataSourceExec: file_groups={1 group: [[Users/adamgs/code/repartition-repro/my_tbl/sQrcFZmANMuthBwu_0.parquet]]}, projection=[c1, c2], file_type=parquet |
|               | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------+
```
### Expected behavior
_No response_
### Additional context
_No response_