gene-bordegaray commented on PR #18521: URL: https://github.com/apache/datafusion/pull/18521#issuecomment-3506597342
> Thank you for the amazing work. > > Removing unnecessary consecutive `RepartitionExec` makes sense to me, however I have went through half of the test and found 2 plan changes that is not `2 RepartitionExec -> 1 RepartitionExec`, I'm wondering is that expected? Do we have other plan changes that is not removing 1 of the consecutive `RepartitionExec`? > > The first one see review comment. > > The second one is tpch-q4, I saw a 20% speedup in benchmark result, so I checked the query plan, and the difference is it's removing a round robin repartition above parquet reader: > > ``` > // before > > explain select > o_orderpriority, > count(*) as order_count > from > orders > where > o_orderdate >= '1993-07-01' > and o_orderdate < date '1993-07-01' + interval '3' month > and exists ( > select > * > from > lineitem > where > l_orderkey = o_orderkey > and l_commitdate < l_receiptdate > ) > group by > o_orderpriority > order by > o_orderpriority; > +---------------+------------------------------------------------------------+ > | plan_type | plan | > +---------------+------------------------------------------------------------+ > | physical_plan | ┌───────────────────────────┐ | > | | │ SortPreservingMergeExec │ | > | | │ -------------------- │ | > | | │ o_orderpriority ASC NULLS │ | > | | │ LAST │ | > | | └─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ SortExec │ | > | | │ -------------------- │ | > | | │ o_orderpriority@0 ASC │ | > | | │ NULLS LAST │ | > | | └─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ ProjectionExec │ | > | | │ -------------------- │ | > | | │ o_orderpriority: │ | > | | │ o_orderpriority │ | > | | │ │ | > | | │ order_count: │ | > | | │ count(Int64(1)) │ | > | | └─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ AggregateExec │ | > | | │ -------------------- │ | > | | │ aggr: count(1) │ | > | | │ │ | > | | │ group_by: │ | > | | │ o_orderpriority │ | > | | │ │ | > | 
| │ mode: │ | > | | │ FinalPartitioned │ | > | | └─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ CoalesceBatchesExec │ | > | | │ -------------------- │ | > | | │ target_batch_size: │ | > | | │ 8192 │ | > | | └─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ RepartitionExec │ | > | | │ -------------------- │ | > | | │ partition_count(in->out): │ | > | | │ 14 -> 14 │ | > | | │ │ | > | | │ partitioning_scheme: │ | > | | │ Hash([o_orderpriority@0], │ | > | | │ 14) │ | > | | └─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ AggregateExec │ | > | | │ -------------------- │ | > | | │ aggr: count(1) │ | > | | │ │ | > | | │ group_by: │ | > | | │ o_orderpriority │ | > | | │ │ | > | | │ mode: Partial │ | > | | └─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ CoalesceBatchesExec │ | > | | │ -------------------- │ | > | | │ target_batch_size: │ | > | | │ 8192 │ | > | | └─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ HashJoinExec │ | > | | │ -------------------- │ | > | | │ join_type: RightSemi │ | > | | │ ├──────────────┐ | > | | │ on: │ │ | > | | │ (l_orderkey = o_orderkey) │ │ | > | | └─────────────┬─────────────┘ │ | > | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | > | | │ CoalescePartitionsExec ││ CoalesceBatchesExec │ | > | | │ ││ -------------------- │ | > | | │ ││ target_batch_size: │ | > | | │ ││ 8192 │ | > | | └─────────────┬─────────────┘└─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | > | | │ CoalesceBatchesExec ││ FilterExec │ | > | | │ -------------------- ││ -------------------- │ | > | | │ target_batch_size: ││ predicate: │ | > | | │ 8192 ││ o_orderdate >= 1993-07-01 │ | > | | │ ││ AND o_orderdate < 1993 │ | > | | │ ││ -10-01 │ | > | | └─────────────┬─────────────┘└─────────────┬─────────────┘ | > | | 
┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | > | | │ FilterExec ││ RepartitionExec │ | > | | │ -------------------- ││ -------------------- │ | > | | │ predicate: ││ partition_count(in->out): │ | > | | │ l_receiptdate > ││ 1 -> 14 │ | > | | │ l_commitdate ││ │ | > | | │ ││ partitioning_scheme: │ | > | | │ ││ RoundRobinBatch(14) │ | > | | └─────────────┬─────────────┘└─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | > | | │ RepartitionExec ││ DataSourceExec │ | > | | │ -------------------- ││ -------------------- │ | > | | │ partition_count(in->out): ││ files: 1 │ | > | | │ 1 -> 14 ││ format: parquet │ | > | | │ ││ │ | > | | │ partitioning_scheme: ││ predicate: │ | > | | │ RoundRobinBatch(14) ││ o_orderdate >= 1993-07-01 │ | > | | │ ││ AND o_orderdate < 1993 │ | > | | │ ││ -10-01 │ | > | | └─────────────┬─────────────┘└───────────────────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ DataSourceExec │ | > | | │ -------------------- │ | > | | │ files: 1 │ | > | | │ format: parquet │ | > | | │ │ | > | | │ predicate: │ | > | | │ l_receiptdate > │ | > | | │ l_commitdate │ | > | | └───────────────────────────┘ | > | | | > +---------------+------------------------------------------------------------+ > 1 row(s) fetched. > Elapsed 0.011 seconds. 
> > // PR > +---------------+------------------------------------------------------------+ > | plan_type | plan | > +---------------+------------------------------------------------------------+ > | physical_plan | ┌───────────────────────────┐ | > | | │ SortPreservingMergeExec │ | > | | │ -------------------- │ | > | | │ o_orderpriority ASC NULLS │ | > | | │ LAST │ | > | | └─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ SortExec │ | > | | │ -------------------- │ | > | | │ o_orderpriority@0 ASC │ | > | | │ NULLS LAST │ | > | | └─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ ProjectionExec │ | > | | │ -------------------- │ | > | | │ o_orderpriority: │ | > | | │ o_orderpriority │ | > | | │ │ | > | | │ order_count: │ | > | | │ count(Int64(1)) │ | > | | └─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ AggregateExec │ | > | | │ -------------------- │ | > | | │ aggr: count(1) │ | > | | │ │ | > | | │ group_by: │ | > | | │ o_orderpriority │ | > | | │ │ | > | | │ mode: │ | > | | │ FinalPartitioned │ | > | | └─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ CoalesceBatchesExec │ | > | | │ -------------------- │ | > | | │ target_batch_size: │ | > | | │ 8192 │ | > | | └─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ RepartitionExec │ | > | | │ -------------------- │ | > | | │ partition_count(in->out): │ | > | | │ 14 -> 14 │ | > | | │ │ | > | | │ partitioning_scheme: │ | > | | │ Hash([o_orderpriority@0], │ | > | | │ 14) │ | > | | └─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ AggregateExec │ | > | | │ -------------------- │ | > | | │ aggr: count(1) │ | > | | │ │ | > | | │ group_by: │ | > | | │ o_orderpriority │ | > | | │ │ | > | | │ mode: Partial │ | > | | └─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ CoalesceBatchesExec │ | > | | │ -------------------- │ | > | | │ 
target_batch_size: │ | > | | │ 8192 │ | > | | └─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐ | > | | │ HashJoinExec │ | > | | │ -------------------- │ | > | | │ join_type: RightSemi │ | > | | │ ├──────────────┐ | > | | │ on: │ │ | > | | │ (l_orderkey = o_orderkey) │ │ | > | | └─────────────┬─────────────┘ │ | > | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | > | | │ CoalescePartitionsExec ││ CoalesceBatchesExec │ | > | | │ ││ -------------------- │ | > | | │ ││ target_batch_size: │ | > | | │ ││ 8192 │ | > | | └─────────────┬─────────────┘└─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | > | | │ CoalesceBatchesExec ││ FilterExec │ | > | | │ -------------------- ││ -------------------- │ | > | | │ target_batch_size: ││ predicate: │ | > | | │ 8192 ││ o_orderdate >= 1993-07-01 │ | > | | │ ││ AND o_orderdate < 1993 │ | > | | │ ││ -10-01 │ | > | | └─────────────┬─────────────┘└─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | > | | │ FilterExec ││ RepartitionExec │ | > | | │ -------------------- ││ -------------------- │ | > | | │ predicate: ││ partition_count(in->out): │ | > | | │ l_receiptdate > ││ 1 -> 14 │ | > | | │ l_commitdate ││ │ | > | | │ ││ partitioning_scheme: │ | > | | │ ││ RoundRobinBatch(14) │ | > | | └─────────────┬─────────────┘└─────────────┬─────────────┘ | > | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | > | | │ DataSourceExec ││ DataSourceExec │ | > | | │ -------------------- ││ -------------------- │ | > | | │ files: 14 ││ files: 1 │ | > | | │ format: parquet ││ format: parquet │ | > | | │ ││ │ | > | | │ predicate: ││ predicate: │ | > | | │ l_receiptdate > ││ o_orderdate >= 1993-07-01 │ | > | | │ l_commitdate ││ AND o_orderdate < 1993 │ | > | | │ ││ -10-01 │ | > | | └───────────────────────────┘└───────────────────────────┘ | > | | | > 
+---------------+------------------------------------------------------------+
> 1 row(s) fetched.
> Elapsed 0.013 seconds.
> ```

Can you clarify what data you used to create the tables? Thank you.
