This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2e44db6b30be7ceb97468b7c226490be1b9ff088
Author: Gustavo de Morais <[email protected]>
AuthorDate: Mon Sep 29 15:45:56 2025 +0200

    [FLINK-38211][table] Move testMultiSinkOnMultiJoinedView to MJ tests
---
 .../NonDeterministicUpdateAnalyzerTest.java        | 82 ------------------
 .../planner/plan/stream/sql/MultiJoinTest.java     | 96 ++++++++++++++++++++-
 .../analyze/NonDeterministicUpdateAnalyzerTest.xml | 35 --------
 .../planner/plan/stream/sql/MultiJoinTest.xml      | 99 +++++++++++++++-------
 4 files changed, 161 insertions(+), 151 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java
index 99cc20b60da..3b085ddbc1d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.java
@@ -35,7 +35,6 @@ import org.junit.jupiter.api.Test;
 
 import scala.Enumeration;
 
-import static org.apache.flink.table.api.config.OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static scala.runtime.BoxedUnit.UNIT;
@@ -346,87 +345,6 @@ class NonDeterministicUpdateAnalyzerTest extends TableTestBase {
                 false);
     }
 
-    @Test
-    void testMultiSinkOnMultiJoinedView() {
-        tEnv.getConfig().set(TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true);
-        tEnv.executeSql(
-                "create temporary table src1 (\n"
-                        + "  a int,\n"
-                        + "  b bigint,\n"
-                        + "  c string,\n"
-                        + "  d int,\n"
-                        + "  primary key(a, c) not enforced\n"
-                        + ") with (\n"
-                        + " 'connector' = 'values',\n"
-                        + " 'changelog-mode' = 'I,UA,UB,D'\n"
-                        + ")");
-        tEnv.executeSql(
-                "create temporary table src2 (\n"
-                        + "  a int,\n"
-                        + "  b bigint,\n"
-                        + "  c string,\n"
-                        + "  d int,\n"
-                        + "  primary key(a, c) not enforced\n"
-                        + ") with (\n"
-                        + " 'connector' = 'values',\n"
-                        + " 'changelog-mode' = 'I,UA,UB,D'\n"
-                        + ")");
-        tEnv.executeSql(
-                "create temporary table sink1 (\n"
-                        + "  a int,\n"
-                        + "  b string,\n"
-                        + "  c bigint,\n"
-                        + "  d bigint\n"
-                        + ") with (\n"
-                        + " 'connector' = 'values',\n"
-                        + " 'sink-insert-only' = 'false'\n"
-                        + ")");
-        tEnv.executeSql(
-                "create temporary table sink2 (\n"
-                        + "  a int,\n"
-                        + "  b string,\n"
-                        + "  c bigint,\n"
-                        + "  d string\n"
-                        + ") with (\n"
-                        + " 'connector' = 'values',\n"
-                        + " 'sink-insert-only' = 'false'\n"
-                        + ")");
-        tEnv.executeSql(
-                "create temporary view v1 as\n"
-                        + "select\n"
-                        + "  t1.a as a, t1.`day` as `day`, t2.b as b, t2.c as c\n"
-                        + "from (\n"
-                        + "  select a, b, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') as `day`\n"
-                        + "  from src1\n"
-                        + " ) t1\n"
-                        + "join (\n"
-                        + "  select b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `day`, c, d\n"
-                        + "  from src2\n"
-                        + ") t2\n"
-                        + " on t1.a = t2.d");
-
-        StatementSet stmtSet = tEnv.createStatementSet();
-        stmtSet.addInsertSql(
-                "insert into sink1\n"
-                        + "  select a, `day`, sum(b), count(distinct c)\n"
-                        + "  from v1\n"
-                        + "  group by a, `day`");
-        stmtSet.addInsertSql(
-                "insert into sink2\n"
-                        + "  select a, `day`, b, c\n"
-                        + "  from v1\n"
-                        + "  where b > 100");
-
-        util.doVerifyPlan(
-                stmtSet,
-                new ExplainDetail[] {ExplainDetail.PLAN_ADVICE},
-                false,
-                new Enumeration.Value[] {PlanKind.OPT_REL_WITH_ADVICE()},
-                () -> UNIT,
-                false,
-                false);
-    }
-
     @Test
     void testCdcJoinDimWithPkOutputNoPkSinkWithoutPk() {
         // from NonDeterministicDagTest#testCdcJoinDimWithPkOutputNoPkSinkWithoutPk
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
index ff826fab581..ece24f9d95c 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
@@ -18,18 +18,25 @@
 
 package org.apache.flink.table.planner.plan.stream.sql;
 
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.utils.PlanKind;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
 import org.apache.flink.table.planner.utils.TableTestBase;
-import org.apache.flink.table.planner.utils.TableTestUtil;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import scala.Enumeration;
+
+import static scala.runtime.BoxedUnit.UNIT;
+
 /** Tests for multi-join plans. */
 public class MultiJoinTest extends TableTestBase {
 
-    private TableTestUtil util;
+    private StreamTableTestUtil util;
 
     @BeforeEach
     void setup() {
@@ -553,4 +560,89 @@ public class MultiJoinTest extends TableTestBase {
                         + "JOIN AddressPK a"
                         + "  ON  u.user_id = a.user_id AND a.location IS NOT NULL");
     }
+
+    @Test
+    void testMultiSinkOnMultiJoinedView() {
+        util.tableEnv()
+                .executeSql(
+                        "create temporary table src1 (\n"
+                                + "  a int,\n"
+                                + "  b bigint,\n"
+                                + "  c string,\n"
+                                + "  d int,\n"
+                                + "  primary key(a, c) not enforced\n"
+                                + ") with (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'changelog-mode' = 'I,UA,UB,D'\n"
+                                + ")");
+        util.tableEnv()
+                .executeSql(
+                        "create temporary table src2 (\n"
+                                + "  a int,\n"
+                                + "  b bigint,\n"
+                                + "  c string,\n"
+                                + "  d int,\n"
+                                + "  primary key(a, c) not enforced\n"
+                                + ") with (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'changelog-mode' = 'I,UA,UB,D'\n"
+                                + ")");
+        util.tableEnv()
+                .executeSql(
+                        "create temporary table sink1 (\n"
+                                + "  a int,\n"
+                                + "  b string,\n"
+                                + "  c bigint,\n"
+                                + "  d bigint\n"
+                                + ") with (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'sink-insert-only' = 'false'\n"
+                                + ")");
+        util.tableEnv()
+                .executeSql(
+                        "create temporary table sink2 (\n"
+                                + "  a int,\n"
+                                + "  b string,\n"
+                                + "  c bigint,\n"
+                                + "  d string\n"
+                                + ") with (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'sink-insert-only' = 'false'\n"
+                                + ")");
+        util.tableEnv()
+                .executeSql(
+                        "create temporary view v1 as\n"
+                                + "select\n"
+                                + "  t1.a as a, t1.`day` as `day`, t2.b as b, t2.c as c\n"
+                                + "from (\n"
+                                + "  select a, b, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') as `day`\n"
+                                + "  from src1\n"
+                                + " ) t1\n"
+                                + "join (\n"
+                                + "  select b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `day`, c, d\n"
+                                + "  from src2\n"
+                                + ") t2\n"
+                                + " on t1.a = t2.d");
+
+        StatementSet stmtSet = util.tableEnv().createStatementSet();
+        stmtSet.addInsertSql(
+                "insert into sink1\n"
+                        + "  select a, `day`, sum(b), count(distinct c)\n"
+                        + "  from v1\n"
+                        + "  group by a, `day`");
+        stmtSet.addInsertSql(
+                "insert into sink2\n"
+                        + "  select a, `day`, b, c\n"
+                        + "  from v1\n"
+                        + "  where b > 100");
+
+        util.doVerifyPlan(
+                stmtSet,
+                new ExplainDetail[] {ExplainDetail.PLAN_ADVICE},
+                false,
+                new Enumeration.Value[] {PlanKind.OPT_REL_WITH_ADVICE()},
+                () -> UNIT,
+                false,
+                false);
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml
index 63918b77014..dc45a0b5c9d 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml
@@ -115,41 +115,6 @@ source node:
 TableSourceScan(table=[[default_catalog, default_database, 
cdc_with_meta_rename, project=[a, b, c], metadata=[metadata_3]]], fields=[a, b, 
c, metadata_3], changelogMode=[I,UB,UA,D], upsertKeys=[[a]])
 
 
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testMultiSinkOnMultiJoinedView">
-    <Resource name="optimized rel plan with advice">
-      <![CDATA[
-Sink(table=[default_catalog.default_database.sink1], fields=[a, day, EXPR$2, 
EXPR$3])
-+- GroupAggregate(advice=[1], groupBy=[a, day], select=[a, day, SUM_RETRACT(b) 
AS EXPR$2, COUNT_RETRACT(DISTINCT c) AS EXPR$3])
-   +- Exchange(distribution=[hash[a, day]])
-      +- Calc(select=[a, day, b0 AS b, c])
-         +- MultiJoin(commonJoinKey=[a], joinTypes=[INNER, INNER], 
inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(a, d)], 
joinFilter=[=(a, d)], select=[a,day,b0,c,d], outputRowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) day, BIGINT b0, VARCHAR(2147483647) c, INTEGER d)])
-            :- Exchange(distribution=[hash[a]])
-            :  +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') 
AS day])
-            :     +- TableSourceScan(table=[[default_catalog, 
default_database, src1, project=[a], metadata=[]]], fields=[a])
-            +- Exchange(distribution=[hash[d]])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
src2, project=[b, c, d], metadata=[]]], fields=[b, c, d])
-
-Sink(table=[default_catalog.default_database.sink2], fields=[a, day, b, c])
-+- Calc(select=[a, day, b0 AS b, c])
-   +- MultiJoin(commonJoinKey=[a], joinTypes=[INNER, INNER], 
inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(a, d)], 
joinFilter=[=(a, d)], select=[a,day,b0,c,d], outputRowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) day, BIGINT b0, VARCHAR(2147483647) c, INTEGER d)])
-      :- Exchange(distribution=[hash[a]])
-      :  +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day])
-      :     +- TableSourceScan(table=[[default_catalog, default_database, 
src1, project=[a], metadata=[]]], fields=[a])
-      +- Exchange(distribution=[hash[d]])
-         +- Calc(select=[b, c, d], where=[>(b, 100)])
-            +- TableSourceScan(table=[[default_catalog, default_database, 
src2, project=[b, c, d], metadata=[]]], fields=[b, c, d])
-
-advice[1]: [ADVICE] You might want to enable local-global two-phase 
optimization by configuring ('table.exec.mini-batch.enabled' to 'true', 
'table.exec.mini-batch.allow-latency' to a positive long value, 
'table.exec.mini-batch.size' to a positive long value).
-advice[2]: [WARNING] The column(s): day(generated by non-deterministic 
function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for 
correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' 
only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) 
or current node outputs non-deterministic update messages. Please consider 
removing these non-deterministic columns or making them deterministic by using 
deterministic functions.
-
-related rel plan:
-Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], 
changelogMode=[I,UB,UA,D])
-+- TableSourceScan(table=[[default_catalog, default_database, src1, 
project=[a], metadata=[]]], fields=[a], changelogMode=[I,UB,UA,D])
-
-
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
index 6bf479fa4b6..e5dd4456502 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
@@ -35,7 +35,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6], locati
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id, location])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], 
joinConditions=[true, (user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND 
((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], 
joinFilter=[((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0)))], 
select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3],
 outputRowType=[RecordType(VARCHAR(2 [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], 
joinConditions=[true, (user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND 
((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], 
select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3],
 rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
INTEGER cash, VARCHAR(2147483647) o [...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- ChangelogNormalize(key=[user_id_0])
    :     +- Exchange(distribution=[hash[user_id_0]])
@@ -63,7 +63,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6], locati
 
 == Optimized Physical Plan ==
 Calc(select=[user_id_0, name, order_id, payment_id, location])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], 
joinConditions=[true, =(user_id_0, user_id_1), AND(=(user_id_0, user_id_2), 
OR(>=(cash, price), <(price, 0))), =(user_id_2, user_id_3)], 
joinFilter=[AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0)))], 
select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3],
 outputRowType=[RecordType(VARCHAR(2 [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], 
joinConditions=[true, =(user_id_0, user_id_1), AND(=(user_id_0, user_id_2), 
OR(>=(cash, price), <(price, 0))), =(user_id_2, user_id_3)], 
select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3],
 rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
INTEGER cash, VARCHAR(2147483647) o [...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- ChangelogNormalize(key=[user_id_0])
    :     +- Exchange(distribution=[hash[user_id_0]])
@@ -77,7 +77,7 @@ Calc(select=[user_id_0, name, order_id, payment_id, location])
 
 == Optimized Execution Plan ==
 Calc(select=[user_id_0, name, order_id, payment_id, location])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], 
joinConditions=[true, (user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND 
((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], 
joinFilter=[((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0)))], 
select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3],
 outputRowType=[RecordType(VARCHAR(2 [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], 
joinConditions=[true, (user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND 
((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], 
select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3],
 rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
INTEGER cash, VARCHAR(2147483647) o [...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- ChangelogNormalize(key=[user_id_0])
    :     +- Exchange(distribution=[hash[user_id_0]])
@@ -110,7 +110,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6], locati
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id, location])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], 
joinConditions=[true, =(user_id_0, user_id_1), AND(=(user_id_0, user_id_2), 
OR(>=(cash, price), <(price, 0))), =(user_id_2, user_id_3)], 
joinFilter=[AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0)))], 
select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3],
 outputRowType=[RecordType(VARCHAR(2 [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], 
joinConditions=[true, =(user_id_0, user_id_1), AND(=(user_id_0, user_id_2), 
OR(>=(cash, price), <(price, 0))), =(user_id_2, user_id_3)], 
select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3],
 rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
INTEGER cash, VARCHAR(2147483647) o [...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- ChangelogNormalize(key=[user_id_0])
    :     +- Exchange(distribution=[hash[user_id_0]])
@@ -143,10 +143,10 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6], locati
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id, location])
-+- MultiJoin(commonJoinKey=[payment_id], joinTypes=[INNER, LEFT], 
inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(payment_id, 
user_id_3)], joinFilter=[true], 
select=[user_id_0,name,order_id,payment_id,location,user_id_3], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)])
++- MultiJoin(commonJoinKey=[payment_id], joinTypes=[INNER, LEFT], 
inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(payment_id, 
user_id_3)], select=[user_id_0,name,order_id,payment_id,location,user_id_3], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)])
    :- Exchange(distribution=[hash[payment_id]])
    :  +- Calc(select=[user_id_0, name, order_id, payment_id])
-   :     +- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, 
INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], 
joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], 
joinFilter=[=(user_id_0, user_id_2)], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VA [...]
+   :     +- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, 
INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], 
joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
    :        :- Exchange(distribution=[hash[user_id_0]])
    :        :  +- ChangelogNormalize(key=[user_id_0])
    :        :     +- Exchange(distribution=[hash[user_id_0]])
@@ -179,7 +179,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6], locati
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id, location])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], 
joinConditions=[true, =(user_id_0, user_id_1), =(user_id_1, user_id_2), 
=(user_id_2, user_id_3)], joinFilter=[true], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2,location,user_id_3],
 outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1,  [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], 
joinConditions=[true, =(user_id_0, user_id_1), =(user_id_1, user_id_2), 
=(user_id_2, user_id_3)], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2,location,user_id_3],
 rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payme [...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- ChangelogNormalize(key=[user_id_0])
    :     +- Exchange(distribution=[hash[user_id_0]])
@@ -248,6 +248,41 @@ Calc(select=[id, val, price])
    +- Exchange(distribution=[hash[id]])
       +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
5000:INTERVAL SECOND)])
          +- TableSourceScan(table=[[default_catalog, default_database, 
EventTable2]], fields=[id, price, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiSinkOnMultiJoinedView">
+    <Resource name="optimized rel plan with advice">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.sink1], fields=[a, day, EXPR$2, 
EXPR$3])
++- GroupAggregate(advice=[1], groupBy=[a, day], select=[a, day, SUM_RETRACT(b) 
AS EXPR$2, COUNT_RETRACT(DISTINCT c) AS EXPR$3])
+   +- Exchange(distribution=[hash[a, day]])
+      +- Calc(select=[a, day, b0 AS b, c])
+         +- MultiJoin(commonJoinKey=[a], joinTypes=[INNER, INNER], 
inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(a, d)], 
select=[a,day,b0,c,d], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) day, 
BIGINT b0, VARCHAR(2147483647) c, INTEGER d)])
+            :- Exchange(distribution=[hash[a]])
+            :  +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') 
AS day])
+            :     +- TableSourceScan(table=[[default_catalog, 
default_database, src1, project=[a], metadata=[]]], fields=[a])
+            +- Exchange(distribution=[hash[d]])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
src2, project=[b, c, d], metadata=[]]], fields=[b, c, d])
+
+Sink(table=[default_catalog.default_database.sink2], fields=[a, day, b, c])
++- Calc(select=[a, day, b0 AS b, c])
+   +- MultiJoin(commonJoinKey=[a], joinTypes=[INNER, INNER], 
inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(a, d)], 
select=[a,day,b0,c,d], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) day, 
BIGINT b0, VARCHAR(2147483647) c, INTEGER d)])
+      :- Exchange(distribution=[hash[a]])
+      :  +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day])
+      :     +- TableSourceScan(table=[[default_catalog, default_database, 
src1, project=[a], metadata=[]]], fields=[a])
+      +- Exchange(distribution=[hash[d]])
+         +- Calc(select=[b, c, d], where=[>(b, 100)])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
src2, project=[b, c, d], metadata=[]]], fields=[b, c, d])
+
+advice[1]: [ADVICE] You might want to enable local-global two-phase 
optimization by configuring ('table.exec.mini-batch.enabled' to 'true', 
'table.exec.mini-batch.allow-latency' to a positive long value, 
'table.exec.mini-batch.size' to a positive long value).
+advice[2]: [WARNING] The column(s): day(generated by non-deterministic 
function: CURRENT_TIMESTAMP ) can not satisfy the determinism requirement for 
correctly processing update message('UB'/'UA'/'D' in changelogMode, not 'I' 
only), this usually happens when input node has no upsertKey(upsertKeys=[{}]) 
or current node outputs non-deterministic update messages. Please consider 
removing these non-deterministic columns or making them deterministic by using 
deterministic functions.
+
+related rel plan:
+Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), _UTF-16LE'yyMMdd') AS day], 
changelogMode=[I,UB,UA,D])
++- TableSourceScan(table=[[default_catalog, default_database, src1, 
project=[a], metadata=[]]], fields=[a], changelogMode=[I,UB,UA,D])
+
+
 ]]>
     </Resource>
   </TestCase>
@@ -272,7 +307,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_four_way], fields=[user
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_four_way], fields=[user_id, 
order_id, user_id0, payment_id, user_id1, name, location])
 +- Calc(select=[user_id, order_id, user_id0, payment_id, user_id1, name, 
location])
-   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER, INNER, 
INNER], inputUniqueKeys=[(user_id), (order_id, user_id), (payment_id, user_id), 
(user_id)], joinConditions=[true, =(user_id, user_id0), =(user_id, user_id1), 
=(user_id, user_id2)], joinFilter=[AND(=(user_id, user_id2), =(user_id, 
user_id1), =(user_id, user_id0))], 
select=[user_id,name,order_id,user_id0,payment_id,user_id1,user_id2,location], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647)  
[...]
+   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER, INNER, 
INNER], inputUniqueKeys=[(user_id), (order_id, user_id), (payment_id, user_id), 
(user_id)], joinConditions=[true, =(user_id, user_id0), =(user_id, user_id1), 
=(user_id, user_id2)], 
select=[user_id,name,order_id,user_id0,payment_id,user_id1,user_id2,location], 
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) 
paym [...]
       :- Exchange(distribution=[hash[user_id]])
       :  +- TableSourceScan(table=[[default_catalog, default_database, 
UsersPK, project=[user_id, name], metadata=[]]], fields=[user_id, name])
       :- Exchange(distribution=[hash[user_id]])
@@ -311,7 +346,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_three_way], fields=[use
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_three_way], fields=[user_id, 
order_id, user_id0, payment_id, user_id1, description])
 +- Calc(select=[user_id0 AS user_id, order_id, user_id1 AS user_id0, 
payment_id, user_id AS user_id1, description])
-   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id), (order_id, user_id), (payment_id, user_id)], 
joinConditions=[true, =(user_id0, user_id), =(user_id0, user_id1)], 
joinFilter=[AND(=(user_id0, user_id1), =(user_id0, user_id))], 
select=[user_id,description,order_id,user_id0,payment_id,user_id1], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) 
description, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0 [...]
+   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id), (order_id, user_id), (payment_id, user_id)], 
joinConditions=[true, =(user_id0, user_id), =(user_id0, user_id1)], 
select=[user_id,description,order_id,user_id0,payment_id,user_id1], 
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) 
description, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id1)])
       :- Exchange(distribution=[hash[user_id]])
       :  +- TableSourceScan(table=[[default_catalog, default_database, 
UsersPK, project=[user_id, description], metadata=[]]], fields=[user_id, 
description])
       :- Exchange(distribution=[hash[user_id]])
@@ -338,7 +373,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_two_way], fields=[user_
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_two_way], fields=[user_id, 
order_id, product, region_id])
 +- Calc(select=[user_id0 AS user_id, order_id, product, region_id])
-   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], 
inputUniqueKeys=[(user_id), (order_id, user_id)], joinConditions=[true, 
=(user_id, user_id0)], joinFilter=[=(user_id, user_id0)], 
select=[user_id,region_id,order_id,user_id0,product], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id, INTEGER region_id, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) 
product)])
+   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], 
inputUniqueKeys=[(user_id), (order_id, user_id)], joinConditions=[true, 
=(user_id, user_id0)], select=[user_id,region_id,order_id,user_id0,product], 
rowType=[RecordType(VARCHAR(2147483647) user_id, INTEGER region_id, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) 
product)])
       :- Exchange(distribution=[hash[user_id]])
       :  +- TableSourceScan(table=[[default_catalog, default_database, 
UsersPK, project=[user_id, region_id], metadata=[]]], fields=[user_id, 
region_id])
       +- Exchange(distribution=[hash[user_id]])
@@ -363,7 +398,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_two_way], fields=[user_
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_two_way], fields=[user_id, 
order_id, product, region_id], upsertMaterialize=[true])
 +- Calc(select=[user_id0 AS user_id, order_id, product, region_id])
-   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], 
inputUniqueKeys=[(user_id), (order_id)], joinConditions=[true, =(user_id, 
user_id0)], joinFilter=[=(user_id, user_id0)], 
select=[user_id,region_id,order_id,user_id0,product], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id, INTEGER region_id, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) 
product)])
+   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], 
inputUniqueKeys=[(user_id), (order_id)], joinConditions=[true, =(user_id, 
user_id0)], select=[user_id,region_id,order_id,user_id0,product], 
rowType=[RecordType(VARCHAR(2147483647) user_id, INTEGER region_id, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) 
product)])
       :- Exchange(distribution=[hash[user_id]])
       :  +- ChangelogNormalize(key=[user_id])
       :     +- Exchange(distribution=[hash[user_id]])
@@ -392,7 +427,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_two_way], fields=[user_
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_two_way], fields=[user_id, 
order_id, product, region_id])
 +- Calc(select=[user_id, order_id, product, region_id])
-   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, LEFT], 
inputUniqueKeys=[(order_id, user_id), (user_id)], joinConditions=[true, 
=(user_id0, user_id)], joinFilter=[true], 
select=[order_id,user_id,product,user_id0,region_id], 
outputRowType=[RecordType(VARCHAR(2147483647) order_id, VARCHAR(2147483647) 
user_id, VARCHAR(2147483647) product, VARCHAR(2147483647) user_id0, INTEGER 
region_id)])
+   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, LEFT], 
inputUniqueKeys=[(order_id, user_id), (user_id)], joinConditions=[true, 
=(user_id0, user_id)], select=[order_id,user_id,product,user_id0,region_id], 
rowType=[RecordType(VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id, 
VARCHAR(2147483647) product, VARCHAR(2147483647) user_id0, INTEGER region_id)])
       :- Exchange(distribution=[hash[user_id]])
       :  +- TableSourceScan(table=[[default_catalog, default_database, 
OrdersPK]], fields=[order_id, user_id, product])
       +- Exchange(distribution=[hash[user_id]])
@@ -423,7 +458,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
age=[$7])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, age])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), noUniqueKey], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id)], joinFilter=[AND(=(user_id_0, 
user_id), =(user_id_0, user_id_1))], 
select=[user_id_0,name,order_id,user_id_1,user_id,age], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) user_i [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), noUniqueKey], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id)], 
select=[user_id_0,name,order_id,user_id_1,user_id,age], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) user_id, INTEGER age)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -506,7 +541,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[((user_id_0 = 
user_id_2) AND (user_id_0 = user_id_1))], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 
[...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -528,7 +563,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
 
 == Optimized Physical Plan ==
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, 
user_id_2), =(user_id_0, user_id_1))], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 
[...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -538,7 +573,7 @@ Calc(select=[user_id_0, name, order_id, payment_id])
 
 == Optimized Execution Plan ==
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[((user_id_0 = 
user_id_2) AND (user_id_0 = user_id_1))], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 
[...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -565,10 +600,10 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(commonJoinKey=[cash], joinTypes=[INNER, INNER], 
inputUniqueKeys=[noUniqueKey, (payment_id)], joinConditions=[true, =(cash, 
price)], joinFilter=[=(cash, price)], 
select=[user_id_0,name,cash,order_id,payment_id,price], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) 
payment_id, INTEGER price)])
++- MultiJoin(commonJoinKey=[cash], joinTypes=[INNER, INNER], 
inputUniqueKeys=[noUniqueKey, (payment_id)], joinConditions=[true, =(cash, 
price)], select=[user_id_0,name,cash,order_id,payment_id,price], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, 
INTEGER price)])
    :- Exchange(distribution=[hash[cash]])
    :  +- Calc(select=[user_id_0, name, cash, order_id])
-   :     +- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id)], joinConditions=[true, =(user_id_0, 
user_id_1)], joinFilter=[=(user_id_0, user_id_1)], 
select=[user_id_0,name,cash,order_id,user_id_1], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) 
user_id_1)])
+   :     +- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id)], joinConditions=[true, =(user_id_0, 
user_id_1)], select=[user_id_0,name,cash,order_id,user_id_1], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1)])
    :        :- Exchange(distribution=[hash[user_id_0]])
    :        :  +- ChangelogNormalize(key=[user_id_0])
    :        :     +- Exchange(distribution=[hash[user_id_0]])
@@ -597,7 +632,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, 
user_id_2), =(user_id_0, user_id_1))], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 
[...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -619,7 +654,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
 
 == Optimized Physical Plan ==
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[true], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -629,7 +664,7 @@ Calc(select=[user_id_0, name, order_id, payment_id])
 
 == Optimized Execution Plan ==
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[true], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -656,7 +691,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, 
user_id_2), =(user_id_0, user_id_1))], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 
[...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)], 
stateTtlHints=[[[STAT [...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -683,7 +718,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, 
user_id_2), =(user_id_0, user_id_1))], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 
[...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)], 
stateTtlHints=[[[STAT [...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -709,9 +744,9 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER, INNER], 
inputUniqueKeys=[noUniqueKey, (payment_id)], joinConditions=[true, true], 
joinFilter=[true], select=[user_id_0,name,order_id,payment_id], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id)])
+MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER, INNER], 
inputUniqueKeys=[noUniqueKey, (payment_id)], joinConditions=[true, true], 
select=[user_id_0,name,order_id,payment_id], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id)])
 :- Exchange(distribution=[single])
-:  +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id)], joinConditions=[true, true], 
joinFilter=[true], select=[user_id_0,name,order_id], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id)])
+:  +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id)], joinConditions=[true, true], 
select=[user_id_0,name,order_id], rowType=[RecordType(VARCHAR(2147483647) 
user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id)])
 :     :- Exchange(distribution=[single])
 :     :  +- ChangelogNormalize(key=[user_id_0])
 :     :     +- Exchange(distribution=[hash[user_id_0]])
@@ -742,7 +777,7 @@ LogicalProject(name=[$1], proctime=[$2], rowtime=[$5], 
price=[$7])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[name, proctime, rowtime, price])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), noUniqueKey, noUniqueKey], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, 
user_id_2), =(user_id_0, user_id_1))], 
select=[user_id_0,name,proctime,user_id_1,rowtime,price,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) proctime, VARCHAR(2147483647) user_i 
[...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), noUniqueKey, noUniqueKey], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], 
select=[user_id_0,name,proctime,user_id_1,rowtime,price,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) proctime, VARCHAR(2147483647) user_id_1, 
TIMESTAMP(3) rowtime, INTEGER price, VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- Calc(select=[user_id_0, name, PROCTIME_MATERIALIZE(PROCTIME()) AS 
proctime])
    :     +- TableSourceScan(table=[[default_catalog, default_database, 
UsersWithProctime]], fields=[user_id_0, name])
@@ -773,7 +808,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, CAST('Gus' AS VARCHAR(2147483647)) AS name, order_id, 
CAST(payment_id AS VARCHAR(2147483647)) AS payment_id])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[=(user_id_0, 
user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) user_id_2)])
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], 
select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- Calc(select=[user_id_0])
    :     +- ChangelogNormalize(key=[user_id_0], condition=[=(name, 'Gus')])
@@ -804,7 +839,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[true], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -831,7 +866,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[true], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -854,7 +889,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
 
 == Optimized Physical Plan ==
 Calc(select=[user_id_0, CAST(_UTF-16LE'Gus':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, order_id, 
CAST(payment_id AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS payment_id])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[=(user_id_0, 
user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) user_id_2)])
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], 
select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- Calc(select=[user_id_0])
    :     +- ChangelogNormalize(key=[user_id_0], condition=[=(name, 
_UTF-16LE'Gus':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
@@ -868,7 +903,7 @@ Calc(select=[user_id_0, 
CAST(_UTF-16LE'Gus':VARCHAR(2147483647) CHARACTER SET "U
 
 == Optimized Execution Plan ==
 Calc(select=[user_id_0, CAST('Gus' AS VARCHAR(2147483647)) AS name, order_id, 
CAST(payment_id AS VARCHAR(2147483647)) AS payment_id])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[(user_id_0 = 
user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) user_id_2)])
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], 
select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- Calc(select=[user_id_0])
    :     +- ChangelogNormalize(key=[user_id_0], condition=[(name = 'Gus')])
@@ -900,7 +935,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[user_id_0, CAST('Gus' AS VARCHAR(2147483647)) AS name, order_id, 
CAST(payment_id AS VARCHAR(2147483647)) AS payment_id])
-+- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[(user_id_0 = 
user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) user_id_2)])
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], 
select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- Calc(select=[user_id_0])
    :     +- ChangelogNormalize(key=[user_id_0], condition=[(name = 'Gus')])

Reply via email to