mustafasrepo commented on PR #5171: URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1421006216
> > > > @alamb @ozankabak @mustafasrepo
> > > > Regarding the optimization that replaces a global sort with a parallel version (`SortPreservingMergeExec` + local `SortExec`s): I think there is already a rule, `GlobalSortSelection`, for this exact purpose. I think we should not let the Sort Enforcement rule handle this again. Implementing/enhancing such an optimization in the `GlobalSortSelection` rule is more straightforward and does not need to care about the positions of the `CoalescePartitionsExec`s.
> > >
> > > I am not sure how we can do all the local sort + merge substitutions just with `GlobalSortSelection`, which doesn't track coalesce operations on partitions, as you rightly point out. Note that we handle (and parallel-optimize) not just top-level sorts, but sorts at any depth within the plan, even with intermediate executors between the coalesce operation and the sort in question.
> > > We will take a deeper look today and see if we can move the logic over to `GlobalSortSelection` while still preserving the same functionality. If we can, great -- if not, we will share an example that blocks this. Thank you for the suggestion 👍
> >
> > Yes, please take a look at the `GlobalSortSelection` rule. This rule does not need to care about the position of `CoalescePartitionsExec` because `CoalescePartitionsExec`s are added by the `EnforceDistribution` rule, which is triggered after the `GlobalSortSelection` rule. Physical sort selection should happen at a very early stage of the physical optimization phase. I guess the reason the current `GlobalSortSelection` does not optimize all global sorts is that it is not that aggressive and has an additional check. If you comment out that check, all global sorts should be replaced:
> >
> > `&& sort_exec.fetch().is_some()`

@mingmwang, your suggestion works. This greatly simplifies the code; thanks for the suggestion. By the way, I didn't remove the `&& sort_exec.fetch().is_some()` check directly. I `OR`ed this check with the config option (can be found [here](https://github.com/synnada-ai/arrow-datafusion/blob/6db42485b016ef0acb183cb70e391bfc6910d4f9/datafusion/core/src/physical_optimizer/global_sort_selection.rs#L59)), in case one wants to toggle this feature; as you and @alamb say, in some contexts this may not be what users want. However, after thinking about this change further, I found that the new approach does not always accomplish parallelization.
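For reference, the relaxed condition boils down to something like the minimal sketch below. This is not the actual DataFusion code; `repartition_sorts` is a stand-in name for the config option linked above:

```rust
/// Minimal sketch (names hypothetical): decide whether a single-partition
/// global sort should be replaced by per-partition local sorts followed by
/// a SortPreservingMergeExec. The old check fired only for sorts with a
/// fetch limit; OR-ing in the config flag lets users opt in for all sorts.
fn should_parallelize_sort(fetch: Option<usize>, repartition_sorts: bool) -> bool {
    fetch.is_some() || repartition_sorts
}

fn main() {
    assert!(should_parallelize_sort(Some(10), false)); // old behavior preserved
    assert!(should_parallelize_sort(None, true)); // new opt-in path via config
    assert!(!should_parallelize_sort(None, false)); // default unchanged
}
```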
To see this, consider the query below:

```sql
SELECT c1,
  SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as sum1,
  SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
FROM aggregate_test_100
ORDER BY c1 ASC
```

Its physical plan in the current version is as follows:

```sql
"SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
"  SortExec: [c1@0 ASC NULLS LAST]",
"    ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
"      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
"        SortExec: [c9@1 ASC NULLS LAST]",
"          CoalescePartitionsExec",
"            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
"              SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
"                CoalesceBatchesExec: target_batch_size=8192",
"                  RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
"                    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
```

However, the rule previously produced the plan below, where the sort on `c9` remains parallelized (per-partition `SortExec`s under a `SortPreservingMergeExec`) instead of the single-partition `SortExec` over `CoalescePartitionsExec` above:

```sql
"SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
"  SortExec: [c1@0 ASC NULLS LAST]",
"    ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
"      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
"        SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
"          SortExec: [c9@1 ASC NULLS LAST]",
"            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
"              SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
"                CoalesceBatchesExec: target_batch_size=8192",
"                  RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
"                    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
```

Tomorrow, I will retract the changes and add this example as a test.
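For context, such a plan test could look roughly like the sketch below. This is a minimal sketch, not the actual test: it assumes the usual `SessionContext`/`displayable` pattern from DataFusion's tests, and the test name and CSV path are hypothetical:

```rust
use datafusion::error::Result;
use datafusion::physical_plan::displayable;
use datafusion::prelude::*;

#[tokio::test]
async fn window_sort_stays_parallelized() -> Result<()> {
    // Hypothetical setup: register the aggregate_test_100 table used by the query.
    let ctx = SessionContext::new();
    ctx.register_csv(
        "aggregate_test_100",
        "tests/data/aggregate_test_100.csv", // path is an assumption
        CsvReadOptions::new(),
    )
    .await?;

    let sql = "SELECT c1, \
        SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as sum1, \
        SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2 \
        FROM aggregate_test_100 ORDER BY c1 ASC";
    let physical_plan = ctx.sql(sql).await?.create_physical_plan().await?;

    // A real test would collect these lines and assert_eq! them against the
    // expected (parallelized) plan shown above; the sketch just prints them.
    println!("{}", displayable(physical_plan.as_ref()).indent());
    Ok(())
}
```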