This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 456ceb299d0 [FLINK-27272][table-planner] Fix the incorrect plan for 
queries with local sort when the adaptive batch scheduler is enabled
456ceb299d0 is described below

commit 456ceb299d0601dee283f718f5d3d0a9d108196e
Author: godfreyhe <[email protected]>
AuthorDate: Sun Apr 17 11:50:03 2022 +0800

    [FLINK-27272][table-planner] Fix the incorrect plan for queries with local 
sort when the adaptive batch scheduler is enabled
    
    This closes #19497
---
 .../processor/ForwardHashExchangeProcessor.java    |  50 ++--
 .../nodes/physical/batch/BatchPhysicalRank.scala   |  10 +-
 .../plan/batch/sql/ForwardHashExchangeTest.java    | 101 ++++++-
 .../plan/batch/sql/ForwardHashExchangeTest.xml     | 301 +++++++++++++++++----
 4 files changed, 379 insertions(+), 83 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/ForwardHashExchangeProcessor.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/ForwardHashExchangeProcessor.java
index d8238605ec0..9e03ac9da96 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/ForwardHashExchangeProcessor.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/ForwardHashExchangeProcessor.java
@@ -87,24 +87,25 @@ public class ForwardHashExchangeProcessor implements 
ExecNodeGraphProcessor {
                                     inputProperty.getRequiredDistribution();
                             ExecEdge edge = node.getInputEdges().get(i);
 
-                            if (requiredDistribution.getType() == 
DistributionType.SINGLETON) {
-                                if (!hasExchangeInput(edge) && 
isInputSortedNode(node)) {
-                                    // if operation chaining is disabled, this 
could mark sure the
-                                    // sort node and its output can also be 
connected by
-                                    // ForwardPartitioner
+                            if (requiredDistribution.getType() != 
DistributionType.HASH) {
+                                boolean visitChild =
+                                        requiredDistribution.getType()
+                                                == DistributionType.SINGLETON;
+                                if (!hasExchangeInput(edge)
+                                        && 
hasSortInputForInputSortedNode(node)) {
                                     ExecEdge newEdge =
                                             addExchangeAndReconnectEdge(
-                                                    tableConfig, edge, 
inputProperty, true);
+                                                    tableConfig,
+                                                    edge,
+                                                    inputProperty,
+                                                    true,
+                                                    visitChild);
                                     newEdges.set(i, newEdge);
                                     changed = true;
                                 }
                                 continue;
                             }
 
-                            if (requiredDistribution.getType() != 
DistributionType.HASH) {
-                                continue;
-                            }
-
                             if (!hasExchangeInput(edge)) {
                                 ExecEdge newEdge;
                                 if (isInputSortedNode(node)) {
@@ -117,7 +118,8 @@ public class ForwardHashExchangeProcessor implements 
ExecNodeGraphProcessor {
                                                         tableConfig,
                                                         
sort.getInputEdges().get(0),
                                                         inputProperty,
-                                                        false);
+                                                        false,
+                                                        true);
                                         sort.setInputEdges(
                                                 
Collections.singletonList(newEdgeOfSort));
                                     }
@@ -127,13 +129,13 @@ public class ForwardHashExchangeProcessor implements 
ExecNodeGraphProcessor {
                                     // ForwardPartitioner
                                     newEdge =
                                             addExchangeAndReconnectEdge(
-                                                    tableConfig, edge, 
inputProperty, true);
+                                                    tableConfig, edge, 
inputProperty, true, true);
                                 } else {
                                     // add Exchange with keep_input_as_is 
distribution as the input
                                     // of the node
                                     newEdge =
                                             addExchangeAndReconnectEdge(
-                                                    tableConfig, edge, 
inputProperty, false);
+                                                    tableConfig, edge, 
inputProperty, false, true);
                                     updateOriginalEdgeInMultipleInput(
                                             node, i, (BatchExecExchange) 
newEdge.getSource());
                                 }
@@ -145,7 +147,7 @@ public class ForwardHashExchangeProcessor implements 
ExecNodeGraphProcessor {
                                 // node and its output can also be connected 
by ForwardPartitioner
                                 ExecEdge newEdge =
                                         addExchangeAndReconnectEdge(
-                                                tableConfig, edge, 
inputProperty, true);
+                                                tableConfig, edge, 
inputProperty, true, true);
                                 newEdges.set(i, newEdge);
                                 changed = true;
                             }
@@ -164,21 +166,27 @@ public class ForwardHashExchangeProcessor implements 
ExecNodeGraphProcessor {
             ReadableConfig tableConfig,
             ExecEdge edge,
             InputProperty inputProperty,
-            boolean strict) {
+            boolean strict,
+            boolean visitChild) {
         ExecNode<?> target = edge.getTarget();
         ExecNode<?> source = edge.getSource();
         if (source instanceof CommonExecExchange) {
             return edge;
         }
         // only Calc, Correlate and Sort can propagate sort property and 
distribution property
-        if (source instanceof BatchExecCalc
-                || source instanceof BatchExecPythonCalc
-                || source instanceof BatchExecSort
-                || source instanceof BatchExecCorrelate
-                || source instanceof BatchExecPythonCorrelate) {
+        if (visitChild
+                && (source instanceof BatchExecCalc
+                        || source instanceof BatchExecPythonCalc
+                        || source instanceof BatchExecSort
+                        || source instanceof BatchExecCorrelate
+                        || source instanceof BatchExecPythonCorrelate)) {
             ExecEdge newEdge =
                     addExchangeAndReconnectEdge(
-                            tableConfig, source.getInputEdges().get(0), 
inputProperty, strict);
+                            tableConfig,
+                            source.getInputEdges().get(0),
+                            inputProperty,
+                            strict,
+                            true);
             source.setInputEdges(Collections.singletonList(newEdge));
         }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala
index 8804b3e67ca..38b01231d86 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala
@@ -229,10 +229,14 @@ class BatchPhysicalRank(
   }
 
   override def translateToExecNode(): ExecNode[_] = {
-    val requiredDistribution = if (partitionKey.length() == 0) {
-      InputProperty.SINGLETON_DISTRIBUTION
+    val requiredDistribution = if (isGlobal) {
+      if (partitionKey.length() == 0) {
+        InputProperty.SINGLETON_DISTRIBUTION
+      } else {
+        InputProperty.hashDistribution(partitionKey.toArray)
+      }
     } else {
-      InputProperty.hashDistribution(partitionKey.toArray)
+      InputProperty.UNKNOWN_DISTRIBUTION
     }
     new BatchExecRank(
       unwrapTableConfig(this),
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
index fd68753bb42..b5edb3632f9 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
@@ -36,6 +36,12 @@ public class ForwardHashExchangeTest extends TableTestBase {
         util = batchTestUtil(TableConfig.getDefault());
 
         util.getStreamEnv().getConfig().setDynamicGraph(true);
+        util.tableEnv()
+                .getConfig()
+                .getConfiguration()
+                .set(
+                        
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+                        false);
         util.tableEnv()
                 .executeSql(
                         "CREATE TABLE T (\n"
@@ -71,6 +77,20 @@ public class ForwardHashExchangeTest extends TableTestBase {
                                 + ")");
     }
 
+    @Test
+    public void testRankWithHashShuffle() {
+        util.verifyExecPlan(
+                "SELECT * FROM (SELECT a, b, RANK() OVER(PARTITION BY a ORDER 
BY b) rk FROM T) WHERE rk <= 10");
+    }
+
+    @Test
+    public void testSortAggregateWithHashShuffle() {
+        util.tableEnv()
+                .getConfig()
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, 
"HashAgg");
+        util.verifyExecPlan(" SELECT a, SUM(b) AS b FROM T GROUP BY a");
+    }
+
     @Test
     public void testOverAggOnHashAggWithHashShuffle() {
         util.tableEnv()
@@ -130,12 +150,45 @@ public class ForwardHashExchangeTest extends 
TableTestBase {
     }
 
     @Test
-    public void testSortAggOnSortMergeJoinWithHashShuffle() {
+    public void testOnePhaseSortAggOnSortMergeJoinWithHashShuffle() {
+        util.tableEnv()
+                .getConfig()
+                .set(
+                        ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
+                        "HashJoin,NestedLoopJoin,HashAgg");
+        util.tableEnv()
+                .getConfig()
+                
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
+        util.verifyExecPlan(
+                "WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 
'He%')\n"
+                        + "SELECT sum(b1) FROM r group by a1");
+    }
+
+    @Test
+    public void testTwoPhaseSortAggOnSortMergeJoinWithHashShuffle() {
+        util.tableEnv()
+                .getConfig()
+                .set(
+                        ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
+                        "HashJoin,NestedLoopJoin,HashAgg");
+        util.tableEnv()
+                .getConfig()
+                
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
+        util.verifyExecPlan(
+                "WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 
'He%')\n"
+                        + "SELECT sum(b1) FROM r group by a1");
+    }
+
+    @Test
+    public void testAutoPhaseSortAggOnSortMergeJoinWithHashShuffle() {
         util.tableEnv()
                 .getConfig()
                 .set(
                         ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
                         "HashJoin,NestedLoopJoin,HashAgg");
+        util.tableEnv()
+                .getConfig()
+                
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "AUTO");
         util.verifyExecPlan(
                 "WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 
'He%')\n"
                         + "SELECT sum(b1) FROM r group by a1");
@@ -196,10 +249,13 @@ public class ForwardHashExchangeTest extends 
TableTestBase {
     }
 
     @Test
-    public void testRankOnSortAggWithHashShuffle() {
+    public void testRankOnOnePhaseSortAggWithHashShuffle() {
         util.tableEnv()
                 .getConfig()
-                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, 
"SortAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, 
"HashAgg");
+        util.tableEnv()
+                .getConfig()
+                
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
         util.verifyExecPlan(
                 "SELECT * FROM (\n"
                         + "                SELECT a, b, RANK() OVER(PARTITION 
BY a ORDER BY b) rk FROM (\n"
@@ -209,10 +265,45 @@ public class ForwardHashExchangeTest extends 
TableTestBase {
     }
 
     @Test
-    public void testRankOnSortAggWithGlobalShuffle() {
+    public void testRankOnTwoPhaseSortAggWithHashShuffle() {
         util.tableEnv()
                 .getConfig()
-                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, 
"SortAgg");
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, 
"HashAgg");
+        util.tableEnv()
+                .getConfig()
+                
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
+        util.verifyExecPlan(
+                "SELECT * FROM (\n"
+                        + "                SELECT a, b, RANK() OVER(PARTITION 
BY a ORDER BY b) rk FROM (\n"
+                        + "                        SELECT a, SUM(b) AS b FROM 
T GROUP BY a\n"
+                        + "                )\n"
+                        + "        ) WHERE rk <= 10");
+    }
+
+    @Test
+    public void testRankOnOnePhaseSortAggWithGlobalShuffle() {
+        util.tableEnv()
+                .getConfig()
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, 
"HashAgg");
+        util.tableEnv()
+                .getConfig()
+                
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
+        util.verifyExecPlan(
+                "SELECT * FROM (\n"
+                        + "                SELECT b, RANK() OVER(ORDER BY b) 
rk FROM (\n"
+                        + "                        SELECT SUM(b) AS b FROM T\n"
+                        + "                )\n"
+                        + "        ) WHERE rk <= 10");
+    }
+
+    @Test
+    public void testRankOnTwoPhaseSortAggWithGlobalShuffle() {
+        util.tableEnv()
+                .getConfig()
+                .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, 
"HashAgg");
+        util.tableEnv()
+                .getConfig()
+                
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
         util.verifyExecPlan(
                 "SELECT * FROM (\n"
                         + "                SELECT b, RANK() OVER(ORDER BY b) 
rk FROM (\n"
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.xml
index 960e1165e48..898426eb39e 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.xml
@@ -16,6 +16,38 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testAutoPhaseSortAggOnSortMergeJoinWithHashShuffle">
+    <Resource name="sql">
+      <![CDATA[WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 'He%')
+SELECT sum(b1) FROM r group by a1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[SUM($1)])
+   +- LogicalProject(a1=[$0], b1=[$1])
+      +- LogicalFilter(condition=[AND(=($0, $4), LIKE($2, _UTF-16LE'He%'))])
+         +- LogicalJoin(condition=[true], joinType=[inner])
+            :- LogicalTableScan(table=[[default_catalog, default_database, 
T1]])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
T2]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- SortAggregate(isMerge=[false], groupBy=[a1], select=[a1, SUM(b1) AS EXPR$0])
+   +- Exchange(distribution=[forward])
+      +- Calc(select=[a1, b1])
+         +- Exchange(distribution=[forward])
+            +- SortMergeJoin(joinType=[InnerJoin], where=[(a1 = a2)], 
select=[a1, b1, a2])
+               :- Exchange(distribution=[hash[a1]])
+               :  +- Calc(select=[a1, b1], where=[LIKE(c1, 'He%')])
+               :     +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1, b1, c1], metadata=[]]], 
fields=[a1, b1, c1])
+               +- Exchange(distribution=[hash[a2]])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testHashAggOnHashJoinWithHashShuffle">
     <Resource name="sql">
       <![CDATA[WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 'He%')
@@ -206,6 +238,38 @@ MultipleInput(readOrder=[1,1,0,0], 
members=[\nUnion(all=[true], union=[b, sd, sy
             :- Exchange(distribution=[keep_input_as_is[hash[a]]])
             :  +- Reused(reference_id=[2])
             +- Reused(reference_id=[3])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOnePhaseSortAggOnSortMergeJoinWithHashShuffle">
+    <Resource name="sql">
+      <![CDATA[WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 'He%')
+SELECT sum(b1) FROM r group by a1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[SUM($1)])
+   +- LogicalProject(a1=[$0], b1=[$1])
+      +- LogicalFilter(condition=[AND(=($0, $4), LIKE($2, _UTF-16LE'He%'))])
+         +- LogicalJoin(condition=[true], joinType=[inner])
+            :- LogicalTableScan(table=[[default_catalog, default_database, 
T1]])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
T2]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- SortAggregate(isMerge=[false], groupBy=[a1], select=[a1, SUM(b1) AS EXPR$0])
+   +- Exchange(distribution=[forward])
+      +- Calc(select=[a1, b1])
+         +- Exchange(distribution=[forward])
+            +- SortMergeJoin(joinType=[InnerJoin], where=[(a1 = a2)], 
select=[a1, b1, a2])
+               :- Exchange(distribution=[hash[a1]])
+               :  +- Calc(select=[a1, b1], where=[LIKE(c1, 'He%')])
+               :     +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1, b1, c1], metadata=[]]], 
fields=[a1, b1, c1])
+               +- Exchange(distribution=[hash[a2]])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2])
 ]]>
     </Resource>
   </TestCase>
@@ -229,7 +293,8 @@ OverAggregate(orderBy=[b ASC], window#0=[RANK(*) AS w0$o0 
RANG BETWEEN UNBOUNDED
       +- Exchange(distribution=[forward])
          +- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS b])
             +- Exchange(distribution=[single])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
T, project=[b], metadata=[], aggregates=[grouping=[], 
aggFunctions=[LongSumAggFunction(b)]]]], fields=[sum$0])
+               +- LocalHashAggregate(select=[Partial_SUM(b) AS sum$0])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, T, project=[b], metadata=[]]], fields=[b])
 ]]>
     </Resource>
   </TestCase>
@@ -260,7 +325,8 @@ Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) 
/ w0$o0) AS avg_b, w1
          +- Exchange(distribution=[keep_input_as_is[hash[c]]])
             +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, 
Final_SUM(sum$0) AS sum_b])
                +- Exchange(distribution=[hash[c]])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, T, project=[c, b], metadata=[], aggregates=[grouping=[c], 
aggFunctions=[LongSumAggFunction(b)]]]], fields=[c, sum$0])
+                  +- LocalHashAggregate(groupBy=[c], select=[c, Partial_SUM(b) 
AS sum$0])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, T, project=[c, b], metadata=[]]], fields=[c, b])
 ]]>
     </Resource>
   </TestCase>
@@ -284,7 +350,8 @@ OverAggregate(orderBy=[b ASC], window#0=[RANK(*) AS w0$o0 
RANG BETWEEN UNBOUNDED
       +- Exchange(distribution=[forward])
          +- SortAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS b])
             +- Exchange(distribution=[single])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
T, project=[b], metadata=[], aggregates=[grouping=[], 
aggFunctions=[LongSumAggFunction(b)]]]], fields=[sum$0])
+               +- LocalSortAggregate(select=[Partial_SUM(b) AS sum$0])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, T, project=[b], metadata=[]]], fields=[b])
 ]]>
     </Resource>
   </TestCase>
@@ -315,7 +382,10 @@ Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) 
/ w0$o0) AS avg_b, w1
          +- Exchange(distribution=[forward])
             +- Sort(orderBy=[c ASC])
                +- Exchange(distribution=[hash[c]])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, T, project=[c, b], metadata=[], aggregates=[grouping=[c], 
aggFunctions=[LongSumAggFunction(b)]]]], fields=[c, sum$0])
+                  +- LocalSortAggregate(groupBy=[c], select=[c, Partial_SUM(b) 
AS sum$0])
+                     +- Exchange(distribution=[forward])
+                        +- Sort(orderBy=[c ASC])
+                           +- TableSourceScan(table=[[default_catalog, 
default_database, T, project=[c, b], metadata=[]]], fields=[c, b])
 ]]>
     </Resource>
   </TestCase>
@@ -345,7 +415,8 @@ Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], 
partitionBy=[], order
       +- Exchange(distribution=[forward])
          +- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS b])
             +- Exchange(distribution=[single])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
T, project=[b], metadata=[], aggregates=[grouping=[], 
aggFunctions=[LongSumAggFunction(b)]]]], fields=[sum$0])
+               +- LocalHashAggregate(select=[Partial_SUM(b) AS sum$0])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, T, project=[b], metadata=[]]], fields=[b])
 ]]>
     </Resource>
   </TestCase>
@@ -375,11 +446,12 @@ Rank(rankType=[RANK], rankRange=[rankStart=1, 
rankEnd=10], partitionBy=[a], orde
       +- Exchange(distribution=[keep_input_as_is[hash[a]]])
          +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, 
Final_SUM(sum$0) AS b])
             +- Exchange(distribution=[hash[a]])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
T, project=[a, b], metadata=[], aggregates=[grouping=[a], 
aggFunctions=[LongSumAggFunction(b)]]]], fields=[a, sum$0])
+               +- LocalHashAggregate(groupBy=[a], select=[a, Partial_SUM(b) AS 
sum$0])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, T, project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRankOnSortAggWithGlobalShuffle">
+  <TestCase name="testRankOnOnePhaseSortAggWithGlobalShuffle">
     <Resource name="sql">
       <![CDATA[SELECT * FROM (
                 SELECT b, RANK() OVER(ORDER BY b) rk FROM (
@@ -403,13 +475,13 @@ Rank(rankType=[RANK], rankRange=[rankStart=1, 
rankEnd=10], partitionBy=[], order
 +- Exchange(distribution=[forward])
    +- Sort(orderBy=[b ASC])
       +- Exchange(distribution=[forward])
-         +- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS b])
+         +- SortAggregate(isMerge=[false], select=[SUM(b) AS b])
             +- Exchange(distribution=[single])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
T, project=[b], metadata=[], aggregates=[grouping=[], 
aggFunctions=[LongSumAggFunction(b)]]]], fields=[sum$0])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
T, project=[b], metadata=[]]], fields=[b])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRankOnSortAggWithHashShuffle">
+  <TestCase name="testRankOnOnePhaseSortAggWithHashShuffle">
     <Resource name="sql">
       <![CDATA[SELECT * FROM (
                 SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk FROM (
@@ -433,52 +505,102 @@ Rank(rankType=[RANK], rankRange=[rankStart=1, 
rankEnd=10], partitionBy=[a], orde
 +- Exchange(distribution=[forward])
    +- Sort(orderBy=[a ASC, b ASC])
       +- Exchange(distribution=[keep_input_as_is[hash[a]]])
-         +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, 
Final_SUM(sum$0) AS b])
-            +- Exchange(distribution=[hash[a]])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
T, project=[a, b], metadata=[], aggregates=[grouping=[a], 
aggFunctions=[LongSumAggFunction(b)]]]], fields=[a, sum$0])
+         +- SortAggregate(isMerge=[false], groupBy=[a], select=[a, SUM(b) AS 
b])
+            +- Exchange(distribution=[forward])
+               +- Sort(orderBy=[a ASC])
+                  +- Exchange(distribution=[hash[a]])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, T, project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSortJoinWithMultipleInputDisabled">
+  <TestCase name="testRankOnTwoPhaseSortAggWithGlobalShuffle">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM
-  (SELECT a FROM T1 JOIN T ON a = a1) t1
-  INNER JOIN
-  (SELECT d2 FROM T JOIN T2 ON d2 = a) t2
-  ON t1.a = t2.d2]]>
+      <![CDATA[SELECT * FROM (
+                SELECT b, RANK() OVER(ORDER BY b) rk FROM (
+                        SELECT SUM(b) AS b FROM T
+                )
+        ) WHERE rk <= 10]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(a=[$0], d2=[$1])
-+- LogicalJoin(condition=[=($0, $1)], joinType=[inner])
-   :- LogicalProject(a=[$4])
-   :  +- LogicalJoin(condition=[=($4, $0)], joinType=[inner])
-   :     :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
-   :     +- LogicalTableScan(table=[[default_catalog, default_database, T]])
-   +- LogicalProject(d2=[$7])
-      +- LogicalJoin(condition=[=($7, $0)], joinType=[inner])
-         :- LogicalTableScan(table=[[default_catalog, default_database, T]])
-         +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+LogicalProject(b=[$0], rk=[$1])
++- LogicalFilter(condition=[<=($1, 10)])
+   +- LogicalProject(b=[$0], rk=[RANK() OVER (ORDER BY $0 NULLS FIRST)])
+      +- LogicalAggregate(group=[{}], b=[SUM($0)])
+         +- LogicalProject(b=[$1])
+            +- LogicalTableScan(table=[[default_catalog, default_database, T]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-SortMergeJoin(joinType=[InnerJoin], where=[(a = d2)], select=[a, d2])
-:- Exchange(distribution=[keep_input_as_is[hash[a]]])
-:  +- Calc(select=[a])
-:     +- Exchange(distribution=[keep_input_as_is[hash[a1]]])
-:        +- SortMergeJoin(joinType=[InnerJoin], where=[(a = a1)], select=[a1, 
a])
-:           :- Exchange(distribution=[hash[a1]])
-:           :  +- TableSourceScan(table=[[default_catalog, default_database, 
T1, project=[a1], metadata=[]]], fields=[a1])
-:           +- Exchange(distribution=[hash[a]])(reuse_id=[1])
-:              +- TableSourceScan(table=[[default_catalog, default_database, 
T, project=[a], metadata=[]]], fields=[a])
-+- Exchange(distribution=[keep_input_as_is[hash[d2]]])
-   +- Calc(select=[d2])
+Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[], 
orderBy=[b ASC], global=[true], select=[b, w0$o0])
++- Exchange(distribution=[forward])
+   +- Sort(orderBy=[b ASC])
+      +- Exchange(distribution=[forward])
+         +- SortAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS b])
+            +- Exchange(distribution=[single])
+               +- LocalSortAggregate(select=[Partial_SUM(b) AS sum$0])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, T, project=[b], metadata=[]]], fields=[b])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRankOnTwoPhaseSortAggWithHashShuffle">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM (
+                SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk FROM (
+                        SELECT a, SUM(b) AS b FROM T GROUP BY a
+                )
+        ) WHERE rk <= 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], rk=[$2])
++- LogicalFilter(condition=[<=($2, 10)])
+   +- LogicalProject(a=[$0], b=[$1], rk=[RANK() OVER (PARTITION BY $0 ORDER BY 
$1 NULLS FIRST)])
+      +- LogicalAggregate(group=[{0}], b=[SUM($1)])
+         +- LogicalProject(a=[$0], b=[$1])
+            +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], 
orderBy=[b ASC], global=[true], select=[a, b, w0$o0])
++- Exchange(distribution=[forward])
+   +- Sort(orderBy=[a ASC, b ASC])
       +- Exchange(distribution=[keep_input_as_is[hash[a]]])
-         +- SortMergeJoin(joinType=[InnerJoin], where=[(d2 = a)], select=[a, 
d2])
-            :- Reused(reference_id=[1])
-            +- Exchange(distribution=[hash[d2]])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
T2, project=[d2], metadata=[]]], fields=[d2])
+         +- SortAggregate(isMerge=[true], groupBy=[a], select=[a, 
Final_SUM(sum$0) AS b])
+            +- Exchange(distribution=[forward])
+               +- Sort(orderBy=[a ASC])
+                  +- Exchange(distribution=[hash[a]])
+                     +- LocalSortAggregate(groupBy=[a], select=[a, 
Partial_SUM(b) AS sum$0])
+                        +- Exchange(distribution=[forward])
+                           +- Sort(orderBy=[a ASC])
+                              +- TableSourceScan(table=[[default_catalog, 
default_database, T, project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRankWithHashShuffle">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM (SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY 
b) rk FROM T) WHERE rk <= 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], rk=[$2])
++- LogicalFilter(condition=[<=($2, 10)])
+   +- LogicalProject(a=[$0], b=[$1], rk=[RANK() OVER (PARTITION BY $0 ORDER BY 
$1 NULLS FIRST)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], 
orderBy=[b ASC], global=[true], select=[a, b, w0$o0])
++- Exchange(distribution=[forward])
+   +- Sort(orderBy=[a ASC, b ASC])
+      +- Exchange(distribution=[hash[a]])
+         +- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], 
partitionBy=[a], orderBy=[b ASC], global=[false], select=[a, b])
+            +- Exchange(distribution=[forward])
+               +- Sort(orderBy=[a ASC, b ASC])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, T, project=[a, b], metadata=[]]], fields=[a, b])
 ]]>
     </Resource>
   </TestCase>
@@ -509,7 +631,31 @@ SortAggregate(isMerge=[false], select=[SUM(b1) AS EXPR$0])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSortAggOnSortMergeJoinWithHashShuffle">
+  <TestCase name="testSortAggregateWithHashShuffle">
+    <Resource name="sql">
+      <![CDATA[ SELECT a, SUM(b) AS b FROM T GROUP BY a]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalAggregate(group=[{0}], b=[SUM($1)])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS b])
++- Exchange(distribution=[forward])
+   +- Sort(orderBy=[a ASC])
+      +- Exchange(distribution=[hash[a]])
+         +- LocalSortAggregate(groupBy=[a], select=[a, Partial_SUM(b) AS 
sum$0])
+            +- Exchange(distribution=[forward])
+               +- Sort(orderBy=[a ASC])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, T, project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTwoPhaseSortAggOnSortMergeJoinWithHashShuffle">
     <Resource name="sql">
       <![CDATA[WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 'He%')
 SELECT sum(b1) FROM r group by a1]]>
@@ -528,16 +674,63 @@ LogicalProject(EXPR$0=[$1])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[EXPR$0])
-+- SortAggregate(isMerge=[false], groupBy=[a1], select=[a1, SUM(b1) AS EXPR$0])
++- SortAggregate(isMerge=[true], groupBy=[a1], select=[a1, Final_SUM(sum$0) AS 
EXPR$0])
    +- Exchange(distribution=[forward])
-      +- Calc(select=[a1, b1])
-         +- Exchange(distribution=[forward])
-            +- SortMergeJoin(joinType=[InnerJoin], where=[(a1 = a2)], 
select=[a1, b1, a2])
-               :- Exchange(distribution=[hash[a1]])
-               :  +- Calc(select=[a1, b1], where=[LIKE(c1, 'He%')])
-               :     +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1, b1, c1], metadata=[]]], 
fields=[a1, b1, c1])
-               +- Exchange(distribution=[hash[a2]])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2])
+      +- Sort(orderBy=[a1 ASC])
+         +- Exchange(distribution=[hash[a1]])
+            +- LocalSortAggregate(groupBy=[a1], select=[a1, Partial_SUM(b1) AS 
sum$0])
+               +- Exchange(distribution=[forward])
+                  +- Sort(orderBy=[a1 ASC])
+                     +- Calc(select=[a1, b1])
+                        +- SortMergeJoin(joinType=[InnerJoin], where=[(a1 = 
a2)], select=[a1, b1, a2])
+                           :- Exchange(distribution=[hash[a1]])
+                           :  +- Calc(select=[a1, b1], where=[LIKE(c1, 'He%')])
+                           :     +- TableSourceScan(table=[[default_catalog, 
default_database, T1, filter=[], project=[a1, b1, c1], metadata=[]]], 
fields=[a1, b1, c1])
+                           +- Exchange(distribution=[hash[a2]])
+                              +- TableSourceScan(table=[[default_catalog, 
default_database, T2, project=[a2], metadata=[]]], fields=[a2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSortJoinWithMultipleInputDisabled">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM
+  (SELECT a FROM T1 JOIN T ON a = a1) t1
+  INNER JOIN
+  (SELECT d2 FROM T JOIN T2 ON d2 = a) t2
+  ON t1.a = t2.d2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], d2=[$1])
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner])
+   :- LogicalProject(a=[$4])
+   :  +- LogicalJoin(condition=[=($4, $0)], joinType=[inner])
+   :     :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+   +- LogicalProject(d2=[$7])
+      +- LogicalJoin(condition=[=($7, $0)], joinType=[inner])
+         :- LogicalTableScan(table=[[default_catalog, default_database, T]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+SortMergeJoin(joinType=[InnerJoin], where=[(a = d2)], select=[a, d2])
+:- Exchange(distribution=[keep_input_as_is[hash[a]]])
+:  +- Calc(select=[a])
+:     +- Exchange(distribution=[keep_input_as_is[hash[a1]]])
+:        +- SortMergeJoin(joinType=[InnerJoin], where=[(a = a1)], select=[a1, 
a])
+:           :- Exchange(distribution=[hash[a1]])
+:           :  +- TableSourceScan(table=[[default_catalog, default_database, 
T1, project=[a1], metadata=[]]], fields=[a1])
+:           +- Exchange(distribution=[hash[a]])(reuse_id=[1])
+:              +- TableSourceScan(table=[[default_catalog, default_database, 
T, project=[a], metadata=[]]], fields=[a])
++- Exchange(distribution=[keep_input_as_is[hash[d2]]])
+   +- Calc(select=[d2])
+      +- Exchange(distribution=[keep_input_as_is[hash[a]]])
+         +- SortMergeJoin(joinType=[InnerJoin], where=[(d2 = a)], select=[a, 
d2])
+            :- Reused(reference_id=[1])
+            +- Exchange(distribution=[hash[d2]])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
T2, project=[d2], metadata=[]]], fields=[d2])
 ]]>
     </Resource>
   </TestCase>

Reply via email to