This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 1556c3c7553 [FLINK-27272][table-planner] Fix the incorrect plan for
queries with local sort when the adaptive batch scheduler is enabled
1556c3c7553 is described below
commit 1556c3c75535cbb653e4bd8251cb4f0b3ca2e8f0
Author: godfreyhe <[email protected]>
AuthorDate: Mon Apr 18 10:27:57 2022 +0800
[FLINK-27272][table-planner] Fix the incorrect plan for queries with local
sort when the adaptive batch scheduler is enabled
(cherry picked from commit 456ceb299d0601dee283f718f5d3d0a9d108196e)
This closes #19497
---
.../processor/ForwardHashExchangeProcessor.java | 50 ++--
.../nodes/physical/batch/BatchPhysicalRank.scala | 10 +-
.../plan/batch/sql/ForwardHashExchangeTest.java | 101 ++++++-
.../plan/batch/sql/ForwardHashExchangeTest.xml | 301 +++++++++++++++++----
4 files changed, 379 insertions(+), 83 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/ForwardHashExchangeProcessor.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/ForwardHashExchangeProcessor.java
index d8238605ec0..9e03ac9da96 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/ForwardHashExchangeProcessor.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/ForwardHashExchangeProcessor.java
@@ -87,24 +87,25 @@ public class ForwardHashExchangeProcessor implements
ExecNodeGraphProcessor {
inputProperty.getRequiredDistribution();
ExecEdge edge = node.getInputEdges().get(i);
- if (requiredDistribution.getType() ==
DistributionType.SINGLETON) {
- if (!hasExchangeInput(edge) &&
isInputSortedNode(node)) {
- // if operation chaining is disabled, this
could mark sure the
- // sort node and its output can also be
connected by
- // ForwardPartitioner
+ if (requiredDistribution.getType() !=
DistributionType.HASH) {
+ boolean visitChild =
+ requiredDistribution.getType()
+ == DistributionType.SINGLETON;
+ if (!hasExchangeInput(edge)
+ &&
hasSortInputForInputSortedNode(node)) {
ExecEdge newEdge =
addExchangeAndReconnectEdge(
- tableConfig, edge,
inputProperty, true);
+ tableConfig,
+ edge,
+ inputProperty,
+ true,
+ visitChild);
newEdges.set(i, newEdge);
changed = true;
}
continue;
}
- if (requiredDistribution.getType() !=
DistributionType.HASH) {
- continue;
- }
-
if (!hasExchangeInput(edge)) {
ExecEdge newEdge;
if (isInputSortedNode(node)) {
@@ -117,7 +118,8 @@ public class ForwardHashExchangeProcessor implements
ExecNodeGraphProcessor {
tableConfig,
sort.getInputEdges().get(0),
inputProperty,
- false);
+ false,
+ true);
sort.setInputEdges(
Collections.singletonList(newEdgeOfSort));
}
@@ -127,13 +129,13 @@ public class ForwardHashExchangeProcessor implements
ExecNodeGraphProcessor {
// ForwardPartitioner
newEdge =
addExchangeAndReconnectEdge(
- tableConfig, edge,
inputProperty, true);
+ tableConfig, edge,
inputProperty, true, true);
} else {
// add Exchange with keep_input_as_is
distribution as the input
// of the node
newEdge =
addExchangeAndReconnectEdge(
- tableConfig, edge,
inputProperty, false);
+ tableConfig, edge,
inputProperty, false, true);
updateOriginalEdgeInMultipleInput(
node, i, (BatchExecExchange)
newEdge.getSource());
}
@@ -145,7 +147,7 @@ public class ForwardHashExchangeProcessor implements
ExecNodeGraphProcessor {
// node and its output can also be connected
by ForwardPartitioner
ExecEdge newEdge =
addExchangeAndReconnectEdge(
- tableConfig, edge,
inputProperty, true);
+ tableConfig, edge,
inputProperty, true, true);
newEdges.set(i, newEdge);
changed = true;
}
@@ -164,21 +166,27 @@ public class ForwardHashExchangeProcessor implements
ExecNodeGraphProcessor {
ReadableConfig tableConfig,
ExecEdge edge,
InputProperty inputProperty,
- boolean strict) {
+ boolean strict,
+ boolean visitChild) {
ExecNode<?> target = edge.getTarget();
ExecNode<?> source = edge.getSource();
if (source instanceof CommonExecExchange) {
return edge;
}
// only Calc, Correlate and Sort can propagate sort property and
distribution property
- if (source instanceof BatchExecCalc
- || source instanceof BatchExecPythonCalc
- || source instanceof BatchExecSort
- || source instanceof BatchExecCorrelate
- || source instanceof BatchExecPythonCorrelate) {
+ if (visitChild
+ && (source instanceof BatchExecCalc
+ || source instanceof BatchExecPythonCalc
+ || source instanceof BatchExecSort
+ || source instanceof BatchExecCorrelate
+ || source instanceof BatchExecPythonCorrelate)) {
ExecEdge newEdge =
addExchangeAndReconnectEdge(
- tableConfig, source.getInputEdges().get(0),
inputProperty, strict);
+ tableConfig,
+ source.getInputEdges().get(0),
+ inputProperty,
+ strict,
+ true);
source.setInputEdges(Collections.singletonList(newEdge));
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala
index 1d3c5720148..18626e783dc 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalRank.scala
@@ -229,10 +229,14 @@ class BatchPhysicalRank(
}
override def translateToExecNode(): ExecNode[_] = {
- val requiredDistribution = if (partitionKey.length() == 0) {
- InputProperty.SINGLETON_DISTRIBUTION
+ val requiredDistribution = if (isGlobal) {
+ if (partitionKey.length() == 0) {
+ InputProperty.SINGLETON_DISTRIBUTION
+ } else {
+ InputProperty.hashDistribution(partitionKey.toArray)
+ }
} else {
- InputProperty.hashDistribution(partitionKey.toArray)
+ InputProperty.UNKNOWN_DISTRIBUTION
}
new BatchExecRank(
unwrapTableConfig(this),
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
index fd68753bb42..b5edb3632f9 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.java
@@ -36,6 +36,12 @@ public class ForwardHashExchangeTest extends TableTestBase {
util = batchTestUtil(TableConfig.getDefault());
util.getStreamEnv().getConfig().setDynamicGraph(true);
+ util.tableEnv()
+ .getConfig()
+ .getConfiguration()
+ .set(
+
OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_AGGREGATE_PUSHDOWN_ENABLED,
+ false);
util.tableEnv()
.executeSql(
"CREATE TABLE T (\n"
@@ -71,6 +77,20 @@ public class ForwardHashExchangeTest extends TableTestBase {
+ ")");
}
+ @Test
+ public void testRankWithHashShuffle() {
+ util.verifyExecPlan(
+ "SELECT * FROM (SELECT a, b, RANK() OVER(PARTITION BY a ORDER
BY b) rk FROM T) WHERE rk <= 10");
+ }
+
+ @Test
+ public void testSortAggregateWithHashShuffle() {
+ util.tableEnv()
+ .getConfig()
+ .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
"HashAgg");
+ util.verifyExecPlan(" SELECT a, SUM(b) AS b FROM T GROUP BY a");
+ }
+
@Test
public void testOverAggOnHashAggWithHashShuffle() {
util.tableEnv()
@@ -130,12 +150,45 @@ public class ForwardHashExchangeTest extends
TableTestBase {
}
@Test
- public void testSortAggOnSortMergeJoinWithHashShuffle() {
+ public void testOnePhaseSortAggOnSortMergeJoinWithHashShuffle() {
+ util.tableEnv()
+ .getConfig()
+ .set(
+ ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
+ "HashJoin,NestedLoopJoin,HashAgg");
+ util.tableEnv()
+ .getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
+ util.verifyExecPlan(
+ "WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE
'He%')\n"
+ + "SELECT sum(b1) FROM r group by a1");
+ }
+
+ @Test
+ public void testTwoPhaseSortAggOnSortMergeJoinWithHashShuffle() {
+ util.tableEnv()
+ .getConfig()
+ .set(
+ ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
+ "HashJoin,NestedLoopJoin,HashAgg");
+ util.tableEnv()
+ .getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
+ util.verifyExecPlan(
+ "WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE
'He%')\n"
+ + "SELECT sum(b1) FROM r group by a1");
+ }
+
+ @Test
+ public void testAutoPhaseSortAggOnSortMergeJoinWithHashShuffle() {
util.tableEnv()
.getConfig()
.set(
ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
"HashJoin,NestedLoopJoin,HashAgg");
+ util.tableEnv()
+ .getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "AUTO");
util.verifyExecPlan(
"WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE
'He%')\n"
+ "SELECT sum(b1) FROM r group by a1");
@@ -196,10 +249,13 @@ public class ForwardHashExchangeTest extends
TableTestBase {
}
@Test
- public void testRankOnSortAggWithHashShuffle() {
+ public void testRankOnOnePhaseSortAggWithHashShuffle() {
util.tableEnv()
.getConfig()
- .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
"SortAgg");
+ .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
"HashAgg");
+ util.tableEnv()
+ .getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
util.verifyExecPlan(
"SELECT * FROM (\n"
+ " SELECT a, b, RANK() OVER(PARTITION
BY a ORDER BY b) rk FROM (\n"
@@ -209,10 +265,45 @@ public class ForwardHashExchangeTest extends
TableTestBase {
}
@Test
- public void testRankOnSortAggWithGlobalShuffle() {
+ public void testRankOnTwoPhaseSortAggWithHashShuffle() {
util.tableEnv()
.getConfig()
- .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
"SortAgg");
+ .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
"HashAgg");
+ util.tableEnv()
+ .getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
+ util.verifyExecPlan(
+ "SELECT * FROM (\n"
+ + " SELECT a, b, RANK() OVER(PARTITION
BY a ORDER BY b) rk FROM (\n"
+ + " SELECT a, SUM(b) AS b FROM
T GROUP BY a\n"
+ + " )\n"
+ + " ) WHERE rk <= 10");
+ }
+
+ @Test
+ public void testRankOnOnePhaseSortAggWithGlobalShuffle() {
+ util.tableEnv()
+ .getConfig()
+ .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
"HashAgg");
+ util.tableEnv()
+ .getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "ONE_PHASE");
+ util.verifyExecPlan(
+ "SELECT * FROM (\n"
+ + " SELECT b, RANK() OVER(ORDER BY b)
rk FROM (\n"
+ + " SELECT SUM(b) AS b FROM T\n"
+ + " )\n"
+ + " ) WHERE rk <= 10");
+ }
+
+ @Test
+ public void testRankOnTwoPhaseSortAggWithGlobalShuffle() {
+ util.tableEnv()
+ .getConfig()
+ .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
"HashAgg");
+ util.tableEnv()
+ .getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
util.verifyExecPlan(
"SELECT * FROM (\n"
+ " SELECT b, RANK() OVER(ORDER BY b)
rk FROM (\n"
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.xml
index 960e1165e48..898426eb39e 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.xml
@@ -16,6 +16,38 @@ See the License for the specific language governing
permissions and
limitations under the License.
-->
<Root>
+ <TestCase name="testAutoPhaseSortAggOnSortMergeJoinWithHashShuffle">
+ <Resource name="sql">
+ <![CDATA[WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 'He%')
+SELECT sum(b1) FROM r group by a1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[SUM($1)])
+ +- LogicalProject(a1=[$0], b1=[$1])
+ +- LogicalFilter(condition=[AND(=($0, $4), LIKE($2, _UTF-16LE'He%'))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
T1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
T2]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[EXPR$0])
++- SortAggregate(isMerge=[false], groupBy=[a1], select=[a1, SUM(b1) AS EXPR$0])
+ +- Exchange(distribution=[forward])
+ +- Calc(select=[a1, b1])
+ +- Exchange(distribution=[forward])
+ +- SortMergeJoin(joinType=[InnerJoin], where=[(a1 = a2)],
select=[a1, b1, a2])
+ :- Exchange(distribution=[hash[a1]])
+ : +- Calc(select=[a1, b1], where=[LIKE(c1, 'He%')])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1, b1, c1], metadata=[]]],
fields=[a1, b1, c1])
+ +- Exchange(distribution=[hash[a2]])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T2, project=[a2], metadata=[]]], fields=[a2])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testHashAggOnHashJoinWithHashShuffle">
<Resource name="sql">
<![CDATA[WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 'He%')
@@ -206,6 +238,38 @@ MultipleInput(readOrder=[1,1,0,0],
members=[\nUnion(all=[true], union=[b, sd, sy
:- Exchange(distribution=[keep_input_as_is[hash[a]]])
: +- Reused(reference_id=[2])
+- Reused(reference_id=[3])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testOnePhaseSortAggOnSortMergeJoinWithHashShuffle">
+ <Resource name="sql">
+ <![CDATA[WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 'He%')
+SELECT sum(b1) FROM r group by a1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[SUM($1)])
+ +- LogicalProject(a1=[$0], b1=[$1])
+ +- LogicalFilter(condition=[AND(=($0, $4), LIKE($2, _UTF-16LE'He%'))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
T1]])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
T2]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[EXPR$0])
++- SortAggregate(isMerge=[false], groupBy=[a1], select=[a1, SUM(b1) AS EXPR$0])
+ +- Exchange(distribution=[forward])
+ +- Calc(select=[a1, b1])
+ +- Exchange(distribution=[forward])
+ +- SortMergeJoin(joinType=[InnerJoin], where=[(a1 = a2)],
select=[a1, b1, a2])
+ :- Exchange(distribution=[hash[a1]])
+ : +- Calc(select=[a1, b1], where=[LIKE(c1, 'He%')])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1, b1, c1], metadata=[]]],
fields=[a1, b1, c1])
+ +- Exchange(distribution=[hash[a2]])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T2, project=[a2], metadata=[]]], fields=[a2])
]]>
</Resource>
</TestCase>
@@ -229,7 +293,8 @@ OverAggregate(orderBy=[b ASC], window#0=[RANK(*) AS w0$o0
RANG BETWEEN UNBOUNDED
+- Exchange(distribution=[forward])
+- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS b])
+- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database,
T, project=[b], metadata=[], aggregates=[grouping=[],
aggFunctions=[LongSumAggFunction(b)]]]], fields=[sum$0])
+ +- LocalHashAggregate(select=[Partial_SUM(b) AS sum$0])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T, project=[b], metadata=[]]], fields=[b])
]]>
</Resource>
</TestCase>
@@ -260,7 +325,8 @@ Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT)
/ w0$o0) AS avg_b, w1
+- Exchange(distribution=[keep_input_as_is[hash[c]]])
+- HashAggregate(isMerge=[true], groupBy=[c], select=[c,
Final_SUM(sum$0) AS sum_b])
+- Exchange(distribution=[hash[c]])
- +- TableSourceScan(table=[[default_catalog,
default_database, T, project=[c, b], metadata=[], aggregates=[grouping=[c],
aggFunctions=[LongSumAggFunction(b)]]]], fields=[c, sum$0])
+ +- LocalHashAggregate(groupBy=[c], select=[c, Partial_SUM(b)
AS sum$0])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T, project=[c, b], metadata=[]]], fields=[c, b])
]]>
</Resource>
</TestCase>
@@ -284,7 +350,8 @@ OverAggregate(orderBy=[b ASC], window#0=[RANK(*) AS w0$o0
RANG BETWEEN UNBOUNDED
+- Exchange(distribution=[forward])
+- SortAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS b])
+- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database,
T, project=[b], metadata=[], aggregates=[grouping=[],
aggFunctions=[LongSumAggFunction(b)]]]], fields=[sum$0])
+ +- LocalSortAggregate(select=[Partial_SUM(b) AS sum$0])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T, project=[b], metadata=[]]], fields=[b])
]]>
</Resource>
</TestCase>
@@ -315,7 +382,10 @@ Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT)
/ w0$o0) AS avg_b, w1
+- Exchange(distribution=[forward])
+- Sort(orderBy=[c ASC])
+- Exchange(distribution=[hash[c]])
- +- TableSourceScan(table=[[default_catalog,
default_database, T, project=[c, b], metadata=[], aggregates=[grouping=[c],
aggFunctions=[LongSumAggFunction(b)]]]], fields=[c, sum$0])
+ +- LocalSortAggregate(groupBy=[c], select=[c, Partial_SUM(b)
AS sum$0])
+ +- Exchange(distribution=[forward])
+ +- Sort(orderBy=[c ASC])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T, project=[c, b], metadata=[]]], fields=[c, b])
]]>
</Resource>
</TestCase>
@@ -345,7 +415,8 @@ Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10],
partitionBy=[], order
+- Exchange(distribution=[forward])
+- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS b])
+- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database,
T, project=[b], metadata=[], aggregates=[grouping=[],
aggFunctions=[LongSumAggFunction(b)]]]], fields=[sum$0])
+ +- LocalHashAggregate(select=[Partial_SUM(b) AS sum$0])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T, project=[b], metadata=[]]], fields=[b])
]]>
</Resource>
</TestCase>
@@ -375,11 +446,12 @@ Rank(rankType=[RANK], rankRange=[rankStart=1,
rankEnd=10], partitionBy=[a], orde
+- Exchange(distribution=[keep_input_as_is[hash[a]]])
+- HashAggregate(isMerge=[true], groupBy=[a], select=[a,
Final_SUM(sum$0) AS b])
+- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database,
T, project=[a, b], metadata=[], aggregates=[grouping=[a],
aggFunctions=[LongSumAggFunction(b)]]]], fields=[a, sum$0])
+ +- LocalHashAggregate(groupBy=[a], select=[a, Partial_SUM(b) AS
sum$0])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T, project=[a, b], metadata=[]]], fields=[a, b])
]]>
</Resource>
</TestCase>
- <TestCase name="testRankOnSortAggWithGlobalShuffle">
+ <TestCase name="testRankOnOnePhaseSortAggWithGlobalShuffle">
<Resource name="sql">
<![CDATA[SELECT * FROM (
SELECT b, RANK() OVER(ORDER BY b) rk FROM (
@@ -403,13 +475,13 @@ Rank(rankType=[RANK], rankRange=[rankStart=1,
rankEnd=10], partitionBy=[], order
+- Exchange(distribution=[forward])
+- Sort(orderBy=[b ASC])
+- Exchange(distribution=[forward])
- +- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS b])
+ +- SortAggregate(isMerge=[false], select=[SUM(b) AS b])
+- Exchange(distribution=[single])
- +- TableSourceScan(table=[[default_catalog, default_database,
T, project=[b], metadata=[], aggregates=[grouping=[],
aggFunctions=[LongSumAggFunction(b)]]]], fields=[sum$0])
+ +- TableSourceScan(table=[[default_catalog, default_database,
T, project=[b], metadata=[]]], fields=[b])
]]>
</Resource>
</TestCase>
- <TestCase name="testRankOnSortAggWithHashShuffle">
+ <TestCase name="testRankOnOnePhaseSortAggWithHashShuffle">
<Resource name="sql">
<![CDATA[SELECT * FROM (
SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk FROM (
@@ -433,52 +505,102 @@ Rank(rankType=[RANK], rankRange=[rankStart=1,
rankEnd=10], partitionBy=[a], orde
+- Exchange(distribution=[forward])
+- Sort(orderBy=[a ASC, b ASC])
+- Exchange(distribution=[keep_input_as_is[hash[a]]])
- +- HashAggregate(isMerge=[true], groupBy=[a], select=[a,
Final_SUM(sum$0) AS b])
- +- Exchange(distribution=[hash[a]])
- +- TableSourceScan(table=[[default_catalog, default_database,
T, project=[a, b], metadata=[], aggregates=[grouping=[a],
aggFunctions=[LongSumAggFunction(b)]]]], fields=[a, sum$0])
+ +- SortAggregate(isMerge=[false], groupBy=[a], select=[a, SUM(b) AS
b])
+ +- Exchange(distribution=[forward])
+ +- Sort(orderBy=[a ASC])
+ +- Exchange(distribution=[hash[a]])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T, project=[a, b], metadata=[]]], fields=[a, b])
]]>
</Resource>
</TestCase>
- <TestCase name="testSortJoinWithMultipleInputDisabled">
+ <TestCase name="testRankOnTwoPhaseSortAggWithGlobalShuffle">
<Resource name="sql">
- <![CDATA[SELECT * FROM
- (SELECT a FROM T1 JOIN T ON a = a1) t1
- INNER JOIN
- (SELECT d2 FROM T JOIN T2 ON d2 = a) t2
- ON t1.a = t2.d2]]>
+ <![CDATA[SELECT * FROM (
+ SELECT b, RANK() OVER(ORDER BY b) rk FROM (
+ SELECT SUM(b) AS b FROM T
+ )
+ ) WHERE rk <= 10]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(a=[$0], d2=[$1])
-+- LogicalJoin(condition=[=($0, $1)], joinType=[inner])
- :- LogicalProject(a=[$4])
- : +- LogicalJoin(condition=[=($4, $0)], joinType=[inner])
- : :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
- : +- LogicalTableScan(table=[[default_catalog, default_database, T]])
- +- LogicalProject(d2=[$7])
- +- LogicalJoin(condition=[=($7, $0)], joinType=[inner])
- :- LogicalTableScan(table=[[default_catalog, default_database, T]])
- +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+LogicalProject(b=[$0], rk=[$1])
++- LogicalFilter(condition=[<=($1, 10)])
+ +- LogicalProject(b=[$0], rk=[RANK() OVER (ORDER BY $0 NULLS FIRST)])
+ +- LogicalAggregate(group=[{}], b=[SUM($0)])
+ +- LogicalProject(b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-SortMergeJoin(joinType=[InnerJoin], where=[(a = d2)], select=[a, d2])
-:- Exchange(distribution=[keep_input_as_is[hash[a]]])
-: +- Calc(select=[a])
-: +- Exchange(distribution=[keep_input_as_is[hash[a1]]])
-: +- SortMergeJoin(joinType=[InnerJoin], where=[(a = a1)], select=[a1,
a])
-: :- Exchange(distribution=[hash[a1]])
-: : +- TableSourceScan(table=[[default_catalog, default_database,
T1, project=[a1], metadata=[]]], fields=[a1])
-: +- Exchange(distribution=[hash[a]])(reuse_id=[1])
-: +- TableSourceScan(table=[[default_catalog, default_database,
T, project=[a], metadata=[]]], fields=[a])
-+- Exchange(distribution=[keep_input_as_is[hash[d2]]])
- +- Calc(select=[d2])
+Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[],
orderBy=[b ASC], global=[true], select=[b, w0$o0])
++- Exchange(distribution=[forward])
+ +- Sort(orderBy=[b ASC])
+ +- Exchange(distribution=[forward])
+ +- SortAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS b])
+ +- Exchange(distribution=[single])
+ +- LocalSortAggregate(select=[Partial_SUM(b) AS sum$0])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T, project=[b], metadata=[]]], fields=[b])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRankOnTwoPhaseSortAggWithHashShuffle">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM (
+ SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk FROM (
+ SELECT a, SUM(b) AS b FROM T GROUP BY a
+ )
+ ) WHERE rk <= 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], rk=[$2])
++- LogicalFilter(condition=[<=($2, 10)])
+ +- LogicalProject(a=[$0], b=[$1], rk=[RANK() OVER (PARTITION BY $0 ORDER BY
$1 NULLS FIRST)])
+ +- LogicalAggregate(group=[{0}], b=[SUM($1)])
+ +- LogicalProject(a=[$0], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a],
orderBy=[b ASC], global=[true], select=[a, b, w0$o0])
++- Exchange(distribution=[forward])
+ +- Sort(orderBy=[a ASC, b ASC])
+- Exchange(distribution=[keep_input_as_is[hash[a]]])
- +- SortMergeJoin(joinType=[InnerJoin], where=[(d2 = a)], select=[a,
d2])
- :- Reused(reference_id=[1])
- +- Exchange(distribution=[hash[d2]])
- +- TableSourceScan(table=[[default_catalog, default_database,
T2, project=[d2], metadata=[]]], fields=[d2])
+ +- SortAggregate(isMerge=[true], groupBy=[a], select=[a,
Final_SUM(sum$0) AS b])
+ +- Exchange(distribution=[forward])
+ +- Sort(orderBy=[a ASC])
+ +- Exchange(distribution=[hash[a]])
+ +- LocalSortAggregate(groupBy=[a], select=[a,
Partial_SUM(b) AS sum$0])
+ +- Exchange(distribution=[forward])
+ +- Sort(orderBy=[a ASC])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T, project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRankWithHashShuffle">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM (SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY
b) rk FROM T) WHERE rk <= 10]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], rk=[$2])
++- LogicalFilter(condition=[<=($2, 10)])
+ +- LogicalProject(a=[$0], b=[$1], rk=[RANK() OVER (PARTITION BY $0 ORDER BY
$1 NULLS FIRST)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a],
orderBy=[b ASC], global=[true], select=[a, b, w0$o0])
++- Exchange(distribution=[forward])
+ +- Sort(orderBy=[a ASC, b ASC])
+ +- Exchange(distribution=[hash[a]])
+ +- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10],
partitionBy=[a], orderBy=[b ASC], global=[false], select=[a, b])
+ +- Exchange(distribution=[forward])
+ +- Sort(orderBy=[a ASC, b ASC])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T, project=[a, b], metadata=[]]], fields=[a, b])
]]>
</Resource>
</TestCase>
@@ -509,7 +631,31 @@ SortAggregate(isMerge=[false], select=[SUM(b1) AS EXPR$0])
]]>
</Resource>
</TestCase>
- <TestCase name="testSortAggOnSortMergeJoinWithHashShuffle">
+ <TestCase name="testSortAggregateWithHashShuffle">
+ <Resource name="sql">
+ <![CDATA[ SELECT a, SUM(b) AS b FROM T GROUP BY a]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0}], b=[SUM($1)])
++- LogicalProject(a=[$0], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS b])
++- Exchange(distribution=[forward])
+ +- Sort(orderBy=[a ASC])
+ +- Exchange(distribution=[hash[a]])
+ +- LocalSortAggregate(groupBy=[a], select=[a, Partial_SUM(b) AS
sum$0])
+ +- Exchange(distribution=[forward])
+ +- Sort(orderBy=[a ASC])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T, project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTwoPhaseSortAggOnSortMergeJoinWithHashShuffle">
<Resource name="sql">
<![CDATA[WITH r AS (SELECT * FROM T1, T2 WHERE a1 = a2 AND c1 LIKE 'He%')
SELECT sum(b1) FROM r group by a1]]>
@@ -528,16 +674,63 @@ LogicalProject(EXPR$0=[$1])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[EXPR$0])
-+- SortAggregate(isMerge=[false], groupBy=[a1], select=[a1, SUM(b1) AS EXPR$0])
++- SortAggregate(isMerge=[true], groupBy=[a1], select=[a1, Final_SUM(sum$0) AS
EXPR$0])
+- Exchange(distribution=[forward])
- +- Calc(select=[a1, b1])
- +- Exchange(distribution=[forward])
- +- SortMergeJoin(joinType=[InnerJoin], where=[(a1 = a2)],
select=[a1, b1, a2])
- :- Exchange(distribution=[hash[a1]])
- : +- Calc(select=[a1, b1], where=[LIKE(c1, 'He%')])
- : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1, b1, c1], metadata=[]]],
fields=[a1, b1, c1])
- +- Exchange(distribution=[hash[a2]])
- +- TableSourceScan(table=[[default_catalog,
default_database, T2, project=[a2], metadata=[]]], fields=[a2])
+ +- Sort(orderBy=[a1 ASC])
+ +- Exchange(distribution=[hash[a1]])
+ +- LocalSortAggregate(groupBy=[a1], select=[a1, Partial_SUM(b1) AS
sum$0])
+ +- Exchange(distribution=[forward])
+ +- Sort(orderBy=[a1 ASC])
+ +- Calc(select=[a1, b1])
+ +- SortMergeJoin(joinType=[InnerJoin], where=[(a1 =
a2)], select=[a1, b1, a2])
+ :- Exchange(distribution=[hash[a1]])
+ : +- Calc(select=[a1, b1], where=[LIKE(c1, 'He%')])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, T1, filter=[], project=[a1, b1, c1], metadata=[]]],
fields=[a1, b1, c1])
+ +- Exchange(distribution=[hash[a2]])
+ +- TableSourceScan(table=[[default_catalog,
default_database, T2, project=[a2], metadata=[]]], fields=[a2])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSortJoinWithMultipleInputDisabled">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM
+ (SELECT a FROM T1 JOIN T ON a = a1) t1
+ INNER JOIN
+ (SELECT d2 FROM T JOIN T2 ON d2 = a) t2
+ ON t1.a = t2.d2]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], d2=[$1])
++- LogicalJoin(condition=[=($0, $1)], joinType=[inner])
+ :- LogicalProject(a=[$4])
+ : +- LogicalJoin(condition=[=($4, $0)], joinType=[inner])
+ : :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+ +- LogicalProject(d2=[$7])
+ +- LogicalJoin(condition=[=($7, $0)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, T]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+SortMergeJoin(joinType=[InnerJoin], where=[(a = d2)], select=[a, d2])
+:- Exchange(distribution=[keep_input_as_is[hash[a]]])
+: +- Calc(select=[a])
+: +- Exchange(distribution=[keep_input_as_is[hash[a1]]])
+: +- SortMergeJoin(joinType=[InnerJoin], where=[(a = a1)], select=[a1,
a])
+: :- Exchange(distribution=[hash[a1]])
+: : +- TableSourceScan(table=[[default_catalog, default_database,
T1, project=[a1], metadata=[]]], fields=[a1])
+: +- Exchange(distribution=[hash[a]])(reuse_id=[1])
+: +- TableSourceScan(table=[[default_catalog, default_database,
T, project=[a], metadata=[]]], fields=[a])
++- Exchange(distribution=[keep_input_as_is[hash[d2]]])
+ +- Calc(select=[d2])
+ +- Exchange(distribution=[keep_input_as_is[hash[a]]])
+ +- SortMergeJoin(joinType=[InnerJoin], where=[(d2 = a)], select=[a,
d2])
+ :- Reused(reference_id=[1])
+ +- Exchange(distribution=[hash[d2]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
T2, project=[d2], metadata=[]]], fields=[d2])
]]>
</Resource>
</TestCase>