mingmwang commented on PR #5171:
URL: https://github.com/apache/arrow-datafusion/pull/5171#issuecomment-1421115470

   
   > 
   > By the way, after thinking about this change, I found that the `GlobalSortSelection` approach doesn't always achieve parallelization. Consider the query below:
   > 
   > ```sql
   > SELECT c1,
   >     SUM(C9) OVER (PARTITION BY C1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) as sum1,
   >     SUM(C9) OVER (ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2
   >     FROM aggregate_test_100 ORDER BY c1 ASC
   > ```
   > 
   > In the current version, its physical plan is as follows:
   > 
   > ```text
   > "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
   > "  SortExec: [c1@0 ASC NULLS LAST]",
   > "    ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
   > "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
   > "        SortExec: [c9@1 ASC NULLS LAST]",
   > "          CoalescePartitionsExec",
   > "            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
   > "              SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
   > "                CoalesceBatchesExec: target_batch_size=8192",
   > "                  RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
   > "                    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
   > ```
   > 
   > However, the previous rule would produce the plan below:
   > 
   > ```text
   > "SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
   > "  SortExec: [c1@0 ASC NULLS LAST]",
   > "    ProjectionExec: expr=[c1@0 as c1, SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING@2 as sum1, SUM(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2]",
   > "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]",
   > "        SortPreservingMergeExec: [c9@1 ASC NULLS LAST]",
   > "          SortExec: [c9@1 ASC NULLS LAST]",
   > "            BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: \"SUM(aggregate_test_100.c9)\", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(3)) }]",
   > "              SortExec: [c1@0 ASC NULLS LAST,c9@1 ASC NULLS LAST]",
   > "                CoalesceBatchesExec: target_batch_size=8192",
   > "                  RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2), input_partitions=2",
   > "                    RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
   > ```
   > 
   > Tomorrow I will go back to the first approach and add this example as a test.
   
   I think this is because the global sort and `CoalescePartitionsExec` were added later by the two enforcement rules.
   An easy way to get rid of this is to run the `GlobalSortSelection` rule again after the two enforcement rules. I would still prefer to let the `GlobalSortSelection` rule handle this optimization; the rule needs to be enhanced to handle the `SortExec` + `CoalescePartitionsExec` combination.
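
To make the suggested enhancement concrete, here is a minimal Python sketch over a toy plan tree. It only illustrates the shape of the rewrite: a `SortExec` sitting directly on a `CoalescePartitionsExec` becomes a `SortPreservingMergeExec` over a per-partition `SortExec`. The `Plan` class and the `rewrite` and `render` helpers are hypothetical names invented for this example, not DataFusion APIs, and the sketch assumes the input really has multiple partitions and that no fetch limit is involved, both of which a real rule would have to check.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Plan:
    """Toy stand-in for a physical plan node (hypothetical, not DataFusion's type)."""
    name: str
    sort_exprs: Optional[str] = None
    children: List["Plan"] = field(default_factory=list)


def rewrite(plan: Plan) -> Plan:
    """Bottom-up rewrite of SortExec -> CoalescePartitionsExec pairs."""
    # Rewrite the children first so nested occurrences are handled too.
    children = [rewrite(child) for child in plan.children]
    node = Plan(plan.name, plan.sort_exprs, children)

    is_global_sort_over_coalesce = (
        node.name == "SortExec"
        and len(children) == 1
        and children[0].name == "CoalescePartitionsExec"
    )
    if is_global_sort_over_coalesce:
        # Sort each partition locally, then merge the sorted streams.
        local_sort = Plan("SortExec", node.sort_exprs, children[0].children)
        return Plan("SortPreservingMergeExec", node.sort_exprs, [local_sort])
    return node


def render(plan: Plan, depth: int = 0) -> List[str]:
    """Render the tree as indented lines, similar in spirit to a plan display."""
    label = plan.name
    if plan.sort_exprs is not None:
        label = f"{plan.name}: [{plan.sort_exprs}]"
    lines = ["  " * depth + label]
    for child in plan.children:
        lines.extend(render(child, depth + 1))
    return lines


# The problematic fragment from the current plan, reduced to three nodes.
before = Plan(
    "SortExec",
    "c9@1 ASC NULLS LAST",
    [Plan("CoalescePartitionsExec", None, [Plan("BoundedWindowAggExec")])],
)
after = rewrite(before)
print("\n".join(render(after)))
```

On this reduced fragment the rewritten tree has the same shape as the plan the previous rule produced: the merge node on top, the sort pushed below it, and the window operator left untouched.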
   

