This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2e44db6b30be7ceb97468b7c226490be1b9ff088 Author: Gustavo de Morais <[email protected]> AuthorDate: Mon Sep 29 15:45:56 2025 +0200 [FLINK-38211][table] Move testMultiSinkOnMultiJoinedView to MJ tests --- .../NonDeterministicUpdateAnalyzerTest.java | 82 ------------------ .../planner/plan/stream/sql/MultiJoinTest.java | 96 ++++++++++++++++++++- .../analyze/NonDeterministicUpdateAnalyzerTest.xml | 35 -------- .../planner/plan/stream/sql/MultiJoinTest.xml | 99 +++++++++++++++------- 4 files changed, 161 insertions(+), 151 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java index 99cc20b60da..3b085ddbc1d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java @@ -35,7 +35,6 @@ import org.junit.jupiter.api.Test; import scala.Enumeration; -import static org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static scala.runtime.BoxedUnit.UNIT; @@ -346,87 +345,6 @@ class NonDeterministicUpdateAnalyzerTest extends TableTestBase { false); } - @Test - void testMultiSinkOnMultiJoinedView() { - tEnv.getConfig().set(TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true); - tEnv.executeSql( - "create temporary table src1 (\n" - + " a int,\n" - + " b bigint,\n" - + " c string,\n" - + " d int,\n" - + " primary key(a, c) not enforced\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'changelog-mode' = 'I,UA,UB,D'\n" - + ")"); - tEnv.executeSql( - "create temporary table src2 (\n" - + " a int,\n" - + " b bigint,\n" - + " c string,\n" - + " d int,\n" - + " primary key(a, c) not enforced\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'changelog-mode' = 'I,UA,UB,D'\n" - + ")"); - tEnv.executeSql( - "create temporary table sink1 (\n" - + " a int,\n" - + " b string,\n" - + " c bigint,\n" - + " d bigint\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false'\n" - + ")"); - tEnv.executeSql( - "create temporary table sink2 (\n" - + " a int,\n" - + " b string,\n" - + " c bigint,\n" - + " d string\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'sink-insert-only' = 'false'\n" - + ")"); - tEnv.executeSql( - "create temporary view v1 as\n" - + "select\n" - + " t1.a as a, t1.`day` as `day`, t2.b as b, t2.c as c\n" - + "from (\n" - + " select a, b, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') as `day`\n" - + " from src1\n" - + " ) t1\n" - + "join (\n" - + " select b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `day`, c, d\n" - + " from src2\n" - + ") t2\n" - + " on t1.a = t2.d"); - - StatementSet stmtSet = tEnv.createStatementSet(); - stmtSet.addInsertSql( - "insert into sink1\n" - + " select a, `day`, sum(b), count(distinct c)\n" - + " from v1\n" - + " group by a, `day`"); - stmtSet.addInsertSql( - "insert into sink2\n" - + " select a, `day`, b, c\n" - + " from v1\n" - + " where b > 100"); - - util.doVerifyPlan( - stmtSet, - new ExplainDetail[] {ExplainDetail.PLAN_ADVICE}, - false, - new Enumeration.Value[] {PlanKind.OPT_REL_WITH_ADVICE()}, - () -> UNIT, - false, - false); - } - @Test void testCdcJoinDimWithPkOutputNoPkSinkWithoutPk() { // from NonDeterministicDagTest#testCdcJoinDimWithPkOutputNoPkSinkWithoutPk diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java index ff826fab581..ece24f9d95c 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java @@ -18,18 +18,25 @@ package org.apache.flink.table.planner.plan.stream.sql; +import org.apache.flink.table.api.ExplainDetail; +import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.utils.PlanKind; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; import org.apache.flink.table.planner.utils.TableTestBase; -import org.apache.flink.table.planner.utils.TableTestUtil; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import scala.Enumeration; + +import static scala.runtime.BoxedUnit.UNIT; + /** Tests for multi-join plans. */ public class MultiJoinTest extends TableTestBase { - private TableTestUtil util; + private StreamTableTestUtil util; @BeforeEach void setup() { @@ -553,4 +560,89 @@ public class MultiJoinTest extends TableTestBase { + "JOIN AddressPK a" + " ON u.user_id = a.user_id AND a.location IS NOT NULL"); } + + @Test + void testMultiSinkOnMultiJoinedView() { + util.tableEnv() + .executeSql( + "create temporary table src1 (\n" + + " a int,\n" + + " b bigint,\n" + + " c string,\n" + + " d int,\n" + + " primary key(a, c) not enforced\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'changelog-mode' = 'I,UA,UB,D'\n" + + ")"); + util.tableEnv() + .executeSql( + "create temporary table src2 (\n" + + " a int,\n" + + " b bigint,\n" + + " c string,\n" + + " d int,\n" + + " primary key(a, c) not enforced\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'changelog-mode' = 'I,UA,UB,D'\n" + + ")"); + util.tableEnv() + .executeSql( + "create temporary table sink1 (\n" + + " a int,\n" + + " b string,\n" + + " c bigint,\n" + + " d bigint\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'sink-insert-only' = 'false'\n" + + ")"); + util.tableEnv() + .executeSql( + "create temporary table sink2 (\n" + + " a int,\n" + + " b string,\n" + + " c bigint,\n" + + " d string\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'sink-insert-only' = 'false'\n" + + ")"); + util.tableEnv() + .executeSql( + "create temporary view v1 as\n" + + "select\n" + + " t1.a as a, t1.`day` as `day`, t2.b as b, t2.c as c\n" + + "from (\n" + + " select a, b, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') as `day`\n" + + " from src1\n" + + " ) t1\n" + + "join (\n" + + " select b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `day`, c, d\n" + + " from src2\n" + + ") t2\n" + + " on t1.a = t2.d"); + + StatementSet stmtSet = util.tableEnv().createStatementSet(); + stmtSet.addInsertSql( + "insert into sink1\n" + + " select a, `day`, sum(b), count(distinct c)\n" + + " from v1\n" + + " group by a, `day`"); + stmtSet.addInsertSql( + "insert into sink2\n" + + " select a, `day`, b, c\n" + + " from v1\n" + + " where b > 100"); + + util.doVerifyPlan( + stmtSet, + new ExplainDetail[] {ExplainDetail.PLAN_ADVICE}, + false, + new Enumeration.Value[] {PlanKind.OPT_REL_WITH_ADVICE()}, + () -> UNIT, + false, + false); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml index 63918b77014..dc45a0b5c9d 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml @@ -115,41 +115,6 @@ source node: TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta_rename, project=[a, b, c], metadata=[metadata_3]]], fields=[a, b, c, metadata_3], changelogMode=[I,UB,UA,D], upsertKeys=[[a]]) -]]> - </Resource> - </TestCase> - <TestCase name="testMultiSinkOnMultiJoinedView"> - <Resource name="optimized rel plan with advice"> - <![CDATA[ -Sink(table=[default_catalog.default_database.sink1], fields=[a, day, EXPR$2, EXPR$3]) -+- GroupAggregate(advice=[1], groupBy=[a, day], select=[a, day, SUM_RETRACT(b) AS EXPR$2, COUNT_RETRACT(DISTINCT c) AS EXPR$3]) - +- Exchange(distribution=[hash[a, day]]) - +- Calc(select=[a, day, b0 AS b, c]) - +- MultiJoin(commonJoinKey=[a], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(a, d)], joinFilter=[=(a, d)], select=[a,day,b0,c,d], outputRowType=[RecordType(INTEGER a, VARCHAR(2147483647) day, BIGINT b0, VARCHAR(2147483647) c, INTEGER d)]) - :- Exchange(distribution=[hash[a]]) - : +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day]) - : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a]) - +- Exchange(distribution=[hash[d]]) - +- TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d]) - -Sink(table=[default_catalog.default_database.sink2], fields=[a, day, b, c]) -+- Calc(select=[a, day, b0 AS b, c]) - +- MultiJoin(commonJoinKey=[a], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(a, d)], joinFilter=[=(a, d)], select=[a,day,b0,c,d], outputRowType=[RecordType(INTEGER a, VARCHAR(2147483647) day, BIGINT b0, VARCHAR(2147483647) c, INTEGER d)]) - :- Exchange(distribution=[hash[a]]) - : +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day]) - : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a]) - +- Exchange(distribution=[hash[d]]) - +- Calc(select=[b, c, d], where=[>(b, 100)]) - +- TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d]) - -advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.exec.mini-batch.enabled' to 'true', 'table.exec.mini-batch.allow-latency' to a positive long value, 'table.exec.mini-batch.size' to a positive long value). -advice[2]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions. - -related rel plan: -Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], changelogMode=[I,UB,UA,D]) -+- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a], changelogMode=[I,UB,UA,D]) - - ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml index 6bf479fa4b6..e5dd4456502 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml @@ -35,7 +35,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id, location]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, (user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], joinFilter=[((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0)))], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], outputRowType=[RecordType(VARCHAR(2 [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, (user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) o [...] :- Exchange(distribution=[hash[user_id_0]]) : +- ChangelogNormalize(key=[user_id_0]) : +- Exchange(distribution=[hash[user_id_0]]) @@ -63,7 +63,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati == Optimized Physical Plan == Calc(select=[user_id_0, name, order_id, payment_id, location]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, =(user_id_0, user_id_1), AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0))), =(user_id_2, user_id_3)], joinFilter=[AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0)))], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], outputRowType=[RecordType(VARCHAR(2 [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, =(user_id_0, user_id_1), AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0))), =(user_id_2, user_id_3)], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) o [...] :- Exchange(distribution=[hash[user_id_0]]) : +- ChangelogNormalize(key=[user_id_0]) : +- Exchange(distribution=[hash[user_id_0]]) @@ -77,7 +77,7 @@ Calc(select=[user_id_0, name, order_id, payment_id, location]) == Optimized Execution Plan == Calc(select=[user_id_0, name, order_id, payment_id, location]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, (user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], joinFilter=[((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0)))], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], outputRowType=[RecordType(VARCHAR(2 [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, (user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) o [...] :- Exchange(distribution=[hash[user_id_0]]) : +- ChangelogNormalize(key=[user_id_0]) : +- Exchange(distribution=[hash[user_id_0]]) @@ -110,7 +110,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id, location]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, =(user_id_0, user_id_1), AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0))), =(user_id_2, user_id_3)], joinFilter=[AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0)))], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], outputRowType=[RecordType(VARCHAR(2 [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, =(user_id_0, user_id_1), AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0))), =(user_id_2, user_id_3)], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) o [...] :- Exchange(distribution=[hash[user_id_0]]) : +- ChangelogNormalize(key=[user_id_0]) : +- Exchange(distribution=[hash[user_id_0]]) @@ -143,10 +143,10 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id, location]) -+- MultiJoin(commonJoinKey=[payment_id], joinTypes=[INNER, LEFT], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(payment_id, user_id_3)], joinFilter=[true], select=[user_id_0,name,order_id,payment_id,location,user_id_3], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)]) ++- MultiJoin(commonJoinKey=[payment_id], joinTypes=[INNER, LEFT], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(payment_id, user_id_3)], select=[user_id_0,name,order_id,payment_id,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)]) :- Exchange(distribution=[hash[payment_id]]) : +- Calc(select=[user_id_0, name, order_id, payment_id]) - : +- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[=(user_id_0, user_id_2)], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VA [...] + : +- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) : :- Exchange(distribution=[hash[user_id_0]]) : : +- ChangelogNormalize(key=[user_id_0]) : : +- Exchange(distribution=[hash[user_id_0]]) @@ -179,7 +179,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id, location]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_1, user_id_2), =(user_id_2, user_id_3)], joinFilter=[true], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2,location,user_id_3], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_1, user_id_2), =(user_id_2, user_id_3)], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payme [...] :- Exchange(distribution=[hash[user_id_0]]) : +- ChangelogNormalize(key=[user_id_0]) : +- Exchange(distribution=[hash[user_id_0]]) @@ -248,6 +248,41 @@ Calc(select=[id, val, price]) +- Exchange(distribution=[hash[id]]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)]) +- TableSourceScan(table=[[default_catalog, default_database, EventTable2]], fields=[id, price, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiSinkOnMultiJoinedView"> + <Resource name="optimized rel plan with advice"> + <![CDATA[ +Sink(table=[default_catalog.default_database.sink1], fields=[a, day, EXPR$2, EXPR$3]) ++- GroupAggregate(advice=[1], groupBy=[a, day], select=[a, day, SUM_RETRACT(b) AS EXPR$2, COUNT_RETRACT(DISTINCT c) AS EXPR$3]) + +- Exchange(distribution=[hash[a, day]]) + +- Calc(select=[a, day, b0 AS b, c]) + +- MultiJoin(commonJoinKey=[a], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(a, d)], select=[a,day,b0,c,d], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) day, BIGINT b0, VARCHAR(2147483647) c, INTEGER d)]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day]) + : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a]) + +- Exchange(distribution=[hash[d]]) + +- TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d]) + +Sink(table=[default_catalog.default_database.sink2], fields=[a, day, b, c]) ++- Calc(select=[a, day, b0 AS b, c]) + +- MultiJoin(commonJoinKey=[a], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(a, d)], select=[a,day,b0,c,d], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) day, BIGINT b0, VARCHAR(2147483647) c, INTEGER d)]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day]) + : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a]) + +- Exchange(distribution=[hash[d]]) + +- Calc(select=[b, c, d], where=[>(b, 100)]) + +- TableSourceScan(table=[[default_catalog, default_database, src2, project=[b, c, d], metadata=[]]], fields=[b, c, d]) + +advice[1]: [ADVICE] You might want to enable local-global two-phase optimization by configuring ('table.exec.mini-batch.enabled' to 'true', 'table.exec.mini-batch.allow-latency' to a positive long value, 'table.exec.mini-batch.size' to a positive long value). +advice[2]: [WARNING] The column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) or current node outputs non-deterministic update messages. Please consider removing these non-deterministic columns or making them deterministic by using deterministic functions. + +related rel plan: +Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], changelogMode=[I,UB,UA,D]) ++- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a], changelogMode=[I,UB,UA,D]) + + ]]> </Resource> </TestCase> @@ -272,7 +307,7 @@ LogicalSink(table=[default_catalog.default_database.sink_four_way], fields=[user <![CDATA[ Sink(table=[default_catalog.default_database.sink_four_way], fields=[user_id, order_id, user_id0, payment_id, user_id1, name, location]) +- Calc(select=[user_id, order_id, user_id0, payment_id, user_id1, name, location]) - +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER, INNER, INNER], inputUniqueKeys=[(user_id), (order_id, user_id), (payment_id, user_id), (user_id)], joinConditions=[true, =(user_id, user_id0), =(user_id, user_id1), =(user_id, user_id2)], joinFilter=[AND(=(user_id, user_id2), =(user_id, user_id1), =(user_id, user_id0))], select=[user_id,name,order_id,user_id0,payment_id,user_id1,user_id2,location], outputRowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) [...] + +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER, INNER, INNER], inputUniqueKeys=[(user_id), (order_id, user_id), (payment_id, user_id), (user_id)], joinConditions=[true, =(user_id, user_id0), =(user_id, user_id1), =(user_id, user_id2)], select=[user_id,name,order_id,user_id0,payment_id,user_id1,user_id2,location], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) paym [...] :- Exchange(distribution=[hash[user_id]]) : +- TableSourceScan(table=[[default_catalog, default_database, UsersPK, project=[user_id, name], metadata=[]]], fields=[user_id, name]) :- Exchange(distribution=[hash[user_id]]) @@ -311,7 +346,7 @@ LogicalSink(table=[default_catalog.default_database.sink_three_way], fields=[use <![CDATA[ Sink(table=[default_catalog.default_database.sink_three_way], fields=[user_id, order_id, user_id0, payment_id, user_id1, description]) +- Calc(select=[user_id0 AS user_id, order_id, user_id1 AS user_id0, payment_id, user_id AS user_id1, description]) - +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id), (order_id, user_id), (payment_id, user_id)], joinConditions=[true, =(user_id0, user_id), =(user_id0, user_id1)], joinFilter=[AND(=(user_id0, user_id1), =(user_id0, user_id))], select=[user_id,description,order_id,user_id0,payment_id,user_id1], outputRowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) description, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0 [...] + +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id), (order_id, user_id), (payment_id, user_id)], joinConditions=[true, =(user_id0, user_id), =(user_id0, user_id1)], select=[user_id,description,order_id,user_id0,payment_id,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) description, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id1)]) :- Exchange(distribution=[hash[user_id]]) : +- TableSourceScan(table=[[default_catalog, default_database, UsersPK, project=[user_id, description], metadata=[]]], fields=[user_id, description]) :- Exchange(distribution=[hash[user_id]]) @@ -338,7 +373,7 @@ LogicalSink(table=[default_catalog.default_database.sink_two_way], fields=[user_ <![CDATA[ Sink(table=[default_catalog.default_database.sink_two_way], fields=[user_id, order_id, product, region_id]) +- Calc(select=[user_id0 AS user_id, order_id, product, region_id]) - +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], inputUniqueKeys=[(user_id), (order_id, user_id)], joinConditions=[true, =(user_id, user_id0)], joinFilter=[=(user_id, user_id0)], select=[user_id,region_id,order_id,user_id0,product], outputRowType=[RecordType(VARCHAR(2147483647) user_id, INTEGER region_id, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) product)]) + +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], inputUniqueKeys=[(user_id), (order_id, user_id)], joinConditions=[true, =(user_id, user_id0)], select=[user_id,region_id,order_id,user_id0,product], rowType=[RecordType(VARCHAR(2147483647) user_id, INTEGER region_id, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) product)]) :- Exchange(distribution=[hash[user_id]]) : +- TableSourceScan(table=[[default_catalog, default_database, UsersPK, project=[user_id, region_id], metadata=[]]], fields=[user_id, region_id]) +- Exchange(distribution=[hash[user_id]]) @@ -363,7 +398,7 @@ LogicalSink(table=[default_catalog.default_database.sink_two_way], fields=[user_ <![CDATA[ Sink(table=[default_catalog.default_database.sink_two_way], fields=[user_id, order_id, product, region_id], upsertMaterialize=[true]) +- Calc(select=[user_id0 AS user_id, order_id, product, region_id]) - +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], inputUniqueKeys=[(user_id), (order_id)], joinConditions=[true, =(user_id, user_id0)], joinFilter=[=(user_id, user_id0)], select=[user_id,region_id,order_id,user_id0,product], outputRowType=[RecordType(VARCHAR(2147483647) user_id, INTEGER region_id, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) product)]) + +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], inputUniqueKeys=[(user_id), (order_id)], joinConditions=[true, =(user_id, user_id0)], select=[user_id,region_id,order_id,user_id0,product], rowType=[RecordType(VARCHAR(2147483647) user_id, INTEGER region_id, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) product)]) :- Exchange(distribution=[hash[user_id]]) : +- ChangelogNormalize(key=[user_id]) : +- Exchange(distribution=[hash[user_id]]) @@ -392,7 +427,7 @@ LogicalSink(table=[default_catalog.default_database.sink_two_way], fields=[user_ <![CDATA[ Sink(table=[default_catalog.default_database.sink_two_way], fields=[user_id, order_id, product, region_id]) +- Calc(select=[user_id, order_id, product, region_id]) - +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, LEFT], inputUniqueKeys=[(order_id, user_id), (user_id)], joinConditions=[true, =(user_id0, user_id)], joinFilter=[true], select=[order_id,user_id,product,user_id0,region_id], outputRowType=[RecordType(VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id, VARCHAR(2147483647) product, VARCHAR(2147483647) user_id0, INTEGER region_id)]) + +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, LEFT], inputUniqueKeys=[(order_id, user_id), (user_id)], joinConditions=[true, =(user_id0, user_id)], select=[order_id,user_id,product,user_id0,region_id], rowType=[RecordType(VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id, VARCHAR(2147483647) product, VARCHAR(2147483647) user_id0, INTEGER region_id)]) :- Exchange(distribution=[hash[user_id]]) : +- TableSourceScan(table=[[default_catalog, default_database, OrdersPK]], fields=[order_id, user_id, product]) +- Exchange(distribution=[hash[user_id]]) @@ -423,7 +458,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], age=[$7]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, age]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), noUniqueKey], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id)], joinFilter=[AND(=(user_id_0, user_id), =(user_id_0, user_id_1))], select=[user_id_0,name,order_id,user_id_1,user_id,age], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) user_i [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), noUniqueKey], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id)], select=[user_id_0,name,order_id,user_id_1,user_id,age], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) user_id, INTEGER age)]) :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -506,7 +541,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[((user_id_0 = user_id_2) AND (user_id_0 = user_id_1))], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -528,7 +563,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) == Optimized Physical Plan == Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, user_id_2), =(user_id_0, user_id_1))], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -538,7 +573,7 @@ Calc(select=[user_id_0, name, order_id, payment_id]) == Optimized Execution Plan == Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[((user_id_0 = user_id_2) AND (user_id_0 = user_id_1))], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -565,10 +600,10 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(commonJoinKey=[cash], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, (payment_id)], joinConditions=[true, =(cash, price)], joinFilter=[=(cash, price)], select=[user_id_0,name,cash,order_id,payment_id,price], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, INTEGER price)]) ++- MultiJoin(commonJoinKey=[cash], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, (payment_id)], joinConditions=[true, =(cash, price)], select=[user_id_0,name,cash,order_id,payment_id,price], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, INTEGER price)]) :- Exchange(distribution=[hash[cash]]) : +- Calc(select=[user_id_0, name, cash, order_id]) - : +- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id)], joinConditions=[true, =(user_id_0, user_id_1)], joinFilter=[=(user_id_0, user_id_1)], select=[user_id_0,name,cash,order_id,user_id_1], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1)]) + : +- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id)], joinConditions=[true, =(user_id_0, user_id_1)], select=[user_id_0,name,cash,order_id,user_id_1], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1)]) : :- Exchange(distribution=[hash[user_id_0]]) : : +- ChangelogNormalize(key=[user_id_0]) : : +- Exchange(distribution=[hash[user_id_0]]) @@ -597,7 +632,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, user_id_2), =(user_id_0, user_id_1))], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -619,7 +654,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) == Optimized Physical Plan == Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[true], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -629,7 +664,7 @@ Calc(select=[user_id_0, name, order_id, payment_id]) == Optimized Execution Plan == Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[true], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -656,7 +691,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, user_id_2), =(user_id_0, user_id_1))], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)], stateTtlHints=[[[STAT [...] :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -683,7 +718,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, user_id_2), =(user_id_0, user_id_1))], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)], stateTtlHints=[[[STAT [...] :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -709,9 +744,9 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, (payment_id)], joinConditions=[true, true], joinFilter=[true], select=[user_id_0,name,order_id,payment_id], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id)]) +MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, (payment_id)], joinConditions=[true, true], select=[user_id_0,name,order_id,payment_id], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id)]) :- Exchange(distribution=[single]) -: +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id)], joinConditions=[true, true], joinFilter=[true], select=[user_id_0,name,order_id], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id)]) +: +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id)], joinConditions=[true, true], select=[user_id_0,name,order_id], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id)]) : :- Exchange(distribution=[single]) : : +- ChangelogNormalize(key=[user_id_0]) : : +- Exchange(distribution=[hash[user_id_0]]) @@ -742,7 +777,7 @@ LogicalProject(name=[$1], proctime=[$2], rowtime=[$5], price=[$7]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[name, proctime, rowtime, price]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), noUniqueKey, noUniqueKey], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, user_id_2), =(user_id_0, user_id_1))], select=[user_id_0,name,proctime,user_id_1,rowtime,price,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) proctime, VARCHAR(2147483647) user_i [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), noUniqueKey, noUniqueKey], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], select=[user_id_0,name,proctime,user_id_1,rowtime,price,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) proctime, VARCHAR(2147483647) user_id_1, TIMESTAMP(3) rowtime, INTEGER price, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- Calc(select=[user_id_0, name, PROCTIME_MATERIALIZE(PROCTIME()) AS proctime]) : +- TableSourceScan(table=[[default_catalog, default_database, UsersWithProctime]], fields=[user_id_0, name]) @@ -773,7 +808,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, CAST('Gus' AS VARCHAR(2147483647)) AS name, order_id, CAST(payment_id AS VARCHAR(2147483647)) AS payment_id]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[=(user_id_0, user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- Calc(select=[user_id_0]) : +- ChangelogNormalize(key=[user_id_0], condition=[=(name, 'Gus')]) @@ -804,7 +839,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[true], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -831,7 +866,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[true], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -854,7 +889,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) == Optimized Physical Plan == Calc(select=[user_id_0, CAST(_UTF-16LE'Gus':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, order_id, CAST(payment_id AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS payment_id]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[=(user_id_0, user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- Calc(select=[user_id_0]) : +- ChangelogNormalize(key=[user_id_0], condition=[=(name, _UTF-16LE'Gus':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) @@ -868,7 +903,7 @@ Calc(select=[user_id_0, CAST(_UTF-16LE'Gus':VARCHAR(2147483647) CHARACTER SET "U == Optimized Execution Plan == Calc(select=[user_id_0, CAST('Gus' AS VARCHAR(2147483647)) AS name, order_id, CAST(payment_id AS VARCHAR(2147483647)) AS payment_id]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[(user_id_0 = user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- Calc(select=[user_id_0]) : +- ChangelogNormalize(key=[user_id_0], condition=[(name = 'Gus')]) @@ -900,7 +935,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[user_id_0, CAST('Gus' AS VARCHAR(2147483647)) AS name, order_id, CAST(payment_id AS VARCHAR(2147483647)) AS payment_id]) -+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[(user_id_0 = user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- Calc(select=[user_id_0]) : +- ChangelogNormalize(key=[user_id_0], condition=[(name = 'Gus')])
