This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5440238393d [FLINK-38916][table-planner] MultiJoin produces incorrect results for OR join conditions when parallelism is greater than 1
5440238393d is described below

commit 5440238393dfeb7d7a7c3463359f61446655ea0d
Author: Stepan Stepanishchev <[email protected]>
AuthorDate: Tue Feb 24 23:04:42 2026 +0700

    [FLINK-38916][table-planner] MultiJoin produces incorrect results for OR join conditions when parallelism is greater than 1
    
    This closes #27498.
---
 .../table/planner/plan/utils/MultiJoinUtil.java    | 10 +++++++---
 .../planner/plan/stream/sql/MultiJoinTest.xml      | 23 +++++++++++++---------
 2 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MultiJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MultiJoinUtil.java
index 83bf9be50c1..59a69599381 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MultiJoinUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MultiJoinUtil.java
@@ -71,9 +71,13 @@ public class MultiJoinUtil {
         final SqlKind kind = call.getOperator().getKind();
 
         if (kind != SqlKind.EQUALS) {
-            for (final RexNode operand : call.getOperands()) {
-                extractEqualityConditions(
-                        operand, inputOffsets, inputFieldCounts, 
joinAttributeMap);
+            // Only conjunctions (AND) can contain equality conditions that 
are valid for multijoin.
+            // All other condition types are deferred to the postJoinFilter.
+            if (kind == SqlKind.AND) {
+                for (final RexNode operand : call.getOperands()) {
+                    extractEqualityConditions(
+                            operand, inputOffsets, inputFieldCounts, 
joinAttributeMap);
+                }
             }
             return;
         }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
index 757b6711cea..3a5e888ca25 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
@@ -2794,15 +2794,20 @@ LogicalProject(user_id=[$0], name=[$1], order_id=[$3], payment_id=[$6], location
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[user_id, name, order_id, payment_id, location])
-+- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT, LEFT, LEFT], 
inputUniqueKeys=[(user_id), (order_id), (payment_id), noUniqueKey], 
joinConditions=[=(user_id0, user_id), OR(=(user_id, user_id1), =(name, 
payment_id)), =(user_id1, user_id2)], 
select=[user_id,name,order_id,user_id0,payment_id,user_id1,location,user_id2], 
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) 
payment_id, V [...]
-   :- Exchange(distribution=[hash[user_id]])
-   :  +- ChangelogNormalize(key=[user_id])
-   :     +- Exchange(distribution=[hash[user_id]])
-   :        +- TableSourceScan(table=[[default_catalog, default_database, 
Users, project=[user_id, name], metadata=[]]], fields=[user_id, name])
-   :- Exchange(distribution=[hash[user_id]])
-   :  +- TableSourceScan(table=[[default_catalog, default_database, Orders, 
project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
-   :- Exchange(distribution=[hash[user_id]])
-   :  +- TableSourceScan(table=[[default_catalog, default_database, Payments, 
project=[payment_id, user_id], metadata=[]]], fields=[payment_id, user_id])
++- MultiJoin(commonJoinKey=[user_id1], joinTypes=[LEFT], 
inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[=(user_id1, 
user_id2)], 
select=[user_id,name,order_id,payment_id,user_id1,location,user_id2], 
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) user_id1, VARCHAR(2147483647) location, VARCHAR(2147483647) 
user_id2)])
+   :- Exchange(distribution=[hash[user_id1]])
+   :  +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[LEFT], 
inputUniqueKeys=[noUniqueKey, (payment_id)], joinConditions=[OR(=(user_id, 
user_id1), =(name, payment_id))], 
select=[user_id,name,order_id,payment_id,user_id1], 
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id, 
VARCHAR(2147483647) user_id1)])
+   :     :- Exchange(distribution=[single])
+   :     :  +- Calc(select=[user_id, name, order_id])
+   :     :     +- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT], 
inputUniqueKeys=[(user_id), (order_id)], joinConditions=[=(user_id0, user_id)], 
select=[user_id,name,order_id,user_id0], 
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, 
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)])
+   :     :        :- Exchange(distribution=[hash[user_id]])
+   :     :        :  +- ChangelogNormalize(key=[user_id])
+   :     :        :     +- Exchange(distribution=[hash[user_id]])
+   :     :        :        +- TableSourceScan(table=[[default_catalog, 
default_database, Users, project=[user_id, name], metadata=[]]], 
fields=[user_id, name])
+   :     :        +- Exchange(distribution=[hash[user_id]])
+   :     :           +- TableSourceScan(table=[[default_catalog, 
default_database, Orders, project=[order_id, user_id], metadata=[]]], 
fields=[order_id, user_id])
+   :     +- Exchange(distribution=[single])
+   :        +- TableSourceScan(table=[[default_catalog, default_database, 
Payments, project=[payment_id, user_id], metadata=[]]], fields=[payment_id, 
user_id])
    +- Exchange(distribution=[hash[user_id]])
       +- TableSourceScan(table=[[default_catalog, default_database, 
Shipments]], fields=[location, user_id])
 ]]>

Reply via email to