This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 70f0c5fac121dd79c71e76d41c403b024ac112c3 Author: Gustavo de Morais <[email protected]> AuthorDate: Thu Sep 25 09:45:15 2025 +0200 [FLINK-38211][table] Update test plans and restore tests --- .../exec/serde/ExecNodeMultiJoinJsonSerdeTest.java | 171 --------------------- .../analyze/NonDeterministicUpdateAnalyzerTest.xml | 4 +- .../planner/plan/stream/sql/MultiJoinTest.xml | 64 ++++---- ...our-way-complex-updating-join-with-restore.json | 2 +- ...r-way-join-no-common-join-key-with-restore.json | 4 +- .../plan/three-way-inner-join-with-restore.json | 2 +- ...way-join-with-time-attributes-with-restore.json | 2 +- .../three-way-left-outer-join-with-restore.json | 2 +- 8 files changed, 40 insertions(+), 211 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java deleted file mode 100644 index 14e2489a06d..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.serde; - -import org.apache.flink.FlinkVersion; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.planner.calcite.FlinkTypeFactory; -import org.apache.flink.table.planner.calcite.FlinkTypeSystem; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph; -import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; -import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMultiJoin; -import org.apache.flink.table.runtime.operators.join.FlinkJoinType; -import org.apache.flink.table.runtime.operators.join.stream.keyselector.AttributeBasedJoinKeyExtractor.ConditionAttributeRef; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.logical.VarCharType; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode; - -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.assertj.core.api.Assertions.assertThat; - -class ExecNodeMultiJoinJsonSerdeTest { - private final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - private final SerdeContext serdeContext = JsonSerdeTestUtil.configuredSerdeContext(); - - @Test - void testSerializingStreamExecMultiJoin() throws IOException { - // Create test data - final StreamExecMultiJoin execNode = createTestMultiJoinNode(); - final ExecNodeGraph graph = - new ExecNodeGraph(FlinkVersion.v2_1, Collections.singletonList(execNode)); - - // Test if we can serialize - final String serializedGraph = JsonSerdeTestUtil.toJson(serdeContext, graph); - assertThat(serializedGraph).isNotEmpty(); - - // Test if we can deserialize - final ExecNodeGraph deserializedGraph = - JsonSerdeTestUtil.toObject(serdeContext, serializedGraph, ExecNodeGraph.class); - - // Verify some general value checks on the deserialized node - assertThat(deserializedGraph.getRootNodes()).hasSize(1); - final StreamExecMultiJoin deserializedMultiJoinNode = - (StreamExecMultiJoin) deserializedGraph.getRootNodes().get(0); - assertThat(deserializedMultiJoinNode.getDescription()).isEqualTo("test-multi-join"); - assertThat(deserializedMultiJoinNode.getOutputType()) - .isEqualTo(RowType.of(VarCharType.STRING_TYPE, new IntType())); - } - - @Test - void testSerializedJsonStructure() throws IOException { - // Create test data - final StreamExecMultiJoin execNode = createTestMultiJoinNode(); - final ExecNodeGraph graph = - new ExecNodeGraph(FlinkVersion.v2_1, Collections.singletonList(execNode)); - - // Serialize to JSON - final String json = JsonSerdeTestUtil.toJson(serdeContext, graph); - final JsonNode jsonNode = new ObjectMapper().readTree(json); - - // Verify JSON structure using JsonSerdeTestUtil assertions - // Basic node structure - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "type"); - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "id"); - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "description"); - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "inputProperties"); - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "outputType"); - - // MultiJoin specific fields - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "joinTypes"); - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "joinAttributeMap"); - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "inputUniqueKeys"); - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "joinConditions"); - - // Verify specific field values - JsonNode node = jsonNode.get("nodes").get(0); - assertThat(node.get("type").asText()).isEqualTo("stream-exec-multi-join_1"); - assertThat(node.get("description").asText()).isEqualTo("test-multi-join"); - assertThat(node.get("joinTypes").isArray()).isTrue(); - assertThat(node.get("joinTypes")) - .containsExactly(new TextNode("INNER"), new TextNode("INNER")); - assertThat(node.get("joinAttributeMap").isObject()).isTrue(); - assertThat(node.get("inputUniqueKeys").isArray()).isTrue(); - assertThat(node.get("inputUniqueKeys")).hasSize(2); - assertThat(node.get("joinConditions").isArray()).isTrue(); - assertThat(node.get("joinConditions")).hasSize(2); - assertThat(node.get("inputProperties").isArray()).isTrue(); - assertThat(node.get("inputProperties")).hasSize(2); - } - - private StreamExecMultiJoin createTestMultiJoinNode() { - final FlinkTypeFactory typeFactory = - new FlinkTypeFactory(classLoader, FlinkTypeSystem.INSTANCE); - final RexBuilder builder = new RexBuilder(typeFactory); - final RelDataType varCharType = - typeFactory.createFieldTypeFromLogicalType(new VarCharType()); - - final RexNode condition = - builder.makeCall( - SqlStdOperatorTable.EQUALS, - new RexInputRef(0, varCharType), - new RexInputRef(1, varCharType)); - - final Map<Integer, List<ConditionAttributeRef>> joinAttributeMap = createJoinAttributeMap(); - - final var execNode = - new StreamExecMultiJoin( - new Configuration(), - Arrays.asList(FlinkJoinType.INNER, FlinkJoinType.INNER), - Arrays.asList(null, condition), - null, - joinAttributeMap, - List.of( - List.of(new int[] {0}), - List.of(new int[] {0})), // left keys for each join - Collections.emptyMap(), - Arrays.asList(InputProperty.DEFAULT, InputProperty.DEFAULT), - RowType.of(VarCharType.STRING_TYPE, new IntType()), - "test-multi-join"); - - execNode.setInputEdges(Collections.emptyList()); - return execNode; - } - - private static Map<Integer, List<ConditionAttributeRef>> createJoinAttributeMap() { - final Map<Integer, List<ConditionAttributeRef>> joinAttributeMap = new HashMap<>(); - - // Corresponds to a join between input 0 and 1 on their first fields. - final List<ConditionAttributeRef> attributesForJoinWithInput1 = - List.of(new ConditionAttributeRef(0, 0, 1, 0)); - joinAttributeMap.put(1, attributesForJoinWithInput1); // Key is the right-side input index. - - // Corresponds to a join between input 0 and 2 on their first fields. - final List<ConditionAttributeRef> attributesForJoinWithInput2 = - List.of(new ConditionAttributeRef(0, 0, 2, 0)); - joinAttributeMap.put(2, attributesForJoinWithInput2); // Key is the right-side input index. - return joinAttributeMap; - } -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml index ff9aba197d0..63918b77014 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/analyze/NonDeterministicUpdateAnalyzerTest.xml @@ -125,7 +125,7 @@ Sink(table=[default_catalog.default_database.sink1], fields=[a, day, EXPR$2, EXP +- GroupAggregate(advice=[1], groupBy=[a, day], select=[a, day, SUM_RETRACT(b) AS EXPR$2, COUNT_RETRACT(DISTINCT c) AS EXPR$3]) +- Exchange(distribution=[hash[a, day]]) +- Calc(select=[a, day, b0 AS b, c]) - +- MultiJoin(joinFilter=[=($0, $4)], joinTypes=[[INNER, INNER]], joinConditions=[[true, =($0, $4)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:2;]}], select=[a,day,b0,c,d], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) day, BIGINT b0, VARCHAR(2147483647) c, INTEGER d)]) + +- MultiJoin(commonJoinKey=[a], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(a, d)], joinFilter=[=(a, d)], select=[a,day,b0,c,d], outputRowType=[RecordType(INTEGER a, VARCHAR(2147483647) day, BIGINT b0, VARCHAR(2147483647) c, INTEGER d)]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day]) : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a]) @@ -134,7 +134,7 @@ Sink(table=[default_catalog.default_database.sink1], fields=[a, day, EXPR$2, EXP Sink(table=[default_catalog.default_database.sink2], fields=[a, day, b, c]) +- Calc(select=[a, day, b0 AS b, c]) - +- MultiJoin(joinFilter=[=($0, $4)], joinTypes=[[INNER, INNER]], joinConditions=[[true, =($0, $4)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:2;]}], select=[a,day,b0,c,d], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) day, BIGINT b0, VARCHAR(2147483647) c, INTEGER d)]) + +- MultiJoin(commonJoinKey=[a], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(a, d)], joinFilter=[=(a, d)], select=[a,day,b0,c,d], outputRowType=[RecordType(INTEGER a, VARCHAR(2147483647) day, BIGINT b0, VARCHAR(2147483647) c, INTEGER d)]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a, DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyMMdd') AS day]) : +- TableSourceScan(table=[[default_catalog, default_database, src1, project=[a], metadata=[]]], fields=[a]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml index 8196aa986d2..6bf479fa4b6 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml @@ -35,7 +35,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id, location]) -+- MultiJoin(joinFilter=[AND(=($0, $7), OR(>=($2, $6), <($6, 0)))], joinTypes=[[INNER, LEFT, INNER, LEFT]], joinConditions=[[true, =($0, $4), AND(=($0, $7), OR(>=($2, $6), <($6, 0))), =($7, $9)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:2 [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, (user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], joinFilter=[((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0)))], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], outputRowType=[RecordType(VARCHAR(2 [...] :- Exchange(distribution=[hash[user_id_0]]) : +- ChangelogNormalize(key=[user_id_0]) : +- Exchange(distribution=[hash[user_id_0]]) @@ -63,7 +63,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati == Optimized Physical Plan == Calc(select=[user_id_0, name, order_id, payment_id, location]) -+- MultiJoin(joinFilter=[AND(=($0, $7), OR(>=($2, $6), <($6, 0)))], joinTypes=[[INNER, LEFT, INNER, LEFT]], joinConditions=[[true, =($0, $4), AND(=($0, $7), OR(>=($2, $6), <($6, 0))), =($7, $9)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:2 [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, =(user_id_0, user_id_1), AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0))), =(user_id_2, user_id_3)], joinFilter=[AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0)))], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], outputRowType=[RecordType(VARCHAR(2 [...] :- Exchange(distribution=[hash[user_id_0]]) : +- ChangelogNormalize(key=[user_id_0]) : +- Exchange(distribution=[hash[user_id_0]]) @@ -77,7 +77,7 @@ Calc(select=[user_id_0, name, order_id, payment_id, location]) == Optimized Execution Plan == Calc(select=[user_id_0, name, order_id, payment_id, location]) -+- MultiJoin(joinFilter=[AND(=($0, $7), OR(>=($2, $6), <($6, 0)))], joinTypes=[[INNER, LEFT, INNER, LEFT]], joinConditions=[[true, =($0, $4), AND(=($0, $7), OR(>=($2, $6), <($6, 0))), =($7, $9)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:2 [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, (user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], joinFilter=[((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0)))], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], outputRowType=[RecordType(VARCHAR(2 [...] :- Exchange(distribution=[hash[user_id_0]]) : +- ChangelogNormalize(key=[user_id_0]) : +- Exchange(distribution=[hash[user_id_0]]) @@ -110,7 +110,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id, location]) -+- MultiJoin(joinFilter=[AND(=($0, $7), OR(>=($2, $6), <($6, 0)))], joinTypes=[[INNER, LEFT, INNER, LEFT]], joinConditions=[[true, =($0, $4), AND(=($0, $7), OR(>=($2, $6), <($6, 0))), =($7, $9)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:2 [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, =(user_id_0, user_id_1), AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0))), =(user_id_2, user_id_3)], joinFilter=[AND(=(user_id_0, user_id_2), OR(>=(cash, price), <(price, 0)))], select=[user_id_0,name,cash,order_id,user_id_1,payment_id,price,user_id_2,location,user_id_3], outputRowType=[RecordType(VARCHAR(2 [...] :- Exchange(distribution=[hash[user_id_0]]) : +- ChangelogNormalize(key=[user_id_0]) : +- Exchange(distribution=[hash[user_id_0]]) @@ -143,10 +143,10 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id, location]) -+- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT]], joinConditions=[[true, =($3, $5)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:3;], 1=[LeftInputId:0;LeftFieldIndex:3;RightInputId:1;RightFieldIndex:1;]}], select=[user_id_0,name,order_id,payment_id,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) location, V [...] ++- MultiJoin(commonJoinKey=[payment_id], joinTypes=[INNER, LEFT], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, =(payment_id, user_id_3)], joinFilter=[true], select=[user_id_0,name,order_id,payment_id,location,user_id_3], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)]) :- Exchange(distribution=[hash[payment_id]]) : +- Calc(select=[user_id_0, name, order_id, payment_id]) - : +- MultiJoin(joinFilter=[=($0, $5)], joinTypes=[[INNER, LEFT, INNER]], joinConditions=[[true, =($0, $3), =($0, $5)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], [...] + : +- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[=(user_id_0, user_id_2)], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VA [...] : :- Exchange(distribution=[hash[user_id_0]]) : : +- ChangelogNormalize(key=[user_id_0]) : : +- Exchange(distribution=[hash[user_id_0]]) @@ -179,7 +179,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6], locati <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id, location]) -+- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT, LEFT, LEFT]], joinConditions=[[true, =($0, $3), =($3, $5), =($5, $7)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:1;LeftFieldIndex:1;RightInputId:2;RightFieldIndex:1;], 3=[LeftInputId:2;LeftFieldIndex:1;RightInputId:3;RightFieldIndex:1;]}], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_1, user_id_2), =(user_id_2, user_id_3)], joinFilter=[true], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2,location,user_id_3], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, [...] :- Exchange(distribution=[hash[user_id_0]]) : +- ChangelogNormalize(key=[user_id_0]) : +- Exchange(distribution=[hash[user_id_0]]) @@ -272,7 +272,7 @@ LogicalSink(table=[default_catalog.default_database.sink_four_way], fields=[user <![CDATA[ Sink(table=[default_catalog.default_database.sink_four_way], fields=[user_id, order_id, user_id0, payment_id, user_id1, name, location]) +- Calc(select=[user_id, order_id, user_id0, payment_id, user_id1, name, location]) - +- MultiJoin(joinFilter=[AND(=($0, $6), =($0, $5), =($0, $3))], joinTypes=[[INNER, INNER, INNER, INNER]], joinConditions=[[true, =($0, $3), =($0, $5), =($0, $6)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIn [...] + +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER, INNER, INNER], inputUniqueKeys=[(user_id), (order_id, user_id), (payment_id, user_id), (user_id)], joinConditions=[true, =(user_id, user_id0), =(user_id, user_id1), =(user_id, user_id2)], joinFilter=[AND(=(user_id, user_id2), =(user_id, user_id1), =(user_id, user_id0))], select=[user_id,name,order_id,user_id0,payment_id,user_id1,user_id2,location], outputRowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) [...] :- Exchange(distribution=[hash[user_id]]) : +- TableSourceScan(table=[[default_catalog, default_database, UsersPK, project=[user_id, name], metadata=[]]], fields=[user_id, name]) :- Exchange(distribution=[hash[user_id]]) @@ -311,7 +311,7 @@ LogicalSink(table=[default_catalog.default_database.sink_three_way], fields=[use <![CDATA[ Sink(table=[default_catalog.default_database.sink_three_way], fields=[user_id, order_id, user_id0, payment_id, user_id1, description]) +- Calc(select=[user_id0 AS user_id, order_id, user_id1 AS user_id0, payment_id, user_id AS user_id1, description]) - +- MultiJoin(joinFilter=[AND(=($3, $5), =($3, $0))], joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($3, $0), =($3, $5)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:1;LeftFieldIndex:1;RightInputId:2;RightFieldIndex:1;]}], select=[user_id,description,order_id,user_id0,payment_id,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARC [...] + +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id), (order_id, user_id), (payment_id, user_id)], joinConditions=[true, =(user_id0, user_id), =(user_id0, user_id1)], joinFilter=[AND(=(user_id0, user_id1), =(user_id0, user_id))], select=[user_id,description,order_id,user_id0,payment_id,user_id1], outputRowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) description, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0 [...] :- Exchange(distribution=[hash[user_id]]) : +- TableSourceScan(table=[[default_catalog, default_database, UsersPK, project=[user_id, description], metadata=[]]], fields=[user_id, description]) :- Exchange(distribution=[hash[user_id]]) @@ -338,7 +338,7 @@ LogicalSink(table=[default_catalog.default_database.sink_two_way], fields=[user_ <![CDATA[ Sink(table=[default_catalog.default_database.sink_two_way], fields=[user_id, order_id, product, region_id]) +- Calc(select=[user_id0 AS user_id, order_id, product, region_id]) - +- MultiJoin(joinFilter=[=($0, $3)], joinTypes=[[INNER, INNER]], joinConditions=[[true, =($0, $3)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;]}], select=[user_id,region_id,order_id,user_id0,product], rowType=[RecordType(VARCHAR(2147483647) user_id, INTEGER region_id, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) product)]) + +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], inputUniqueKeys=[(user_id), (order_id, user_id)], joinConditions=[true, =(user_id, user_id0)], joinFilter=[=(user_id, user_id0)], select=[user_id,region_id,order_id,user_id0,product], outputRowType=[RecordType(VARCHAR(2147483647) user_id, INTEGER region_id, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) product)]) :- Exchange(distribution=[hash[user_id]]) : +- TableSourceScan(table=[[default_catalog, default_database, UsersPK, project=[user_id, region_id], metadata=[]]], fields=[user_id, region_id]) +- Exchange(distribution=[hash[user_id]]) @@ -363,7 +363,7 @@ LogicalSink(table=[default_catalog.default_database.sink_two_way], fields=[user_ <![CDATA[ Sink(table=[default_catalog.default_database.sink_two_way], fields=[user_id, order_id, product, region_id], upsertMaterialize=[true]) +- Calc(select=[user_id0 AS user_id, order_id, product, region_id]) - +- MultiJoin(joinFilter=[=($0, $3)], joinTypes=[[INNER, INNER]], joinConditions=[[true, =($0, $3)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;]}], select=[user_id,region_id,order_id,user_id0,product], rowType=[RecordType(VARCHAR(2147483647) user_id, INTEGER region_id, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) product)]) + +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], inputUniqueKeys=[(user_id), (order_id)], joinConditions=[true, =(user_id, user_id0)], joinFilter=[=(user_id, user_id0)], select=[user_id,region_id,order_id,user_id0,product], outputRowType=[RecordType(VARCHAR(2147483647) user_id, INTEGER region_id, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) product)]) :- Exchange(distribution=[hash[user_id]]) : +- ChangelogNormalize(key=[user_id]) : +- Exchange(distribution=[hash[user_id]]) @@ -392,7 +392,7 @@ LogicalSink(table=[default_catalog.default_database.sink_two_way], fields=[user_ <![CDATA[ Sink(table=[default_catalog.default_database.sink_two_way], fields=[user_id, order_id, product, region_id]) +- Calc(select=[user_id, order_id, product, region_id]) - +- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT]], joinConditions=[[true, =($3, $1)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:1;], 1=[LeftInputId:0;LeftFieldIndex:1;RightInputId:1;RightFieldIndex:0;]}], select=[order_id,user_id,product,user_id0,region_id], rowType=[RecordType(VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id, VARCHAR(2147483647) product, VARCHAR(2147483647) user_id0, INTEGER region_id)]) + +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, LEFT], inputUniqueKeys=[(order_id, user_id), (user_id)], joinConditions=[true, =(user_id0, user_id)], joinFilter=[true], select=[order_id,user_id,product,user_id0,region_id], outputRowType=[RecordType(VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id, VARCHAR(2147483647) product, VARCHAR(2147483647) user_id0, INTEGER region_id)]) :- Exchange(distribution=[hash[user_id]]) : +- TableSourceScan(table=[[default_catalog, default_database, OrdersPK]], fields=[order_id, user_id, product]) +- Exchange(distribution=[hash[user_id]]) @@ -423,7 +423,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], age=[$7]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, age]) -+- MultiJoin(joinFilter=[AND(=($0, $4), =($0, $3))], joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($0, $3), =($0, $4)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:0;]}], select=[user_id_0,name,order_id,user_id_1,user_id,age], [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), noUniqueKey], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id)], joinFilter=[AND(=(user_id_0, user_id), =(user_id_0, user_id_1))], select=[user_id_0,name,order_id,user_id_1,user_id,age], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) user_i [...] :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -506,7 +506,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(joinFilter=[AND(=($0, $5), =($0, $3))], joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($0, $3), =($0, $5)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,name,order_id,user_id_1,payment_id,use [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[((user_id_0 = user_id_2) AND (user_id_0 = user_id_1))], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 [...] :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -528,7 +528,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) == Optimized Physical Plan == Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(joinFilter=[AND(=($0, $5), =($0, $3))], joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($0, $3), =($0, $5)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,name,order_id,user_id_1,payment_id,use [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, user_id_2), =(user_id_0, user_id_1))], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 [...] :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -538,7 +538,7 @@ Calc(select=[user_id_0, name, order_id, payment_id]) == Optimized Execution Plan == Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(joinFilter=[AND(=($0, $5), =($0, $3))], joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($0, $3), =($0, $5)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,name,order_id,user_id_1,payment_id,use [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[((user_id_0 = user_id_2) AND (user_id_0 = user_id_1))], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 [...] :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -565,10 +565,10 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(joinFilter=[=($2, $5)], joinTypes=[[INNER, INNER]], joinConditions=[[true, =($2, $5)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:2;], 1=[LeftInputId:0;LeftFieldIndex:2;RightInputId:1;RightFieldIndex:1;]}], select=[user_id_0,name,cash,order_id,payment_id,price], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, INTEGER price)]) ++- MultiJoin(commonJoinKey=[cash], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, (payment_id)], joinConditions=[true, =(cash, price)], joinFilter=[=(cash, price)], select=[user_id_0,name,cash,order_id,payment_id,price], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, INTEGER price)]) :- Exchange(distribution=[hash[cash]]) : +- Calc(select=[user_id_0, name, cash, order_id]) - : +- MultiJoin(joinFilter=[=($0, $4)], joinTypes=[[INNER, INNER]], joinConditions=[[true, =($0, $4)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;]}], select=[user_id_0,name,cash,order_id,user_id_1], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1)]) + : +- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id)], joinConditions=[true, =(user_id_0, user_id_1)], joinFilter=[=(user_id_0, user_id_1)], select=[user_id_0,name,cash,order_id,user_id_1], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1)]) : :- Exchange(distribution=[hash[user_id_0]]) : : +- ChangelogNormalize(key=[user_id_0]) : : +- Exchange(distribution=[hash[user_id_0]]) @@ -597,7 +597,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(joinFilter=[AND(=($0, $5), =($0, $3))], joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($0, $3), =($0, $5)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,name,order_id,user_id_1,payment_id,use [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, user_id_2), =(user_id_0, user_id_1))], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 [...] :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -619,7 +619,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) == Optimized Physical Plan == Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT, LEFT]], joinConditions=[[true, =($0, $3), =($0, $5)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[Recor [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[true], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -629,7 +629,7 @@ Calc(select=[user_id_0, name, order_id, payment_id]) == Optimized Execution Plan == Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT, LEFT]], joinConditions=[[true, =($0, $3), =($0, $5)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[Recor [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[true], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -656,7 +656,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(joinFilter=[AND(=($0, $5), =($0, $3))], joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($0, $3), =($0, $5)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,name,order_id,user_id_1,payment_id,use [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, user_id_2), =(user_id_0, user_id_1))], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 [...] :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -683,7 +683,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(joinFilter=[AND(=($0, $5), =($0, $3))], joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($0, $3), =($0, $5)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,name,order_id,user_id_1,payment_id,use [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, user_id_2), =(user_id_0, user_id_1))], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147 [...] :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -709,9 +709,9 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -MultiJoin(joinFilter=[true], joinTypes=[[INNER, INNER]], joinConditions=[[true, true]], joinAttributeMap=[{}], select=[user_id_0,name,order_id,payment_id], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id)]) +MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, (payment_id)], joinConditions=[true, true], joinFilter=[true], select=[user_id_0,name,order_id,payment_id], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id)]) :- Exchange(distribution=[single]) -: +- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT]], joinConditions=[[true, true]], joinAttributeMap=[{}], select=[user_id_0,name,order_id], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id)]) +: +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id)], joinConditions=[true, true], joinFilter=[true], select=[user_id_0,name,order_id], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id)]) : :- Exchange(distribution=[single]) : : +- ChangelogNormalize(key=[user_id_0]) : : +- Exchange(distribution=[hash[user_id_0]]) @@ -742,7 +742,7 @@ LogicalProject(name=[$1], proctime=[$2], rowtime=[$5], price=[$7]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[name, proctime, rowtime, price]) -+- MultiJoin(joinFilter=[AND(=($0, $6), =($0, $3))], joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($0, $3), =($0, $6)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:0;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,name,proctime,user_id_1,rowtime,price, [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), noUniqueKey, noUniqueKey], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[AND(=(user_id_0, user_id_2), =(user_id_0, user_id_1))], select=[user_id_0,name,proctime,user_id_1,rowtime,price,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) proctime, VARCHAR(2147483647) user_i [...] :- Exchange(distribution=[hash[user_id_0]]) : +- Calc(select=[user_id_0, name, PROCTIME_MATERIALIZE(PROCTIME()) AS proctime]) : +- TableSourceScan(table=[[default_catalog, default_database, UsersWithProctime]], fields=[user_id_0, name]) @@ -773,7 +773,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, CAST('Gus' AS VARCHAR(2147483647)) AS name, order_id, CAST(payment_id AS VARCHAR(2147483647)) AS payment_id]) -+- MultiJoin(joinFilter=[=($0, $4)], joinTypes=[[INNER, LEFT, INNER]], joinConditions=[[true, =($0, $2), =($0, $4)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], rowType=[Reco [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[=(user_id_0, user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- Calc(select=[user_id_0]) : +- ChangelogNormalize(key=[user_id_0], condition=[=(name, 'Gus')]) @@ -804,7 +804,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT, LEFT]], joinConditions=[[true, =($0, $3), =($0, $5)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[Recor [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[true], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -831,7 +831,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[user_id_0, name, order_id, payment_id]) -+- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT, LEFT]], joinConditions=[[true, =($0, $3), =($0, $5)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], rowType=[Recor [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[true], select=[user_id_0,name,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- TableSourceScan(table=[[default_catalog, default_database, Users, project=[user_id_0, name], metadata=[]]], fields=[user_id_0, name]) :- Exchange(distribution=[hash[user_id_1]]) @@ -854,7 +854,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) == Optimized Physical Plan == Calc(select=[user_id_0, CAST(_UTF-16LE'Gus':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS name, order_id, CAST(payment_id AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS payment_id]) -+- MultiJoin(joinFilter=[=($0, $4)], joinTypes=[[INNER, LEFT, INNER]], joinConditions=[[true, =($0, $2), =($0, $4)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], rowType=[Reco [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, =(user_id_0, user_id_1), =(user_id_0, user_id_2)], joinFilter=[=(user_id_0, user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- Calc(select=[user_id_0]) : +- ChangelogNormalize(key=[user_id_0], condition=[=(name, _UTF-16LE'Gus':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) @@ -868,7 +868,7 @@ Calc(select=[user_id_0, CAST(_UTF-16LE'Gus':VARCHAR(2147483647) CHARACTER SET "U == Optimized Execution Plan == Calc(select=[user_id_0, CAST('Gus' AS VARCHAR(2147483647)) AS name, order_id, CAST(payment_id AS VARCHAR(2147483647)) AS payment_id]) -+- MultiJoin(joinFilter=[=($0, $4)], joinTypes=[[INNER, LEFT, INNER]], joinConditions=[[true, =($0, $2), =($0, $4)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], rowType=[Reco [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[(user_id_0 = user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- Calc(select=[user_id_0]) : +- ChangelogNormalize(key=[user_id_0], condition=[(name = 'Gus')]) @@ -900,7 +900,7 @@ LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[user_id_0, CAST('Gus' AS VARCHAR(2147483647)) AS name, order_id, CAST(payment_id AS VARCHAR(2147483647)) AS payment_id]) -+- MultiJoin(joinFilter=[=($0, $4)], joinTypes=[[INNER, LEFT, INNER]], joinConditions=[[true, =($0, $2), =($0, $4)]], joinAttributeMap=[{0=[LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;, LeftInputId:-1;LeftFieldIndex:-1;RightInputId:0;RightFieldIndex:0;], 1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], rowType=[Reco [...] ++- MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[(user_id_0 = user_id_2)], select=[user_id_0,order_id,user_id_1,payment_id,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) user_id_2)]) :- Exchange(distribution=[hash[user_id_0]]) : +- Calc(select=[user_id_0]) : +- ChangelogNormalize(key=[user_id_0], condition=[(name = 'Gus')]) diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json index 1976c12bf92..f25678040a6 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json @@ -447,7 +447,7 @@ "priority" : 3 } ], "outputType" : "ROW<`name` VARCHAR(2147483647), `user_id_0` VARCHAR(2147483647) NOT NULL, `cash` INT, `order_id` VARCHAR(2147483647), `user_id_1` VARCHAR(2147483647), `user_id_2` VARCHAR(2147483647), `payment_id` VARCHAR(2147483647) NOT NULL, `price` INT, `location` VARCHAR(2147483647), `user_id_3` VARCHAR(2147483647)>", - "description" : "MultiJoin(joinFilter=[AND(=($1, $5), OR(>=($2, $7), <($7, 0)))], joinTypes=[[INNER, LEFT, INNER, LEFT]], joinConditions=[[true, =($1, $4), AND(=($1, $5), OR(>=($2, $7), <($7, 0))), =($5, $9)]], joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:1;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:1;RightInputId:2;RightFieldIndex:0;], 3=[LeftInputId:2;LeftFieldIndex:0;RightInputId:3;RightFieldIndex:1;]}], select=[name,user_id_0,cash,order_id,user_id_1 [...] + "description" : "MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER, LEFT], inputUniqueKeys=[(user_id_0), (order_id), (payment_id), noUniqueKey], joinConditions=[true, (user_id_0 = user_id_1), ((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0))), (user_id_2 = user_id_3)], joinFilter=[((user_id_0 = user_id_2) AND ((cash >= price) OR (price < 0)))], select=[name,user_id_0,cash,order_id,user_id_1,user_id_2,payment_id,price,location,user_id_3], outputRowType=[Re [...] }, { "id" : 30, "type" : "stream-exec-calc_1", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json index 596942d5eea..e0ab1634be5 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json @@ -312,7 +312,7 @@ "priority" : 2 } ], "outputType" : "ROW<`name` VARCHAR(2147483647), `user_id_0` VARCHAR(2147483647) NOT NULL, `order_id` VARCHAR(2147483647), `user_id_1` VARCHAR(2147483647), `user_id_2` VARCHAR(2147483647), `payment_id` VARCHAR(2147483647) NOT NULL>", - "description" : "MultiJoin(joinFilter=[=($1, $4)], joinTypes=[[INNER, LEFT, INNER]], joinConditions=[[true, =($1, $3), =($1, $4)]], joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:1;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:1;RightInputId:2;RightFieldIndex:0;]}], select=[name,user_id_0,order_id,user_id_1,user_id_2,payment_id], rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) use [...] + "description" : "MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, LEFT, INNER], inputUniqueKeys=[(user_id_0), (order_id), (payment_id)], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[(user_id_0 = user_id_2)], select=[name,user_id_0,order_id,user_id_1,user_id_2,payment_id], outputRowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) user_ [...] }, { "id" : 41, "type" : "stream-exec-calc_1", @@ -462,7 +462,7 @@ "priority" : 1 } ], "outputType" : "ROW<`name` VARCHAR(2147483647), `user_id_0` VARCHAR(2147483647) NOT NULL, `order_id` VARCHAR(2147483647), `payment_id` VARCHAR(2147483647) NOT NULL, `location` VARCHAR(2147483647), `user_id_3` VARCHAR(2147483647)>", - "description" : "MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT]], joinConditions=[[true, =($3, $5)]], joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:3;RightInputId:1;RightFieldIndex:1;]}], select=[name,user_id_0,order_id,payment_id,location,user_id_3], rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)])" + "description" : "MultiJoin(commonJoinKey=[payment_id], joinTypes=[INNER, LEFT], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[true, (payment_id = user_id_3)], joinFilter=[true], select=[name,user_id_0,order_id,payment_id,location,user_id_3], outputRowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, VARCHAR(2147483647) location, VARCHAR(2147483647) user_id_3)])" }, { "id" : 46, "type" : "stream-exec-calc_1", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json index 8c6259fcd1e..b1cd1345b66 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json @@ -225,7 +225,7 @@ "priority" : 2 } ], "outputType" : "ROW<`user_id` VARCHAR(2147483647), `name` VARCHAR(2147483647), `user_id0` VARCHAR(2147483647), `order_id` VARCHAR(2147483647), `user_id1` VARCHAR(2147483647), `payment_id` VARCHAR(2147483647)>", - "description" : "MultiJoin(joinFilter=[AND(=($0, $4), =($0, $2))], joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($0, $2), =($0, $4)]], joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:0;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:0;]}], select=[user_id,name,user_id0,order_id,user_id1,payment_id], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) user_id0, VARCHAR(2147 [...] + "description" : "MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey, noUniqueKey], joinConditions=[true, (user_id = user_id0), (user_id = user_id1)], joinFilter=[((user_id = user_id1) AND (user_id = user_id0))], select=[user_id,name,user_id0,order_id,user_id1,payment_id], outputRowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) order_id, VARCHAR(2147483 [...] }, { "id" : 8, "type" : "stream-exec-calc_1", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json index fd59b796425..a7dd1890952 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json @@ -357,7 +357,7 @@ "priority" : 2 } ], "outputType" : "ROW<`user_id_0` VARCHAR(2147483647) NOT NULL, `name` VARCHAR(2147483647), `order_id` VARCHAR(2147483647) NOT NULL, `user_id_1` VARCHAR(2147483647), `rowtime` TIMESTAMP(3), `price` INT, `user_id_2` VARCHAR(2147483647)>", - "description" : "MultiJoin(joinFilter=[AND(=($0, $6), =($0, $3))], joinTypes=[[INNER, INNER, INNER]], joinConditions=[[true, =($0, $3), =($0, $6)]], joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:1;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:1;]}], select=[user_id_0,name,order_id,user_id_1,rowtime,price,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VAR [...] + "description" : "MultiJoin(commonJoinKey=[user_id_0], joinTypes=[INNER, INNER, INNER], inputUniqueKeys=[(user_id_0), (order_id), noUniqueKey], joinConditions=[true, (user_id_0 = user_id_1), (user_id_0 = user_id_2)], joinFilter=[((user_id_0 = user_id_2) AND (user_id_0 = user_id_1))], select=[user_id_0,name,order_id,user_id_1,rowtime,price,user_id_2], outputRowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) use [...] }, { "id" : 57, "type" : "stream-exec-calc_1", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json index 4c43ee089d5..258fca068dc 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json @@ -225,7 +225,7 @@ "priority" : 2 } ], "outputType" : "ROW<`user_id` VARCHAR(2147483647), `name` VARCHAR(2147483647), `user_id0` VARCHAR(2147483647), `order_id` VARCHAR(2147483647), `user_id1` VARCHAR(2147483647), `payment_id` VARCHAR(2147483647)>", - "description" : "MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT, LEFT]], joinConditions=[[true, =($0, $2), =($0, $4)]], joinAttributeMap=[{1=[LeftInputId:0;LeftFieldIndex:0;RightInputId:1;RightFieldIndex:0;], 2=[LeftInputId:0;LeftFieldIndex:0;RightInputId:2;RightFieldIndex:0;]}], select=[user_id,name,user_id0,order_id,user_id1,payment_id], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) order_id, VARCH [...] + "description" : "MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, LEFT, LEFT], inputUniqueKeys=[noUniqueKey, noUniqueKey, noUniqueKey], joinConditions=[true, (user_id = user_id0), (user_id = user_id1)], joinFilter=[true], select=[user_id,name,user_id0,order_id,user_id1,payment_id], outputRowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id1, VARCHAR(2147483647) payment_id)])" }, { "id" : 17, "type" : "stream-exec-calc_1",
