This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d2e47276add [FLINK-38043][table] Use empty row instead of null for no join keys d2e47276add is described below commit d2e47276add608926f27152b4e1a91eccbc403d2 Author: Gustavo de Morais <gdemor...@confluent.io> AuthorDate: Thu Jul 3 16:23:14 2025 +0200 [FLINK-38043][table] Use empty row instead of null for no join keys --- .../nodes/exec/stream/MultiJoinSemanticTests.java | 1 + .../nodes/exec/stream/MultiJoinTestPrograms.java | 74 ++++++++++++++++++++++ .../planner/plan/stream/sql/MultiJoinTest.java | 9 +++ .../planner/plan/stream/sql/MultiJoinTest.xml | 31 +++++++++ .../AttributeBasedJoinKeyExtractor.java | 5 +- 5 files changed, 118 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java index c9ef7c4745a..5d1bce0d8e8 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java @@ -35,6 +35,7 @@ public class MultiJoinSemanticTests extends SemanticTestBase { MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_WITH_WHERE, MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_COMPLEX, MultiJoinTestPrograms.MULTI_JOIN_WITH_TIME_ATTRIBUTES_MATERIALIZATION, + MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_NO_JOIN_KEY, MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_NO_COMMON_JOIN_KEY); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java index c6a2758971e..e880ef04c32 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java @@ -384,6 +384,80 @@ public class MultiJoinTestPrograms { + "INNER JOIN Payments p ON u.user_id = p.user_id") .build(); + public static final TableTestProgram MULTI_JOIN_THREE_WAY_INNER_JOIN_NO_JOIN_KEY = + TableTestProgram.of( + "three-way-inner-join-no-join-key", + "three way inner join with no join key") + .setupConfig(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true) + .setupTableSource(USERS_SOURCE) + .setupTableSource(ORDERS_SOURCE) + .setupTableSource(PAYMENTS_SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "user_id STRING", + "name STRING", + "order_id STRING", + "payment_id STRING") + .consumedValues( + "+I[1, Gus, order1, payment2]", + "+I[1, Gus, order1, payment5]", + "+I[1, Gus, order1, payment3]", + "+I[1, Gus, order1, payment1]", + "+I[2, Bob, order1, payment2]", + "+I[2, Bob, order1, payment5]", + "+I[2, Bob, order1, payment3]", + "+I[2, Bob, order1, payment1]", + "+I[3, Alice, order1, payment2]", + "+I[3, Alice, order1, payment5]", + "+I[3, Alice, order1, payment3]", + "+I[3, Alice, order1, payment1]", + "+I[3, Alice, order2, payment2]", + "+I[3, Alice, order2, payment5]", + "+I[3, Alice, order2, payment3]", + "+I[3, Alice, order2, payment1]", + "+I[1, Gus, order2, payment2]", + "+I[1, Gus, order2, payment5]", + "+I[1, Gus, order2, payment3]", + "+I[1, Gus, order2, payment1]", + "+I[2, Bob, order2, payment2]", + "+I[2, Bob, order2, payment5]", + "+I[2, Bob, order2, payment3]", + "+I[2, Bob, order2, payment1]", + "+I[3, Alice, order3, payment2]", + "+I[3, Alice, order3, payment5]", 
+ "+I[3, Alice, order3, payment3]", + "+I[3, Alice, order3, payment1]", + "+I[1, Gus, order3, payment2]", + "+I[1, Gus, order3, payment5]", + "+I[1, Gus, order3, payment3]", + "+I[1, Gus, order3, payment1]", + "+I[2, Bob, order3, payment2]", + "+I[2, Bob, order3, payment5]", + "+I[2, Bob, order3, payment3]", + "+I[2, Bob, order3, payment1]", + "+I[3, Alice, order4, payment2]", + "+I[3, Alice, order4, payment5]", + "+I[3, Alice, order4, payment3]", + "+I[3, Alice, order4, payment1]", + "+I[1, Gus, order4, payment2]", + "+I[1, Gus, order4, payment5]", + "+I[1, Gus, order4, payment3]", + "+I[1, Gus, order4, payment1]", + "+I[2, Bob, order4, payment2]", + "+I[2, Bob, order4, payment5]", + "+I[2, Bob, order4, payment3]", + "+I[2, Bob, order4, payment1]") + .testMaterializedData() + .build()) + .runSql( + "INSERT INTO sink " + + "SELECT u.user_id, u.name, o.order_id, p.payment_id " + + "FROM Users u " + + "LEFT JOIN Orders o ON TRUE " + + "INNER JOIN Payments p ON TRUE") + .build(); + public static final TableTestProgram MULTI_JOIN_FOUR_WAY_NO_COMMON_JOIN_KEY_RESTORE = TableTestProgram.of( "four-way-join-no-common-join-key-with-restore", diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java index 98e711505e0..a6a488d3db3 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java @@ -181,6 +181,15 @@ public class MultiJoinTest extends TableTestBase { + "LEFT JOIN Shipments s ON p.user_id_2 = s.user_id_3"); } + @Test + void testThreeWayJoinNoJoinKeyExecPlan() { + util.verifyExecPlan( + "SELECT u.user_id_0, u.name, o.order_id, p.payment_id " + + "FROM Users u " + + "LEFT JOIN Orders o ON TRUE " + + "INNER JOIN 
Payments p ON TRUE "); + } + @Test void testFourWayJoinNoCommonJoinKeyRelPlan() { util.verifyRelPlan( diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml index 8a2f3517645..ba05d7b6778 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml @@ -485,6 +485,37 @@ Calc(select=[user_id_0, name, order_id, payment_id]) : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id_1], metadata=[]]], fields=[order_id, user_id_1]) +- Exchange(distribution=[hash[user_id_2]]) +- TableSourceScan(table=[[default_catalog, default_database, Payments, project=[payment_id, user_id_2], metadata=[]]], fields=[payment_id, user_id_2]) +]]> + </Resource> + </TestCase> + <TestCase name="testThreeWayJoinNoJoinKeyExecPlan"> + <Resource name="sql"> + <![CDATA[SELECT u.user_id_0, u.name, o.order_id, p.payment_id FROM Users u LEFT JOIN Orders o ON TRUE INNER JOIN Payments p ON TRUE ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6]) ++- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[left]) + : :- LogicalTableScan(table=[[default_catalog, default_database, Users]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, Orders]]) + +- LogicalTableScan(table=[[default_catalog, default_database, Payments]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[user_id_0, name, order_id, payment_id]) ++- MultiJoin(joinFilter=[true], joinTypes=[[INNER, INNER]], joinConditions=[[true, true]], joinAttributeMap=[{}], 
select=[user_id_0,name,cash,order_id,user_id_1,product,payment_id,price,user_id_2], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) product, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id_2)]) + :- Exchange(distribution=[single]) + : +- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT]], joinConditions=[[true, true]], joinAttributeMap=[{}], select=[user_id_0,name,cash,order_id,user_id_1,product], rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, VARCHAR(2147483647) product)]) + : :- Exchange(distribution=[single]) + : : +- ChangelogNormalize(key=[user_id_0]) + : : +- Exchange(distribution=[hash[user_id_0]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id_0, name, cash]) + : +- Exchange(distribution=[single]) + : +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, user_id_1, product]) + +- Exchange(distribution=[single]) + +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id_2]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java index b2a8b00025d..9566dd91186 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java @@ -138,11 +138,12 @@ public 
class AttributeBasedJoinKeyExtractor implements JoinKeyExtractor, Seriali @Override public RowType getCommonJoinKeyType() { - return this.commonJoinKeyType; + // We return an empty RowType if no common join key is defined. + return Objects.requireNonNullElseGet(this.commonJoinKeyType, RowType::of); } @Override - public RowData getCommonJoinKey(RowData row, int inputId) { + public @Nullable RowData getCommonJoinKey(RowData row, int inputId) { List<KeyExtractor> extractors = commonJoinKeyExtractors.get(inputId); if (extractors == null || extractors.isEmpty()) { return null;