mustafasrepo commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1421006216

   > > > > @alamb @ozankabak @mustafasrepo
   > > > > Regarding the optimization that replaces a global sort with a parallel version (SortPreservingMergeExec + local sort), I think there is already a rule, `GlobalSortSelection`, for exactly this purpose. I think we should not let the Sort Enforcement rule handle this again. Implementing/enhancing such an optimization in the `GlobalSortSelection` rule is more straightforward and does not need to care about the positions of the `CoalescePartitionsExec`.
   > > > 
   > > > 
   > > > I am not sure how we can do all the local sort + merge substitutions just with `GlobalSortSelection`, which doesn't track coalesce operations on partitions, as you rightly point out. Note that we handle (and parallel-optimize) not just top-level sorts, but sorts at any depth within the plan, even with intermediate executors between the coalesce operation and the sort in question.
   > > > We will take a deeper look today and see if we can move the logic over to `GlobalSortSelection` while still preserving the same functionality. If we can, great -- if not, we will share an example that blocks this. Thank you for the suggestion 👍
   > > 
   > > 
   > > Yes, please take a look at the `GlobalSortSelection` rule. This rule does not need to care about the position of `CoalescePartitionsExec`, because `CoalescePartitionsExec`s are added by the `EnforceDistribution` rule, which is triggered after the `GlobalSortSelection` rule. The physical sort selection should happen at a very early stage of the physical optimization phase. I guess the reason the current `GlobalSortSelection` does not optimize all global sorts is that it is not that aggressive and has an additional check. If you comment out that check, all global sorts should be replaced:
   > > `&& sort_exec.fetch().is_some()`
   > 
   > @mingmwang your suggestion works. This greatly simplifies the code. Thanks for the suggestion. By the way, I didn't remove the `&& sort_exec.fetch().is_some()` check directly; I `OR`ed this check with a config option (can be found [here](https://github.com/synnada-ai/arrow-datafusion/blob/6db42485b016ef0acb183cb70e391bfc6910d4f9/datafusion/core/src/physical_optimizer/global_sort_selection.rs#L59)), in case one wants to toggle this feature. As you and @alamb say, in some contexts this may not be what users want.
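
   For reference, here is a minimal sketch of the substitution that `GlobalSortSelection` performs, with the `fetch().is_some()` check `OR`ed against a stand-in boolean flag. The flag name `parallelize_sorts` is hypothetical (the actual option lives in the linked file); the rest mirrors the local-sort + merge rewrite discussed above:

   ```rust
   use std::sync::Arc;

   use datafusion::physical_plan::sorts::sort::SortExec;
   use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
   use datafusion::physical_plan::ExecutionPlan;

   /// Sketch of the rewrite: replace a multi-partition global `SortExec` with
   /// per-partition local sorts followed by a `SortPreservingMergeExec`.
   /// `parallelize_sorts` is a hypothetical stand-in for the real config option.
   fn maybe_parallelize_sort(
       sort_exec: &SortExec,
       parallelize_sorts: bool,
   ) -> Option<Arc<dyn ExecutionPlan>> {
       let input = sort_exec.input().clone();
       if input.output_partitioning().partition_count() > 1
           // A sort that already preserves partitioning is a local sort; skip it.
           && !sort_exec.preserve_partitioning()
           // The original check, OR'ed with the opt-in flag.
           && (sort_exec.fetch().is_some() || parallelize_sorts)
       {
           // Sort each partition independently, keeping the input partitioning.
           let local_sort = SortExec::new_with_partitioning(
               sort_exec.expr().to_vec(),
               input,
               true,
               sort_exec.fetch(),
           );
           // Merge the sorted partitions into a single, globally sorted stream.
           Some(Arc::new(SortPreservingMergeExec::new(
               sort_exec.expr().to_vec(),
               Arc::new(local_sort),
           )))
       } else {
           None
       }
   }
   ```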
   
   By the way, after thinking about this change further, I found that the new approach doesn't always accomplish parallelization. Consider the query below:
   ```sql
   SELECT c1,
       SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as sum1,
       SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
       FROM aggregate_test_100 ORDER BY c1 ASC
   ```
   Its physical plan in the current version is as follows:
   ```text
   "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
   "  SortExec: [c1@0 ASC NULLS LAST]",
   "    ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
   "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
   "        SortExec: [c9@1 ASC NULLS LAST]",
   "          CoalescePartitionsExec",
   "            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
   "              SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
   "                CoalesceBatchesExec: target_batch_size=8192",
   "                  RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
   "                    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
   ```
   However, the previous rule would produce the plan below, where the sort feeding the second window runs on each partition and the sorted partitions are merged by a `SortPreservingMergeExec`, instead of coalescing into a single partition before sorting:
   ```text
   "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
   "  SortExec: [c1@0 ASC NULLS LAST]",
   "    ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
   "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
   "        SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
   "          SortExec: [c9@1 ASC NULLS LAST]",
   "            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
   "              SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
   "                CoalesceBatchesExec: target_batch_size=8192",
   "                  RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
   "                    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
   ```
   Tomorrow, I will revert the changes and add this example as a test.
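
   A rough sketch of what such a test could look like, assuming the usual pattern of building the physical plan through a `SessionContext` and inspecting the rendered plan text; the table registration path and the exact assertion here are illustrative, not the final test:

   ```rust
   use datafusion::error::Result;
   use datafusion::physical_plan::displayable;
   use datafusion::prelude::*;
   use datafusion::test_util::arrow_test_data;

   #[tokio::test]
   async fn parallelize_sort_under_window() -> Result<()> {
       // Two target partitions so the repartition / parallel-sort paths kick in.
       let config = SessionConfig::new().with_target_partitions(2);
       let ctx = SessionContext::with_config(config);
       ctx.register_csv(
           "aggregate_test_100",
           &format!("{}/csv/aggregate_test_100.csv", arrow_test_data()),
           CsvReadOptions::new(),
       )
       .await?;

       let sql = "SELECT c1, \
           SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as sum1, \
           SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2 \
           FROM aggregate_test_100 ORDER BY c1 ASC";
       let plan = ctx.sql(sql).await?.create_physical_plan().await?;
       let actual = format!("{}", displayable(plan.as_ref()).indent());

       // The sort feeding the second window should be parallelized:
       // per-partition SortExec merged by SortPreservingMergeExec,
       // not CoalescePartitionsExec followed by a single SortExec.
       assert!(actual.contains("SortPreservingMergeExec: [c9@1 ASC NULLS LAST]"));
       Ok(())
   }
   ```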

