This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d2e47276add [FLINK-38043][table] Use empty row instead of null for no 
join keys
d2e47276add is described below

commit d2e47276add608926f27152b4e1a91eccbc403d2
Author: Gustavo de Morais <gdemor...@confluent.io>
AuthorDate: Thu Jul 3 16:23:14 2025 +0200

    [FLINK-38043][table] Use empty row instead of null for no join keys
---
 .../nodes/exec/stream/MultiJoinSemanticTests.java  |  1 +
 .../nodes/exec/stream/MultiJoinTestPrograms.java   | 74 ++++++++++++++++++++++
 .../planner/plan/stream/sql/MultiJoinTest.java     |  9 +++
 .../planner/plan/stream/sql/MultiJoinTest.xml      | 31 +++++++++
 .../AttributeBasedJoinKeyExtractor.java            |  5 +-
 5 files changed, 118 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
index c9ef7c4745a..5d1bce0d8e8 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
@@ -35,6 +35,7 @@ public class MultiJoinSemanticTests extends SemanticTestBase {
                 
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_WITH_WHERE,
                 MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_COMPLEX,
                 
MultiJoinTestPrograms.MULTI_JOIN_WITH_TIME_ATTRIBUTES_MATERIALIZATION,
+                
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_NO_JOIN_KEY,
                 MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_NO_COMMON_JOIN_KEY);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
index c6a2758971e..e880ef04c32 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
@@ -384,6 +384,80 @@ public class MultiJoinTestPrograms {
                                     + "INNER JOIN Payments p ON u.user_id = 
p.user_id")
                     .build();
 
+    public static final TableTestProgram 
MULTI_JOIN_THREE_WAY_INNER_JOIN_NO_JOIN_KEY =
+            TableTestProgram.of(
+                            "three-way-inner-join-no-join-key",
+                            "three way inner join with no join key")
+                    
.setupConfig(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true)
+                    .setupTableSource(USERS_SOURCE)
+                    .setupTableSource(ORDERS_SOURCE)
+                    .setupTableSource(PAYMENTS_SOURCE)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(
+                                            "user_id STRING",
+                                            "name STRING",
+                                            "order_id STRING",
+                                            "payment_id STRING")
+                                    .consumedValues(
+                                            "+I[1, Gus, order1, payment2]",
+                                            "+I[1, Gus, order1, payment5]",
+                                            "+I[1, Gus, order1, payment3]",
+                                            "+I[1, Gus, order1, payment1]",
+                                            "+I[2, Bob, order1, payment2]",
+                                            "+I[2, Bob, order1, payment5]",
+                                            "+I[2, Bob, order1, payment3]",
+                                            "+I[2, Bob, order1, payment1]",
+                                            "+I[3, Alice, order1, payment2]",
+                                            "+I[3, Alice, order1, payment5]",
+                                            "+I[3, Alice, order1, payment3]",
+                                            "+I[3, Alice, order1, payment1]",
+                                            "+I[3, Alice, order2, payment2]",
+                                            "+I[3, Alice, order2, payment5]",
+                                            "+I[3, Alice, order2, payment3]",
+                                            "+I[3, Alice, order2, payment1]",
+                                            "+I[1, Gus, order2, payment2]",
+                                            "+I[1, Gus, order2, payment5]",
+                                            "+I[1, Gus, order2, payment3]",
+                                            "+I[1, Gus, order2, payment1]",
+                                            "+I[2, Bob, order2, payment2]",
+                                            "+I[2, Bob, order2, payment5]",
+                                            "+I[2, Bob, order2, payment3]",
+                                            "+I[2, Bob, order2, payment1]",
+                                            "+I[3, Alice, order3, payment2]",
+                                            "+I[3, Alice, order3, payment5]",
+                                            "+I[3, Alice, order3, payment3]",
+                                            "+I[3, Alice, order3, payment1]",
+                                            "+I[1, Gus, order3, payment2]",
+                                            "+I[1, Gus, order3, payment5]",
+                                            "+I[1, Gus, order3, payment3]",
+                                            "+I[1, Gus, order3, payment1]",
+                                            "+I[2, Bob, order3, payment2]",
+                                            "+I[2, Bob, order3, payment5]",
+                                            "+I[2, Bob, order3, payment3]",
+                                            "+I[2, Bob, order3, payment1]",
+                                            "+I[3, Alice, order4, payment2]",
+                                            "+I[3, Alice, order4, payment5]",
+                                            "+I[3, Alice, order4, payment3]",
+                                            "+I[3, Alice, order4, payment1]",
+                                            "+I[1, Gus, order4, payment2]",
+                                            "+I[1, Gus, order4, payment5]",
+                                            "+I[1, Gus, order4, payment3]",
+                                            "+I[1, Gus, order4, payment1]",
+                                            "+I[2, Bob, order4, payment2]",
+                                            "+I[2, Bob, order4, payment5]",
+                                            "+I[2, Bob, order4, payment3]",
+                                            "+I[2, Bob, order4, payment1]")
+                                    .testMaterializedData()
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink "
+                                    + "SELECT u.user_id, u.name, o.order_id, 
p.payment_id "
+                                    + "FROM Users u "
+                                    + "LEFT JOIN Orders o ON TRUE "
+                                    + "INNER JOIN Payments p ON TRUE")
+                    .build();
+
     public static final TableTestProgram 
MULTI_JOIN_FOUR_WAY_NO_COMMON_JOIN_KEY_RESTORE =
             TableTestProgram.of(
                             "four-way-join-no-common-join-key-with-restore",
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
index 98e711505e0..a6a488d3db3 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
@@ -181,6 +181,15 @@ public class MultiJoinTest extends TableTestBase {
                         + "LEFT JOIN Shipments s ON p.user_id_2 = 
s.user_id_3");
     }
 
+    @Test
+    void testThreeWayJoinNoJoinKeyExecPlan() {
+        util.verifyExecPlan(
+                "SELECT u.user_id_0, u.name, o.order_id, p.payment_id "
+                        + "FROM Users u "
+                        + "LEFT JOIN Orders o ON TRUE "
+                        + "INNER JOIN Payments p ON TRUE ");
+    }
+
     @Test
     void testFourWayJoinNoCommonJoinKeyRelPlan() {
         util.verifyRelPlan(
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
index 8a2f3517645..ba05d7b6778 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
@@ -485,6 +485,37 @@ Calc(select=[user_id_0, name, order_id, payment_id])
    :        +- TableSourceScan(table=[[default_catalog, default_database, 
Orders, project=[order_id, user_id_1], metadata=[]]], fields=[order_id, 
user_id_1])
    +- Exchange(distribution=[hash[user_id_2]])
       +- TableSourceScan(table=[[default_catalog, default_database, Payments, 
project=[payment_id, user_id_2], metadata=[]]], fields=[payment_id, user_id_2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testThreeWayJoinNoJoinKeyExecPlan">
+    <Resource name="sql">
+      <![CDATA[SELECT u.user_id_0, u.name, o.order_id, p.payment_id FROM Users 
u LEFT JOIN Orders o ON TRUE INNER JOIN Payments p ON TRUE ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(user_id_0=[$0], name=[$1], order_id=[$3], payment_id=[$6])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalJoin(condition=[true], joinType=[left])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, Users]])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, Payments]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[user_id_0, name, order_id, payment_id])
++- MultiJoin(joinFilter=[true], joinTypes=[[INNER, INNER]], 
joinConditions=[[true, true]], joinAttributeMap=[{}], 
select=[user_id_0,name,cash,order_id,user_id_1,product,payment_id,price,user_id_2],
 rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) product, VARCHAR(2147483647) payment_id, INTEGER price, 
VARCHAR(2147483647) user_id_2)])
+   :- Exchange(distribution=[single])
+   :  +- MultiJoin(joinFilter=[true], joinTypes=[[INNER, LEFT]], 
joinConditions=[[true, true]], joinAttributeMap=[{}], 
select=[user_id_0,name,cash,order_id,user_id_1,product], 
rowType=[RecordType(VARCHAR(2147483647) user_id_0, VARCHAR(2147483647) name, 
INTEGER cash, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id_1, 
VARCHAR(2147483647) product)])
+   :     :- Exchange(distribution=[single])
+   :     :  +- ChangelogNormalize(key=[user_id_0])
+   :     :     +- Exchange(distribution=[hash[user_id_0]])
+   :     :        +- TableSourceScan(table=[[default_catalog, 
default_database, Users]], fields=[user_id_0, name, cash])
+   :     +- Exchange(distribution=[single])
+   :        +- TableSourceScan(table=[[default_catalog, default_database, 
Orders]], fields=[order_id, user_id_1, product])
+   +- Exchange(distribution=[single])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
Payments]], fields=[payment_id, price, user_id_2])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java
index b2a8b00025d..9566dd91186 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java
@@ -138,11 +138,12 @@ public class AttributeBasedJoinKeyExtractor implements 
JoinKeyExtractor, Seriali
 
     @Override
     public RowType getCommonJoinKeyType() {
-        return this.commonJoinKeyType;
+        // We return an empty RowType if no common join key is defined.
+        return Objects.requireNonNullElseGet(this.commonJoinKeyType, 
RowType::of);
     }
 
     @Override
-    public RowData getCommonJoinKey(RowData row, int inputId) {
+    public @Nullable RowData getCommonJoinKey(RowData row, int inputId) {
         List<KeyExtractor> extractors = commonJoinKeyExtractors.get(inputId);
         if (extractors == null || extractors.isEmpty()) {
             return null;

Reply via email to