2010YOUY01 commented on issue #6983:
URL: https://github.com/apache/arrow-datafusion/issues/6983#issuecomment-1661281753
@gobraves Thank you for trying! I also took a look at this issue (and found it pretty difficult to solve 😨). I hope the following information is helpful.
Here is an overview of how a Parquet scan gets parallelized:
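```rust
// Builds a DataFrame whose LogicalPlan contains a `TableScan` over the file
let _df = _ctx.read_parquet(FILENAME, _read_options).await.unwrap();
// Physical planning and physical optimization happen inside `cache()`
let _cached = _df.cache().await;
```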
After `_ctx.read_parquet(...)`, a `LogicalPlan` containing a `TableScan` is created and stored inside the DataFrame.
Inside `_df.cache()`, the `LogicalPlan` is first converted into a physical plan with a `ParquetExec` node, and then the physical optimizer tries to modify the `ParquetExec` node's `file_groups` to make the scan parallel.
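To make "modify `file_groups`" concrete: in the parallelized plan of the reproducer, the single file `part-0.parquet` is split into 12 byte ranges of 13271296 bytes each (`0..13271296`, `13271296..26542592`, ...), one range per file group. The sketch below shows that kind of splitting under stated assumptions: the function name and the "equal chunks, rounded up" rule are hypothetical and chosen for illustration, not DataFusion's actual `get_repartitioned` logic.

```rust
use std::ops::Range;

/// Hypothetical helper (NOT DataFusion's implementation): split one file of
/// `file_size` bytes into at most `target_partitions` contiguous byte ranges.
/// Each range would become its own file group, scanned by its own partition.
fn split_into_byte_ranges(file_size: u64, target_partitions: u64) -> Vec<Range<u64>> {
    if file_size == 0 || target_partitions == 0 {
        return vec![];
    }
    // Round the chunk size up so the ranges cover the whole file.
    let chunk = (file_size + target_partitions - 1) / target_partitions;
    let mut ranges = Vec::new();
    let mut start = 0;
    while start < file_size {
        // The last range is clamped to the end of the file.
        let end = (start + chunk).min(file_size);
        ranges.push(start..end);
        start = end;
    }
    ranges
}

fn main() {
    // Toy numbers only; this is not the real size of part-0.parquet.
    for r in split_into_byte_ranges(100, 4) {
        println!("part-0.parquet:{}..{}", r.start, r.end);
    }
}
```

With rounding up, a small file may yield fewer ranges than `target_partitions` (e.g. 2 bytes over 4 partitions gives 2 ranges), which is why the helper returns "at most" that many.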
My reproducer:
```
DataFusion CLI v28.0.0
❯ create external table test stored as parquet location '/Users/yongting/Desktop/code/my_datafusion/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet';
0 rows in set. Query took 0.064 seconds.
❯ create table t as select * from test;
0 rows in set. Query took 16.364 seconds.
❯ create table t as (select * from test where l_orderkey > 0);
0 rows in set. Query took 3.646 seconds.
```
```
❯ explain select * from test;
+---------------+------------------------------------------------------------+
| plan_type     | plan                                                       |
+---------------+------------------------------------------------------------+
| logical_plan  | TableScan: test projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment] |
| physical_plan | ParquetExec: file_groups={1 group: [[Users/yongting/Desktop/code/my_datafusion/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet]]}, projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment] |
|               |                                                            |
+---------------+------------------------------------------------------------+
2 rows in set. Query took 0.009 seconds.
❯ explain select * from test where l_orderkey > 0;
+---------------+------------------------------------------------------------+
| plan_type     | plan                                                       |
+---------------+------------------------------------------------------------+
| logical_plan  | Filter: test.l_orderkey > Int64(0)                         |
|               |   TableScan: test projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment], partial_filters=[test.l_orderkey > Int64(0)] |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192                |
|               |   FilterExec: l_orderkey@0 > 0                             |
|               |     ParquetExec: file_groups={12 groups: [[Users/yongting/Desktop/code/my_datafusion/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:0..13271296], [Users/yongting/Desktop/code/my_datafusion/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:13271296..26542592], [Users/yongting/Desktop/code/my_datafusion/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:26542592..39813888], [Users/yongting/Desktop/code/my_datafusion/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:39813888..53085184], [Users/yongting/Desktop/code/my_datafusion/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:53085184..66356480], ...]}, projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment], predicate=l_orderkey@0 > 0, pruning_predicate=l_orderkey_max@0 > 0 |
|               |                                                            |
+---------------+------------------------------------------------------------+
```
The second plan is parallelized (its `ParquetExec` has 12 file groups instead of 1). `explain verbose select ...` can be used to see which physical optimizer rule repartitions the `ParquetExec`:
https://github.com/apache/arrow-datafusion/blob/a9561a0f06c25f370dc39df08d057db85c4e0c7a/datafusion/core/src/physical_optimizer/repartition.rs#L166
I think there might be a bug inside this function: if `parquet_exec.get_repartitioned()` inside it gets called, the `ParquetExec` should be properly parallelized.
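One quick way to tell the two plans apart is the `file_groups={N group(s): ...}` prefix on the `ParquetExec` line. The sketch below is a hypothetical helper (not a DataFusion API) that extracts N from `EXPLAIN` text, which could be handy, for example, when asserting on scan parallelism in a regression test. It assumes the `file_groups={` display format shown in the outputs above.

```rust
/// Hypothetical diagnostic helper (NOT part of DataFusion): return the number
/// of file groups from a `ParquetExec` line in EXPLAIN output, e.g.
/// `file_groups={12 groups: ...}` -> Some(12), `file_groups={1 group: ...}` -> Some(1).
fn file_group_count(explain_output: &str) -> Option<usize> {
    let marker = "file_groups={";
    // Byte offset just past the marker (the marker is ASCII, so this is a char boundary).
    let start = explain_output.find(marker)? + marker.len();
    // Collect the leading run of ASCII digits after the marker.
    let digits: String = explain_output[start..]
        .chars()
        .take_while(|c| c.is_ascii_digit())
        .collect();
    digits.parse().ok()
}

fn main() {
    let serial = "ParquetExec: file_groups={1 group: [[part-0.parquet]]}";
    let parallel = "ParquetExec: file_groups={12 groups: [[part-0.parquet:0..13271296], ...]}";
    println!("{:?} {:?}", file_group_count(serial), file_group_count(parallel));
}
```

A count of 1 for a single large file suggests the repartitioning step was skipped for that scan.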
This reproducer should have the same root cause as the original one. For the original reproducer, adding a filter to `_df` also gets the scan parallelized:
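```rust
use datafusion::prelude::{col, lit};

let _df = _ctx
    .read_parquet(FILENAME, _read_options)
    .await
    .unwrap()
    // Adding a filter on top of the scan makes the `ParquetExec` get repartitioned
    .filter(col("l_orderkey").gt(lit(0)))
    .unwrap();
// The scan can then be parallelized
```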