This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6c04ac60f21ee6a8dd74f9a552b1344359ee306c
Author: Gustavo de Morais <gdemor...@confluent.io>
AuthorDate: Mon Aug 11 14:17:17 2025 +0200

    [FLINK-38219][table-runtime] Fix row kind in StreamingMultiJoinOperator
    
    This closes #26886.
---
 .../nodes/exec/stream/MultiJoinSemanticTests.java  |  3 +-
 .../nodes/exec/stream/MultiJoinTestPrograms.java   | 64 ++++++++++++++++++++++
 .../join/stream/StreamingMultiJoinOperator.java    |  9 ++-
 .../join/stream/state/MultiJoinStateViews.java     | 21 ++-----
 4 files changed, 78 insertions(+), 19 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
index 5d1bce0d8e8..923d6ecc78d 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
@@ -36,6 +36,7 @@ public class MultiJoinSemanticTests extends SemanticTestBase {
                 MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_COMPLEX,
                 
MultiJoinTestPrograms.MULTI_JOIN_WITH_TIME_ATTRIBUTES_MATERIALIZATION,
                 
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_NO_JOIN_KEY,
-                MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_NO_COMMON_JOIN_KEY);
+                MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_NO_COMMON_JOIN_KEY,
+                
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_WITH_CTE);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
index e880ef04c32..10f289fb5a2 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
@@ -907,4 +907,68 @@ public class MultiJoinTestPrograms {
                                             + "INNER JOIN OrdersWithRowtime o 
ON u.user_id_0 = o.user_id_1 "
                                             + "INNER JOIN Payments p ON 
u.user_id_0 = p.user_id_2")
                             .build();
+
+    public static final TableTestProgram 
MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_WITH_CTE =
+            TableTestProgram.of(
+                            "left-outer-join-with-cte",
+                            "CTE with three-way left outer join and 
aggregation")
+                    
.setupConfig(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true)
+                    .setupTableSource(USERS_SOURCE)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("Orders")
+                                    .addSchema(
+                                            "user_id STRING",
+                                            "order_id STRING PRIMARY KEY NOT 
ENFORCED",
+                                            "product STRING")
+                                    .addOption("changelog-mode", "I, UA,D")
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "2", 
"order2", "Product B"),
+                                            Row.ofKind(
+                                                    RowKind.UPDATE_AFTER,
+                                                    "2",
+                                                    "order2",
+                                                    "Product B"),
+                                            Row.ofKind(
+                                                    RowKind.UPDATE_AFTER,
+                                                    "2",
+                                                    "order2",
+                                                    "Product C"),
+                                            Row.ofKind(
+                                                    RowKind.UPDATE_AFTER,
+                                                    "2",
+                                                    "order2",
+                                                    "Product C"))
+                                    .build())
+                    .setupTableSource(PAYMENTS_SOURCE)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("user_id STRING", "name 
STRING", "cnt BIGINT")
+                                    .testMaterializedData()
+                                    .consumedValues(Row.of("1", "Gus", 2), 
Row.of("2", "Bob", 1))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink WITH "
+                                    + "    order_details AS ( "
+                                    + "        SELECT o.user_id "
+                                    + "        FROM Orders o "
+                                    + "    ), "
+                                    + "    user_elements AS ( "
+                                    + "        SELECT "
+                                    + "            u.id AS user_id "
+                                    + "        FROM ( "
+                                    + "          SELECT '2' AS id, '2' AS 
order_user_id "
+                                    + "          UNION ALL "
+                                    + "          SELECT '1' AS id, '2' AS 
order_user_id "
+                                    + "          UNION ALL "
+                                    + "          SELECT '5' AS id, '5' AS 
order_user_id "
+                                    + "        ) u "
+                                    + "        LEFT JOIN order_details od "
+                                    + "            ON od.user_id = 
u.order_user_id "
+                                    + "    ) "
+                                    + "SELECT ue.user_id, us.name, COUNT(*) AS 
cnt "
+                                    + "FROM user_elements ue "
+                                    + "INNER JOIN Users us ON ue.user_id = 
us.user_id "
+                                    + "LEFT JOIN Payments p ON ue.user_id = 
p.user_id "
+                                    + "GROUP BY ue.user_id, us.name")
+                    .build();
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java
index 062f6852c7d..3764b886d00 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java
@@ -777,12 +777,15 @@ public class StreamingMultiJoinOperator extends 
AbstractStreamOperatorV2<RowData
     }
 
     private void addRecordToState(RowData input, int inputId) throws Exception 
{
+        final boolean isUpsert = isUpsert(input);
         RowData joinKey = keyExtractor.getJoinKey(input, inputId);
 
-        if (isRetraction(input)) {
-            stateHandlers.get(inputId).retractRecord(joinKey, input);
-        } else {
+        // Always use insert so we store and retract records correctly from 
state
+        input.setRowKind(RowKind.INSERT);
+        if (isUpsert) {
             stateHandlers.get(inputId).addRecord(joinKey, input);
+        } else {
+            stateHandlers.get(inputId).retractRecord(joinKey, input);
         }
     }
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java
index a0a59922c51..6d3caba2ba3 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java
@@ -32,7 +32,6 @@ import 
org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideS
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
 import org.apache.flink.util.IterableIterator;
 
 import javax.annotation.Nonnull;
@@ -238,15 +237,15 @@ public final class MultiJoinStateViews {
 
         @Override
         public void addRecord(@Nullable RowData joinKey, RowData record) 
throws Exception {
-            RowData uniqueKey = uniqueKeySelector.getKey(record);
-            RowData stateKey = getStateKey(joinKey, uniqueKey);
+            final RowData uniqueKey = uniqueKeySelector.getKey(record);
+            final RowData stateKey = getStateKey(joinKey, uniqueKey);
             recordState.put(stateKey, record);
         }
 
         @Override
         public void retractRecord(@Nullable RowData joinKey, RowData record) 
throws Exception {
-            RowData uniqueKey = uniqueKeySelector.getKey(record);
-            RowData stateKey = getStateKey(joinKey, uniqueKey);
+            final RowData uniqueKey = uniqueKeySelector.getKey(record);
+            final RowData stateKey = getStateKey(joinKey, uniqueKey);
             recordState.remove(stateKey);
         }
 
@@ -364,25 +363,18 @@ public final class MultiJoinStateViews {
 
         @Override
         public void addRecord(@Nullable RowData joinKey, RowData record) 
throws Exception {
-            // Normalize RowKind for consistent state representation
-            RowKind originalKind = record.getRowKind();
-            record.setRowKind(RowKind.INSERT); // Normalize for key creation
-            RowData stateKey = getStateKey(joinKey, record);
+            final RowData stateKey = getStateKey(joinKey, record);
 
             Integer currentCount = recordState.get(stateKey);
             if (currentCount == null) {
                 currentCount = 0;
             }
             recordState.put(stateKey, currentCount + 1);
-            record.setRowKind(originalKind); // Restore original RowKind
         }
 
         @Override
         public void retractRecord(@Nullable RowData joinKey, RowData record) 
throws Exception {
-            // Normalize RowKind for consistent state representation and lookup
-            RowKind originalKind = record.getRowKind();
-            record.setRowKind(RowKind.INSERT); // Normalize for key lookup
-            RowData stateKey = getStateKey(joinKey, record);
+            final RowData stateKey = getStateKey(joinKey, record);
 
             Integer currentCount = recordState.get(stateKey);
             if (currentCount != null) {
@@ -392,7 +384,6 @@ public final class MultiJoinStateViews {
                     recordState.remove(stateKey);
                 }
             }
-            record.setRowKind(originalKind); // Restore original RowKind
         }
 
         @Override

Reply via email to