mingmwang commented on PR #5171: URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1421115470
> By the way, after thinking about this change. I found out that `GlobalSortSelection` approach doesn't always accomplish parallelization. Consider query below
>
> ```sql
> SELECT c1, \
>     SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as sum1, \
>     SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2 \
>     FROM aggregate_test_100 ORDER BY c1 ASC
> ```
>
> Its physical plan, is as follows in the current version
>
> ```sql
> "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
> "  SortExec: [c1@0 ASC NULLS LAST]",
> "    ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
> "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
> "        SortExec: [c9@1 ASC NULLS LAST]",
> "          CoalescePartitionsExec",
> "            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
> "              SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
> "                CoalesceBatchesExec: target_batch_size=8192",
> "                  RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
> "                    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
> ```
>
> However, previously rule would produce the plan below
>
> ```sql
> "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
> "  SortExec: [c1@0 ASC NULLS LAST]",
> "    ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
> "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
> "        SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
> "          SortExec: [c9@1 ASC NULLS LAST]",
> "            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
> "              SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
> "                CoalesceBatchesExec: target_batch_size=8192",
> "                  RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
> "                    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
> ```
>
> tomorrow, I will go back to first approach. And will add this example as a test.

I think this is because the global sort + `CoalescePartitionsExec` were added later by the two enforcement rules. An easy way to get rid of this is to run the `GlobalSortSelection` rule again after the two enforcement rules.

I would still prefer to let the `GlobalSortSelection` rule handle this optimization. We need to enhance the `GlobalSortSelection` rule to handle the `SortExec` + `CoalescePartitionsExec` combination.
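To make the suggested enhancement concrete, here is a rough sketch of the rewrite on a toy plan tree. Everything in it is an assumption for illustration: the `Plan` enum and the `parallelize_sorts` function are hypothetical and are not DataFusion's `ExecutionPlan` or the actual `GlobalSortSelection` code. It only shows the shape of the transformation, turning `SortExec` over `CoalescePartitionsExec` into `SortPreservingMergeExec` over a per-partition `SortExec`, and it ignores conditions a real rule would likely need to check (partition counts, fetch limits, config flags).

```rust
/// Toy stand-in for a physical plan node (hypothetical, not DataFusion's API).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// A leaf such as a scan, identified only by name.
    Leaf(String),
    /// Sorts its input by `keys`.
    Sort { keys: Vec<String>, input: Box<Plan> },
    /// Merges all input partitions into one, without preserving order.
    CoalescePartitions { input: Box<Plan> },
    /// Merges already-sorted input partitions into one sorted partition.
    SortPreservingMerge { keys: Vec<String>, input: Box<Plan> },
    /// Any other single-child operator, kept opaque.
    Other { name: String, input: Box<Plan> },
}

/// Rewrites every `Sort -> CoalescePartitions -> child` into
/// `SortPreservingMerge -> Sort -> child`, so the sort can run per partition.
fn parallelize_sorts(plan: Plan) -> Plan {
    match plan {
        Plan::Sort { keys, input } => match *input {
            // The pattern discussed above: a global sort sitting on a coalesce.
            Plan::CoalescePartitions { input: child } => {
                let child = parallelize_sorts(*child);
                Plan::SortPreservingMerge {
                    keys: keys.clone(),
                    input: Box::new(Plan::Sort {
                        keys,
                        input: Box::new(child),
                    }),
                }
            }
            // Any other child: keep the sort and keep walking down.
            other => Plan::Sort {
                keys,
                input: Box::new(parallelize_sorts(other)),
            },
        },
        Plan::CoalescePartitions { input } => Plan::CoalescePartitions {
            input: Box::new(parallelize_sorts(*input)),
        },
        Plan::SortPreservingMerge { keys, input } => Plan::SortPreservingMerge {
            keys,
            input: Box::new(parallelize_sorts(*input)),
        },
        Plan::Other { name, input } => Plan::Other {
            name,
            input: Box::new(parallelize_sorts(*input)),
        },
        leaf @ Plan::Leaf(_) => leaf,
    }
}
```

Applied to a tree shaped like the `SortExec: [c9@1 ...]` / `CoalescePartitionsExec` / `BoundedWindowAggExec` fragment of the current plan, this yields the `SortPreservingMergeExec` / `SortExec` shape from the earlier plan. Running it a second time leaves the tree unchanged, which matters if the rule is run again after the two enforcement rules.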