This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 70f0c5fac121dd79c71e76d41c403b024ac112c3
Author: Gustavo de Morais <[email protected]>
AuthorDate: Thu Sep 25 09:45:15 2025 +0200

    [FLINK-38211][table] Update test plans and restore tests
---
 .../exec/serde/ExecNodeMultiJoinJsonSerdeTest.java | 171 ---------------------
 .../analyze/NonDeterministicUpdateAnalyzerTest.xml |   4 +-
 .../planner/plan/stream/sql/MultiJoinTest.xml      |  64 ++++----
 ...our-way-complex-updating-join-with-restore.json |   2 +-
 ...r-way-join-no-common-join-key-with-restore.json |   4 +-
 .../plan/three-way-inner-join-with-restore.json    |   2 +-
 ...way-join-with-time-attributes-with-restore.json |   2 +-
 .../three-way-left-outer-join-with-restore.json    |   2 +-
 8 files changed, 40 insertions(+), 211 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java
deleted file mode 100644
index 14e2489a06d..00000000000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.serde;
-
-import org.apache.flink.FlinkVersion;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.calcite.FlinkTypeSystem;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
-import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
-import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMultiJoin;
-import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
-import 
org.apache.flink.table.runtime.operators.join.stream.keyselector.AttributeBasedJoinKeyExtractor.ConditionAttributeRef;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
-
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
-
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-class ExecNodeMultiJoinJsonSerdeTest {
-    private final ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
-    private final SerdeContext serdeContext = 
JsonSerdeTestUtil.configuredSerdeContext();
-
-    @Test
-    void testSerializingStreamExecMultiJoin() throws IOException {
-        // Create test data
-        final StreamExecMultiJoin execNode = createTestMultiJoinNode();
-        final ExecNodeGraph graph =
-                new ExecNodeGraph(FlinkVersion.v2_1, 
Collections.singletonList(execNode));
-
-        // Test if we can serialize
-        final String serializedGraph = JsonSerdeTestUtil.toJson(serdeContext, 
graph);
-        assertThat(serializedGraph).isNotEmpty();
-
-        // Test if we can deserialize
-        final ExecNodeGraph deserializedGraph =
-                JsonSerdeTestUtil.toObject(serdeContext, serializedGraph, 
ExecNodeGraph.class);
-
-        // Verify some general value checks on the deserialized node
-        assertThat(deserializedGraph.getRootNodes()).hasSize(1);
-        final StreamExecMultiJoin deserializedMultiJoinNode =
-                (StreamExecMultiJoin) deserializedGraph.getRootNodes().get(0);
-        
assertThat(deserializedMultiJoinNode.getDescription()).isEqualTo("test-multi-join");
-        assertThat(deserializedMultiJoinNode.getOutputType())
-                .isEqualTo(RowType.of(VarCharType.STRING_TYPE, new IntType()));
-    }
-
-    @Test
-    void testSerializedJsonStructure() throws IOException {
-        // Create test data
-        final StreamExecMultiJoin execNode = createTestMultiJoinNode();
-        final ExecNodeGraph graph =
-                new ExecNodeGraph(FlinkVersion.v2_1, 
Collections.singletonList(execNode));
-
-        // Serialize to JSON
-        final String json = JsonSerdeTestUtil.toJson(serdeContext, graph);
-        final JsonNode jsonNode = new ObjectMapper().readTree(json);
-
-        // Verify JSON structure using JsonSerdeTestUtil assertions
-        // Basic node structure
-        JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", 
"type");
-        JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "id");
-        JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", 
"description");
-        JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", 
"inputProperties");
-        JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", 
"outputType");
-
-        // MultiJoin specific fields
-        JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", 
"joinTypes");
-        JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", 
"joinAttributeMap");
-        JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", 
"inputUniqueKeys");
-        JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", 
"joinConditions");
-
-        // Verify specific field values
-        JsonNode node = jsonNode.get("nodes").get(0);
-        
assertThat(node.get("type").asText()).isEqualTo("stream-exec-multi-join_1");
-        
assertThat(node.get("description").asText()).isEqualTo("test-multi-join");
-        assertThat(node.get("joinTypes").isArray()).isTrue();
-        assertThat(node.get("joinTypes"))
-                .containsExactly(new TextNode("INNER"), new TextNode("INNER"));
-        assertThat(node.get("joinAttributeMap").isObject()).isTrue();
-        assertThat(node.get("inputUniqueKeys").isArray()).isTrue();
-        assertThat(node.get("inputUniqueKeys")).hasSize(2);
-        assertThat(node.get("joinConditions").isArray()).isTrue();
-        assertThat(node.get("joinConditions")).hasSize(2);
-        assertThat(node.get("inputProperties").isArray()).isTrue();
-        assertThat(node.get("inputProperties")).hasSize(2);
-    }
-
-    private StreamExecMultiJoin createTestMultiJoinNode() {
-        final FlinkTypeFactory typeFactory =
-                new FlinkTypeFactory(classLoader, FlinkTypeSystem.INSTANCE);
-        final RexBuilder builder = new RexBuilder(typeFactory);
-        final RelDataType varCharType =
-                typeFactory.createFieldTypeFromLogicalType(new VarCharType());
-
-        final RexNode condition =
-                builder.makeCall(
-                        SqlStdOperatorTable.EQUALS,
-                        new RexInputRef(0, varCharType),
-                        new RexInputRef(1, varCharType));
-
-        final Map<Integer, List<ConditionAttributeRef>> joinAttributeMap = 
createJoinAttributeMap();
-
-        final var execNode =
-                new StreamExecMultiJoin(
-                        new Configuration(),
-                        Arrays.asList(FlinkJoinType.INNER, 
FlinkJoinType.INNER),
-                        Arrays.asList(null, condition),
-                        null,
-                        joinAttributeMap,
-                        List.of(
-                                List.of(new int[] {0}),
-                                List.of(new int[] {0})), // left keys for each 
join
-                        Collections.emptyMap(),
-                        Arrays.asList(InputProperty.DEFAULT, 
InputProperty.DEFAULT),
-                        RowType.of(VarCharType.STRING_TYPE, new IntType()),
-                        "test-multi-join");
-
-        execNode.setInputEdges(Collections.emptyList());
-        return execNode;
-    }
-
-    private static Map<Integer, List<ConditionAttributeRef>> 
createJoinAttributeMap() {
-        final Map<Integer, List<ConditionAttributeRef>> joinAttributeMap = new 
HashMap<>();
-
-        // Corresponds to a join between input 0 and 1 on their first fields.
-        final List<ConditionAttributeRef> attributesForJoinWithInput1 =
-                List.of(new ConditionAttributeRef(0, 0, 1, 0));
-        joinAttributeMap.put(1, attributesForJoinWithInput1); // Key is the 
right-side input index.
-
-        // Corresponds to a join between input 0 and 2 on their first fields.
-        final List<ConditionAttributeRef> attributesForJoinWithInput2 =
-                List.of(new ConditionAttributeRef(0, 0, 2, 0));
-        joinAttributeMap.put(2, attributesForJoinWithInput2); // Key is the 
right-side input index.
-        return joinAttributeMap;
-    }
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml
index ff9aba197d0..63918b77014 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml
@@ -125,7 +125,7 @@ Sink(table=[default_catalog.default_database.sink1], 
fields=[a, day, EXPR$2, EXP
 +- GroupAggregate(advice=[1], groupBy=[a, day], select=[a, day, SUM_RETRACT(b) 
AS EXPR$2, COUNT_RETRACT(DISTINCT c) AS EXPR$3])
    +- Exchange(distribution=[hash[a, day]])
       +- Calc(select=[a, day, b0 AS b, c])
-         +- MultiJoin(joinFilter=[=($0, $4)], joinTypes=[[INNER, INNER]], 
joinConditions=[[true, =($0, $4)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;],
 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:2;]}], 
select=[a,day,b0,c,d], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) day, 
BIGINT b0, VARCHAR(2147483647) c, INTEGER d)])
+         +- MultiJoin(commonJoinKey=[a], joinTypes=[INNER, INNER], 
inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(a, d)], 
joinFilter=[=(a, d)], select=[a,day,b0,c,d], outputRowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) day, BIGINT b0, VARCHAR(2147483647) c, INTEGER d)])
             :- Exchange(distribution=[hash[a]])
             :  +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') 
AS day])
             :     +- TableSourceScan(table=[[default_catalog, 
default_database, src1, project=[a], metadata=[]]], fields=[a])
@@ -134,7 +134,7 @@ Sink(table=[default_catalog.default_database.sink1], 
fields=[a, day, EXPR$2, EXP
 
 Sink(table=[default_catalog.default_database.sink2], fields=[a, day, b, c])
 +- Calc(select=[a, day, b0 AS b, c])
-   +- MultiJoin(joinFilter=[=($0, $4)], joinTypes=[[INNER, INNER]], 
joinConditions=[[true, =($0, $4)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;],
 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:2;]}], 
select=[a,day,b0,c,d], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) day, 
BIGINT b0, VARCHAR(2147483647) c, INTEGER d)])
+   +- MultiJoin(commonJoinKey=[a], joinTypes=[INNER, INNER], 
inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(a, d)], 
joinFilter=[=(a, d)], select=[a,day,b0,c,d], outputRowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) day, BIGINT b0, VARCHAR(2147483647) c, INTEGER d)])
       :- Exchange(distribution=[hash[a]])
       :  +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day])
       :     +- TableSourceScan(table=[[default_catalog, default_database, 
src1, project=[a], metadata=[]]], fields=[a])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
index 8196aa986d2..6bf479fa4b6 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
@@ -35,7 +35,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6], locati
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id, location])
-+- MultiJoin(joinFilter=[AND(=($0, $7), OR(>=($2, $6), <($6, 0)))], 
joinTypes=[[INNER, LEFT, INNER, LEFT]], joinConditions=[[true, =($0, $4), 
AND(=($0, $7), OR(>=($2, $6), <($6, 0))), =($7, $9)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:2 [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], 
joinConditions=[true, (user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND 
((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], 
joinFilter=[((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0)))], 
select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3],
 outputRowType=[RecordType(VARCHAR(2 [...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- ChangelogNormalize(key=[user_id_0])
    :     +- Exchange(distribution=[hash[user_id_0]])
@@ -63,7 +63,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6], locati
 
 == Optimized Physical Plan ==
 Calc(select=[user_id_0, name, order_id, payment_id, location])
-+- MultiJoin(joinFilter=[AND(=($0, $7), OR(>=($2, $6), <($6, 0)))], 
joinTypes=[[INNER, LEFT, INNER, LEFT]], joinConditions=[[true, =($0, $4), 
AND(=($0, $7), OR(>=($2, $6), <($6, 0))), =($7, $9)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:2 [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], 
joinConditions=[true, =(user_id_0, user_id_1), AND(=(user_id_0, user_id_2), 
OR(>=(cash, price), <(price, 0))), =(user_id_2, user_id_3)], 
joinFilter=[AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0)))], 
select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3],
 outputRowType=[RecordType(VARCHAR(2 [...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- ChangelogNormalize(key=[user_id_0])
    :     +- Exchange(distribution=[hash[user_id_0]])
@@ -77,7 +77,7 @@ Calc(select=[user_id_0, name, order_id, payment_id, location])
 
 == Optimized Execution Plan ==
 Calc(select=[user_id_0, name, order_id, payment_id, location])
-+- MultiJoin(joinFilter=[AND(=($0, $7), OR(>=($2, $6), <($6, 0)))], 
joinTypes=[[INNER, LEFT, INNER, LEFT]], joinConditions=[[true, =($0, $4), 
AND(=($0, $7), OR(>=($2, $6), <($6, 0))), =($7, $9)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:2 [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], 
joinConditions=[true, (user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND 
((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], 
joinFilter=[((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0)))], 
select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3],
 outputRowType=[RecordType(VARCHAR(2 [...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- ChangelogNormalize(key=[user_id_0])
    :     +- Exchange(distribution=[hash[user_id_0]])
@@ -110,7 +110,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6], locati
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id, location])
-+- MultiJoin(joinFilter=[AND(=($0, $7), OR(>=($2, $6), <($6, 0)))], 
joinTypes=[[INNER, LEFT, INNER, LEFT]], joinConditions=[[true, =($0, $4), 
AND(=($0, $7), OR(>=($2, $6), <($6, 0))), =($7, $9)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:2 [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], 
joinConditions=[true, =(user_id_0, user_id_1), AND(=(user_id_0, user_id_2), 
OR(>=(cash, price), <(price, 0))), =(user_id_2, user_id_3)], 
joinFilter=[AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0)))], 
select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3],
 outputRowType=[RecordType(VARCHAR(2 [...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- ChangelogNormalize(key=[user_id_0])
    :     +- Exchange(distribution=[hash[user_id_0]])
@@ -143,10 +143,10 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6], locati
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id, location])
-+- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT]], 
joinConditions=[[true, =($3, $5)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:3;],
 1=[LeftInputId:0;LeftFieldIndex:3;RightInputId:1;RightFieldIndex:1;]}], 
select=[user_id_0,name,order_id,payment_id,location,user_id_3], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) location, V [...]
++- MultiJoin(commonJoinKey=[payment_id], joinTypes=[INNER, LEFT], 
inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(payment_id, 
user_id_3)], joinFilter=[true], 
select=[user_id_0,name,order_id,payment_id,location,user_id_3], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)])
    :- Exchange(distribution=[hash[payment_id]])
    :  +- Calc(select=[user_id_0, name, order_id, payment_id])
-   :     +- MultiJoin(joinFilter=[=($0, $5)], joinTypes=[[INNER, LEFT, 
INNER]], joinConditions=[[true, =($0, $3), =($0, $5)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], [...]
+   :     +- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, 
INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], 
joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], 
joinFilter=[=(user_id_0, user_id_2)], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VA [...]
    :        :- Exchange(distribution=[hash[user_id_0]])
    :        :  +- ChangelogNormalize(key=[user_id_0])
    :        :     +- Exchange(distribution=[hash[user_id_0]])
@@ -179,7 +179,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6], locati
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id, location])
-+- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT, LEFT, LEFT]], 
joinConditions=[[true, =($0, $3), =($3, $5), =($5, $7)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;],
 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:1;LeftFieldIndex:1;RightInputId:2;RightFieldIndex:1;], 
3=[LeftInputId:2;LeftFieldIndex:1;RightInputId:3;RightFieldIndex:1;]}], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], 
joinConditions=[true, =(user_id_0, user_id_1), =(user_id_1, user_id_2), 
=(user_id_2, user_id_3)], joinFilter=[true], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2,location,user_id_3],
 outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1,  [...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- ChangelogNormalize(key=[user_id_0])
    :     +- Exchange(distribution=[hash[user_id_0]])
@@ -272,7 +272,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_four_way], fields=[user
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_four_way], fields=[user_id, 
order_id, user_id0, payment_id, user_id1, name, location])
 +- Calc(select=[user_id, order_id, user_id0, payment_id, user_id1, name, 
location])
-   +- MultiJoin(joinFilter=[AND(=($0, $6), =($0, $5), =($0, $3))], 
joinTypes=[[INNER, INNER, INNER, INNER]], joinConditions=[[true, =($0, $3), 
=($0, $5), =($0, $6)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, 
LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIn [...]
+   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER, INNER, 
INNER], inputUniqueKeys=[(user_id), (order_id, user_id), (payment_id, user_id), 
(user_id)], joinConditions=[true, =(user_id, user_id0), =(user_id, user_id1), 
=(user_id, user_id2)], joinFilter=[AND(=(user_id, user_id2), =(user_id, 
user_id1), =(user_id, user_id0))], 
select=[user_id,name,order_id,user_id0,payment_id,user_id1,user_id2,location], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647)  
[...]
       :- Exchange(distribution=[hash[user_id]])
       :  +- TableSourceScan(table=[[default_catalog, default_database, 
UsersPK, project=[user_id, name], metadata=[]]], fields=[user_id, name])
       :- Exchange(distribution=[hash[user_id]])
@@ -311,7 +311,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_three_way], fields=[use
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_three_way], fields=[user_id, 
order_id, user_id0, payment_id, user_id1, description])
 +- Calc(select=[user_id0 AS user_id, order_id, user_id1 AS user_id0, 
payment_id, user_id AS user_id1, description])
-   +- MultiJoin(joinFilter=[AND(=($3, $5), =($3, $0))], joinTypes=[[INNER, 
INNER, INNER]], joinConditions=[[true, =($3, $0), =($3, $5)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;],
 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:1;LeftFieldIndex:1;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id,description,order_id,user_id0,payment_id,user_id1], 
rowType=[RecordType(VARCHAR(2147483647) user_id, VARC [...]
+   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id), (order_id, user_id), (payment_id, user_id)], 
joinConditions=[true, =(user_id0, user_id), =(user_id0, user_id1)], 
joinFilter=[AND(=(user_id0, user_id1), =(user_id0, user_id))], 
select=[user_id,description,order_id,user_id0,payment_id,user_id1], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) 
description, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0 [...]
       :- Exchange(distribution=[hash[user_id]])
       :  +- TableSourceScan(table=[[default_catalog, default_database, 
UsersPK, project=[user_id, description], metadata=[]]], fields=[user_id, 
description])
       :- Exchange(distribution=[hash[user_id]])
@@ -338,7 +338,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_two_way], fields=[user_
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_two_way], fields=[user_id, 
order_id, product, region_id])
 +- Calc(select=[user_id0 AS user_id, order_id, product, region_id])
-   +- MultiJoin(joinFilter=[=($0, $3)], joinTypes=[[INNER, INNER]], 
joinConditions=[[true, =($0, $3)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;],
 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;]}], 
select=[user_id,region_id,order_id,user_id0,product], 
rowType=[RecordType(VARCHAR(2147483647) user_id, INTEGER region_id, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) 
product)])
+   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], 
inputUniqueKeys=[(user_id), (order_id, user_id)], joinConditions=[true, 
=(user_id, user_id0)], joinFilter=[=(user_id, user_id0)], 
select=[user_id,region_id,order_id,user_id0,product], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id, INTEGER region_id, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) 
product)])
       :- Exchange(distribution=[hash[user_id]])
       :  +- TableSourceScan(table=[[default_catalog, default_database, 
UsersPK, project=[user_id, region_id], metadata=[]]], fields=[user_id, 
region_id])
       +- Exchange(distribution=[hash[user_id]])
@@ -363,7 +363,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_two_way], fields=[user_
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_two_way], fields=[user_id, 
order_id, product, region_id], upsertMaterialize=[true])
 +- Calc(select=[user_id0 AS user_id, order_id, product, region_id])
-   +- MultiJoin(joinFilter=[=($0, $3)], joinTypes=[[INNER, INNER]], 
joinConditions=[[true, =($0, $3)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;],
 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;]}], 
select=[user_id,region_id,order_id,user_id0,product], 
rowType=[RecordType(VARCHAR(2147483647) user_id, INTEGER region_id, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) 
product)])
+   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], 
inputUniqueKeys=[(user_id), (order_id)], joinConditions=[true, =(user_id, 
user_id0)], joinFilter=[=(user_id, user_id0)], 
select=[user_id,region_id,order_id,user_id0,product], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id, INTEGER region_id, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) 
product)])
       :- Exchange(distribution=[hash[user_id]])
       :  +- ChangelogNormalize(key=[user_id])
       :     +- Exchange(distribution=[hash[user_id]])
@@ -392,7 +392,7 @@ 
LogicalSink(table=[default_catalog.default_database.sink_two_way], fields=[user_
       <![CDATA[
 Sink(table=[default_catalog.default_database.sink_two_way], fields=[user_id, 
order_id, product, region_id])
 +- Calc(select=[user_id, order_id, product, region_id])
-   +- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT]], 
joinConditions=[[true, =($3, $1)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:1;],
 1=[LeftInputId:0;LeftFieldIndex:1;RightInputId:1;RightFieldIndex:0;]}], 
select=[order_id,user_id,product,user_id0,region_id], 
rowType=[RecordType(VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id, 
VARCHAR(2147483647) product, VARCHAR(2147483647) user_id0, INTEGER region_id)])
+   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, LEFT], 
inputUniqueKeys=[(order_id, user_id), (user_id)], joinConditions=[true, 
=(user_id0, user_id)], joinFilter=[true], 
select=[order_id,user_id,product,user_id0,region_id], 
outputRowType=[RecordType(VARCHAR(2147483647) order_id, VARCHAR(2147483647) 
user_id, VARCHAR(2147483647) product, VARCHAR(2147483647) user_id0, INTEGER 
region_id)])
       :- Exchange(distribution=[hash[user_id]])
       :  +- TableSourceScan(table=[[default_catalog, default_database, 
OrdersPK]], fields=[order_id, user_id, product])
       +- Exchange(distribution=[hash[user_id]])
@@ -423,7 +423,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
age=[$7])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, age])
-+- MultiJoin(joinFilter=[AND(=($0, $4), =($0, $3))], joinTypes=[[INNER, INNER, 
INNER]], joinConditions=[[true, =($0, $3), =($0, $4)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:0;]}], 
select=[user_id_0,name,order_id,user_id_1,user_id,age],  [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), noUniqueKey], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id)], joinFilter=[AND(=(user_id_0, 
user_id), =(user_id_0, user_id_1))], 
select=[user_id_0,name,order_id,user_id_1,user_id,age], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) user_i [...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -506,7 +506,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(joinFilter=[AND(=($0, $5), =($0, $3))], joinTypes=[[INNER, INNER, 
INNER]], joinConditions=[[true, =($0, $3), =($0, $5)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,name,order_id,user_id_1,payment_id,use [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[((user_id_0 = 
user_id_2) AND (user_id_0 = user_id_1))], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 
[...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -528,7 +528,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
 
 == Optimized Physical Plan ==
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(joinFilter=[AND(=($0, $5), =($0, $3))], joinTypes=[[INNER, INNER, 
INNER]], joinConditions=[[true, =($0, $3), =($0, $5)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,name,order_id,user_id_1,payment_id,use [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, 
user_id_2), =(user_id_0, user_id_1))], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 
[...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -538,7 +538,7 @@ Calc(select=[user_id_0, name, order_id, payment_id])
 
 == Optimized Execution Plan ==
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(joinFilter=[AND(=($0, $5), =($0, $3))], joinTypes=[[INNER, INNER, 
INNER]], joinConditions=[[true, =($0, $3), =($0, $5)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,name,order_id,user_id_1,payment_id,use [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[((user_id_0 = 
user_id_2) AND (user_id_0 = user_id_1))], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 
[...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -565,10 +565,10 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(joinFilter=[=($2, $5)], joinTypes=[[INNER, INNER]], 
joinConditions=[[true, =($2, $5)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:2;],
 1=[LeftInputId:0;LeftFieldIndex:2;RightInputId:1;RightFieldIndex:1;]}], 
select=[user_id_0,name,cash,order_id,payment_id,price], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, 
INTEGER price)])
++- MultiJoin(commonJoinKey=[cash], joinTypes=[INNER, INNER], 
inputUniqueKeys=[noUniqueKey, (payment_id)], joinConditions=[true, =(cash, 
price)], joinFilter=[=(cash, price)], 
select=[user_id_0,name,cash,order_id,payment_id,price], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) 
payment_id, INTEGER price)])
    :- Exchange(distribution=[hash[cash]])
    :  +- Calc(select=[user_id_0, name, cash, order_id])
-   :     +- MultiJoin(joinFilter=[=($0, $4)], joinTypes=[[INNER, INNER]], 
joinConditions=[[true, =($0, $4)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;],
 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;]}], 
select=[user_id_0,name,cash,order_id,user_id_1], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1)])
+   :     +- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id)], joinConditions=[true, =(user_id_0, 
user_id_1)], joinFilter=[=(user_id_0, user_id_1)], 
select=[user_id_0,name,cash,order_id,user_id_1], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) 
user_id_1)])
    :        :- Exchange(distribution=[hash[user_id_0]])
    :        :  +- ChangelogNormalize(key=[user_id_0])
    :        :     +- Exchange(distribution=[hash[user_id_0]])
@@ -597,7 +597,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(joinFilter=[AND(=($0, $5), =($0, $3))], joinTypes=[[INNER, INNER, 
INNER]], joinConditions=[[true, =($0, $3), =($0, $5)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,name,order_id,user_id_1,payment_id,use [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, 
user_id_2), =(user_id_0, user_id_1))], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 
[...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -619,7 +619,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
 
 == Optimized Physical Plan ==
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT, LEFT]], 
joinConditions=[[true, =($0, $3), =($0, $5)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[Recor 
[...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[true], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -629,7 +629,7 @@ Calc(select=[user_id_0, name, order_id, payment_id])
 
 == Optimized Execution Plan ==
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT, LEFT]], 
joinConditions=[[true, =($0, $3), =($0, $5)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[Recor 
[...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[true], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -656,7 +656,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(joinFilter=[AND(=($0, $5), =($0, $3))], joinTypes=[[INNER, INNER, 
INNER]], joinConditions=[[true, =($0, $3), =($0, $5)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,name,order_id,user_id_1,payment_id,use [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, 
user_id_2), =(user_id_0, user_id_1))], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 
[...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -683,7 +683,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(joinFilter=[AND(=($0, $5), =($0, $3))], joinTypes=[[INNER, INNER, 
INNER]], joinConditions=[[true, =($0, $3), =($0, $5)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,name,order_id,user_id_1,payment_id,use [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, 
user_id_2), =(user_id_0, user_id_1))], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 
[...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -709,9 +709,9 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-MultiJoin(joinFilter=[true], joinTypes=[[INNER, INNER]], 
joinConditions=[[true, true]], joinAttributeMap=[{}], 
select=[user_id_0,name,order_id,payment_id], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id)])
+MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER, INNER], 
inputUniqueKeys=[noUniqueKey, (payment_id)], joinConditions=[true, true], 
joinFilter=[true], select=[user_id_0,name,order_id,payment_id], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id)])
 :- Exchange(distribution=[single])
-:  +- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT]], 
joinConditions=[[true, true]], joinAttributeMap=[{}], 
select=[user_id_0,name,order_id], rowType=[RecordType(VARCHAR(2147483647) 
user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id)])
+:  +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id)], joinConditions=[true, true], 
joinFilter=[true], select=[user_id_0,name,order_id], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id)])
 :     :- Exchange(distribution=[single])
 :     :  +- ChangelogNormalize(key=[user_id_0])
 :     :     +- Exchange(distribution=[hash[user_id_0]])
@@ -742,7 +742,7 @@ LogicalProject(name=[$1], proctime=[$2], rowtime=[$5], 
price=[$7])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[name, proctime, rowtime, price])
-+- MultiJoin(joinFilter=[AND(=($0, $6), =($0, $3))], joinTypes=[[INNER, INNER, 
INNER]], joinConditions=[[true, =($0, $3), =($0, $6)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:0;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,name,proctime,user_id_1,rowtime,price, [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], 
inputUniqueKeys=[(user_id_0), noUniqueKey, noUniqueKey], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, 
user_id_2), =(user_id_0, user_id_1))], 
select=[user_id_0,name,proctime,user_id_1,rowtime,price,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) proctime, VARCHAR(2147483647) user_i 
[...]
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- Calc(select=[user_id_0, name, PROCTIME_MATERIALIZE(PROCTIME()) AS 
proctime])
    :     +- TableSourceScan(table=[[default_catalog, default_database, 
UsersWithProctime]], fields=[user_id_0, name])
@@ -773,7 +773,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, CAST('Gus' AS VARCHAR(2147483647)) AS name, order_id, 
CAST(payment_id AS VARCHAR(2147483647)) AS payment_id])
-+- MultiJoin(joinFilter=[=($0, $4)], joinTypes=[[INNER, LEFT, INNER]], 
joinConditions=[[true, =($0, $2), =($0, $4)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], rowType=[Reco [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[=(user_id_0, 
user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- Calc(select=[user_id_0])
    :     +- ChangelogNormalize(key=[user_id_0], condition=[=(name, 'Gus')])
@@ -804,7 +804,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT, LEFT]], 
joinConditions=[[true, =($0, $3), =($0, $5)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[Recor 
[...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[true], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -831,7 +831,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id_0, name, order_id, payment_id])
-+- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT, LEFT]], 
joinConditions=[[true, =($0, $3), =($0, $5)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[Recor 
[...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[true], 
select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- TableSourceScan(table=[[default_catalog, default_database, Users, 
project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name])
    :- Exchange(distribution=[hash[user_id_1]])
@@ -854,7 +854,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
 
 == Optimized Physical Plan ==
 Calc(select=[user_id_0, CAST(_UTF-16LE'Gus':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, order_id, 
CAST(payment_id AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS payment_id])
-+- MultiJoin(joinFilter=[=($0, $4)], joinTypes=[[INNER, LEFT, INNER]], 
joinConditions=[[true, =($0, $2), =($0, $4)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], rowType=[Reco [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
=(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[=(user_id_0, 
user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- Calc(select=[user_id_0])
    :     +- ChangelogNormalize(key=[user_id_0], condition=[=(name, 
_UTF-16LE'Gus':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
@@ -868,7 +868,7 @@ Calc(select=[user_id_0, 
CAST(_UTF-16LE'Gus':VARCHAR(2147483647) CHARACTER SET "U
 
 == Optimized Execution Plan ==
 Calc(select=[user_id_0, CAST('Gus' AS VARCHAR(2147483647)) AS name, order_id, 
CAST(payment_id AS VARCHAR(2147483647)) AS payment_id])
-+- MultiJoin(joinFilter=[=($0, $4)], joinTypes=[[INNER, LEFT, INNER]], 
joinConditions=[[true, =($0, $2), =($0, $4)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], rowType=[Reco [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[(user_id_0 = 
user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- Calc(select=[user_id_0])
    :     +- ChangelogNormalize(key=[user_id_0], condition=[(name = 'Gus')])
@@ -900,7 +900,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], 
payment_id=[$6])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[user_id_0, CAST('Gus' AS VARCHAR(2147483647)) AS name, order_id, 
CAST(payment_id AS VARCHAR(2147483647)) AS payment_id])
-+- MultiJoin(joinFilter=[=($0, $4)], joinTypes=[[INNER, LEFT, INNER]], 
joinConditions=[[true, =($0, $2), =($0, $4)]], 
joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;,
 LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 
1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 
2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], rowType=[Reco [...]
++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], 
inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, 
(user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[(user_id_0 = 
user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) user_id_2)])
    :- Exchange(distribution=[hash[user_id_0]])
    :  +- Calc(select=[user_id_0])
    :     +- ChangelogNormalize(key=[user_id_0], condition=[(name = 'Gus')])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json
index 1976c12bf92..f25678040a6 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json
@@ -447,7 +447,7 @@
       "priority" : 3
     } ],
     "outputType" : "ROW<`name` VARCHAR(2147483647), `user_id_0` 
VARCHAR(2147483647) NOT NULL, `cash` INT, `order_id` VARCHAR(2147483647), 
`user_id_1` VARCHAR(2147483647), `user_id_2` VARCHAR(2147483647), `payment_id` 
VARCHAR(2147483647) NOT NULL, `price` INT, `location` VARCHAR(2147483647), 
`user_id_3` VARCHAR(2147483647)>",
-    "description" : "MultiJoin(joinFilter=[AND(=($1, $5), OR(>=($2, $7), <($7, 
0)))], joinTypes=[[INNER, LEFT, INNER, LEFT]], joinConditions=[[true, =($1, 
$4), AND(=($1, $5), OR(>=($2, $7), <($7, 0))), =($5, $9)]], 
joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:1;RightInputId:1;RightFieldIndex:1;],
 2=[LeftInputId:0;LeftFieldIndex:1;RightInputId:2;RightFieldIndex:0;], 
3=[LeftInputId:2;LeftFieldIndex:0;RightInputId:3;RightFieldIndex:1;]}], 
select=[name,user_id_0,cash,order_id,user_id_1 [...]
+    "description" : "MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, 
LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), 
noUniqueKey], joinConditions=[true, (user_id_0 = user_id_1), ((user_id_0 = 
user_id_2) AND ((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], 
joinFilter=[((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0)))], 
select=[name,user_id_0,cash,order_id,user_id_1,user_id_2,payment_id,price,location,user_id_3],
 outputRowType=[Re [...]
   }, {
     "id" : 30,
     "type" : "stream-exec-calc_1",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json
index 596942d5eea..e0ab1634be5 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json
@@ -312,7 +312,7 @@
       "priority" : 2
     } ],
     "outputType" : "ROW<`name` VARCHAR(2147483647), `user_id_0` 
VARCHAR(2147483647) NOT NULL, `order_id` VARCHAR(2147483647), `user_id_1` 
VARCHAR(2147483647), `user_id_2` VARCHAR(2147483647), `payment_id` 
VARCHAR(2147483647) NOT NULL>",
-    "description" : "MultiJoin(joinFilter=[=($1, $4)], joinTypes=[[INNER, 
LEFT, INNER]], joinConditions=[[true, =($1, $3), =($1, $4)]], 
joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:1;RightInputId:1;RightFieldIndex:1;],
 2=[LeftInputId:0;LeftFieldIndex:1;RightInputId:2;RightFieldIndex:0;]}], 
select=[name,user_id_0,order_id,user_id_1,user_id_2,payment_id], 
rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) use [...]
+    "description" : "MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, 
LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], 
joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], 
joinFilter=[(user_id_0 = user_id_2)], 
select=[name,user_id_0,order_id,user_id_1,user_id_2,payment_id], 
outputRowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) 
user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) user_ [...]
   }, {
     "id" : 41,
     "type" : "stream-exec-calc_1",
@@ -462,7 +462,7 @@
       "priority" : 1
     } ],
     "outputType" : "ROW<`name` VARCHAR(2147483647), `user_id_0` 
VARCHAR(2147483647) NOT NULL, `order_id` VARCHAR(2147483647), `payment_id` 
VARCHAR(2147483647) NOT NULL, `location` VARCHAR(2147483647), `user_id_3` 
VARCHAR(2147483647)>",
-    "description" : "MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT]], 
joinConditions=[[true, =($3, $5)]], 
joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:3;RightInputId:1;RightFieldIndex:1;]}],
 select=[name,user_id_0,order_id,payment_id,location,user_id_3], 
rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)])"
+    "description" : "MultiJoin(commonJoinKey=[payment_id], joinTypes=[INNER, 
LEFT], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, 
(payment_id = user_id_3)], joinFilter=[true], 
select=[name,user_id_0,order_id,payment_id,location,user_id_3], 
outputRowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) 
user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)])"
   }, {
     "id" : 46,
     "type" : "stream-exec-calc_1",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json
index 8c6259fcd1e..b1cd1345b66 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json
@@ -225,7 +225,7 @@
       "priority" : 2
     } ],
     "outputType" : "ROW<`user_id` VARCHAR(2147483647), `name` 
VARCHAR(2147483647), `user_id0` VARCHAR(2147483647), `order_id` 
VARCHAR(2147483647), `user_id1` VARCHAR(2147483647), `payment_id` 
VARCHAR(2147483647)>",
-    "description" : "MultiJoin(joinFilter=[AND(=($0, $4), =($0, $2))], 
joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($0, $2), =($0, 
$4)]], 
joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:0;],
 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:0;]}], 
select=[user_id,name,user_id0,order_id,user_id1,payment_id], 
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, 
VARCHAR(2147483647) user_id0, VARCHAR(2147 [...]
+    "description" : "MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, 
INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey, noUniqueKey], 
joinConditions=[true, (user_id = user_id0), (user_id = user_id1)], 
joinFilter=[((user_id = user_id1) AND (user_id = user_id0))], 
select=[user_id,name,user_id0,order_id,user_id1,payment_id], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) 
name, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) order_id, 
VARCHAR(2147483 [...]
   }, {
     "id" : 8,
     "type" : "stream-exec-calc_1",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json
index fd59b796425..a7dd1890952 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json
@@ -357,7 +357,7 @@
       "priority" : 2
     } ],
     "outputType" : "ROW<`user_id_0` VARCHAR(2147483647) NOT NULL, `name` 
VARCHAR(2147483647), `order_id` VARCHAR(2147483647) NOT NULL, `user_id_1` 
VARCHAR(2147483647), `rowtime` TIMESTAMP(3), `price` INT, `user_id_2` 
VARCHAR(2147483647)>",
-    "description" : "MultiJoin(joinFilter=[AND(=($0, $6), =($0, $3))], 
joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($0, $3), =($0, 
$6)]], 
joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;],
 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], 
select=[user_id_0,name,order_id,user_id_1,rowtime,price,user_id_2], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VAR [...]
+    "description" : "MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, 
INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), noUniqueKey], 
joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], 
joinFilter=[((user_id_0 = user_id_2) AND (user_id_0 = user_id_1))], 
select=[user_id_0,name,order_id,user_id_1,rowtime,price,user_id_2], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) 
name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) use [...]
   }, {
     "id" : 57,
     "type" : "stream-exec-calc_1",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json
index 4c43ee089d5..258fca068dc 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json
@@ -225,7 +225,7 @@
       "priority" : 2
     } ],
     "outputType" : "ROW<`user_id` VARCHAR(2147483647), `name` 
VARCHAR(2147483647), `user_id0` VARCHAR(2147483647), `order_id` 
VARCHAR(2147483647), `user_id1` VARCHAR(2147483647), `payment_id` 
VARCHAR(2147483647)>",
-    "description" : "MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT, 
LEFT]], joinConditions=[[true, =($0, $2), =($0, $4)]], 
joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:0;],
 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:0;]}], 
select=[user_id,name,user_id0,order_id,user_id1,payment_id], 
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, 
VARCHAR(2147483647) user_id0, VARCHAR(2147483647) order_id, VARCH [...]
+    "description" : "MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, 
LEFT, LEFT], inputUniqueKeys=[noUniqueKey, noUniqueKey, noUniqueKey], 
joinConditions=[true, (user_id = user_id0), (user_id = user_id1)], 
joinFilter=[true], select=[user_id,name,user_id0,order_id,user_id1,payment_id], 
outputRowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) 
name, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) order_id, 
VARCHAR(2147483647) user_id1, VARCHAR(2147483647) payment_id)])"
   }, {
     "id" : 17,
     "type" : "stream-exec-calc_1",

Reply via email to