This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5440238393d [FLINK-38916][table-planner] MultiJoin produces incorrect
results for OR join conditions when parallelism is greater than 1
5440238393d is described below
commit 5440238393dfeb7d7a7c3463359f61446655ea0d
Author: Stepan Stepanishchev
<[email protected]>
AuthorDate: Tue Feb 24 23:04:42 2026 +0700
[FLINK-38916][table-planner] MultiJoin produces incorrect results for OR
join conditions when parallelism is greater than 1
This closes #27498.
---
.../table/planner/plan/utils/MultiJoinUtil.java | 10 +++++++---
.../planner/plan/stream/sql/MultiJoinTest.xml | 23 +++++++++++++---------
2 files changed, 21 insertions(+), 12 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MultiJoinUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MultiJoinUtil.java
index 83bf9be50c1..59a69599381 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MultiJoinUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MultiJoinUtil.java
@@ -71,9 +71,13 @@ public class MultiJoinUtil {
final SqlKind kind = call.getOperator().getKind();
if (kind != SqlKind.EQUALS) {
- for (final RexNode operand : call.getOperands()) {
- extractEqualityConditions(
- operand, inputOffsets, inputFieldCounts, joinAttributeMap);
+ // Only conjunctions (AND) can contain equality conditions that are valid for multijoin.
+ // All other condition types are deferred to the postJoinFilter.
+ if (kind == SqlKind.AND) {
+ for (final RexNode operand : call.getOperands()) {
+ extractEqualityConditions(
+ operand, inputOffsets, inputFieldCounts, joinAttributeMap);
+ }
}
return;
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
index 757b6711cea..3a5e888ca25 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
@@ -2794,15 +2794,20 @@ LogicalProject(user_id=[$0], name=[$1], order_id=[$3],
payment_id=[$6], location
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[user_id, name, order_id, payment_id, location])
-+- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT, LEFT, LEFT],
inputUniqueKeys=[(user_id), (order_id), (payment_id), noUniqueKey],
joinConditions=[=(user_id0, user_id), OR(=(user_id, user_id1), =(name,
payment_id)), =(user_id1, user_id2)],
select=[user_id,name,order_id,user_id0,payment_id,user_id1,location,user_id2],
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647)
payment_id, V [...]
- :- Exchange(distribution=[hash[user_id]])
- : +- ChangelogNormalize(key=[user_id])
- : +- Exchange(distribution=[hash[user_id]])
- : +- TableSourceScan(table=[[default_catalog, default_database,
Users, project=[user_id, name], metadata=[]]], fields=[user_id, name])
- :- Exchange(distribution=[hash[user_id]])
- : +- TableSourceScan(table=[[default_catalog, default_database, Orders,
project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id])
- :- Exchange(distribution=[hash[user_id]])
- : +- TableSourceScan(table=[[default_catalog, default_database, Payments,
project=[payment_id, user_id], metadata=[]]], fields=[payment_id, user_id])
++- MultiJoin(commonJoinKey=[user_id1], joinTypes=[LEFT],
inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[=(user_id1,
user_id2)],
select=[user_id,name,order_id,payment_id,user_id1,location,user_id2],
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id,
VARCHAR(2147483647) user_id1, VARCHAR(2147483647) location, VARCHAR(2147483647)
user_id2)])
+ :- Exchange(distribution=[hash[user_id1]])
+ : +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[LEFT],
inputUniqueKeys=[noUniqueKey, (payment_id)], joinConditions=[OR(=(user_id,
user_id1), =(name, payment_id))],
select=[user_id,name,order_id,payment_id,user_id1],
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
VARCHAR(2147483647) order_id, VARCHAR(2147483647) payment_id,
VARCHAR(2147483647) user_id1)])
+ : :- Exchange(distribution=[single])
+ : : +- Calc(select=[user_id, name, order_id])
+ : : +- MultiJoin(commonJoinKey=[user_id], joinTypes=[LEFT],
inputUniqueKeys=[(user_id), (order_id)], joinConditions=[=(user_id0, user_id)],
select=[user_id,name,order_id,user_id0],
rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name,
VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)])
+ : : :- Exchange(distribution=[hash[user_id]])
+ : : : +- ChangelogNormalize(key=[user_id])
+ : : : +- Exchange(distribution=[hash[user_id]])
+ : : : +- TableSourceScan(table=[[default_catalog,
default_database, Users, project=[user_id, name], metadata=[]]],
fields=[user_id, name])
+ : : +- Exchange(distribution=[hash[user_id]])
+ : : +- TableSourceScan(table=[[default_catalog,
default_database, Orders, project=[order_id, user_id], metadata=[]]],
fields=[order_id, user_id])
+ : +- Exchange(distribution=[single])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
Payments, project=[payment_id, user_id], metadata=[]]], fields=[payment_id,
user_id])
+- Exchange(distribution=[hash[user_id]])
+- TableSourceScan(table=[[default_catalog, default_database,
Shipments]], fields=[location, user_id])
]]>