This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-2.1 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6c04ac60f21ee6a8dd74f9a552b1344359ee306c Author: Gustavo de Morais <gdemor...@confluent.io> AuthorDate: Mon Aug 11 14:17:17 2025 +0200 [FLINK-38219][table-runtime] Fix row kind in StreamingMultiJoinOperator This closes #26886. --- .../nodes/exec/stream/MultiJoinSemanticTests.java | 3 +- .../nodes/exec/stream/MultiJoinTestPrograms.java | 64 ++++++++++++++++++++++ .../join/stream/StreamingMultiJoinOperator.java | 9 ++- .../join/stream/state/MultiJoinStateViews.java | 21 ++----- 4 files changed, 78 insertions(+), 19 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java index 5d1bce0d8e8..923d6ecc78d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java @@ -36,6 +36,7 @@ public class MultiJoinSemanticTests extends SemanticTestBase { MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_COMPLEX, MultiJoinTestPrograms.MULTI_JOIN_WITH_TIME_ATTRIBUTES_MATERIALIZATION, MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_NO_JOIN_KEY, - MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_NO_COMMON_JOIN_KEY); + MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_NO_COMMON_JOIN_KEY, + MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_WITH_CTE); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java index e880ef04c32..10f289fb5a2 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java @@ -907,4 +907,68 @@ public class MultiJoinTestPrograms { + "INNER JOIN OrdersWithRowtime o ON u.user_id_0 = o.user_id_1 " + "INNER JOIN Payments p ON u.user_id_0 = p.user_id_2") .build(); + + public static final TableTestProgram MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_WITH_CTE = + TableTestProgram.of( + "left-outer-join-with-cte", + "CTE with three-way left outer join and aggregation") + .setupConfig(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true) + .setupTableSource(USERS_SOURCE) + .setupTableSource( + SourceTestStep.newBuilder("Orders") + .addSchema( + "user_id STRING", + "order_id STRING PRIMARY KEY NOT ENFORCED", + "product STRING") + .addOption("changelog-mode", "I, UA,D") + .producedValues( + Row.ofKind(RowKind.INSERT, "2", "order2", "Product B"), + Row.ofKind( + RowKind.UPDATE_AFTER, + "2", + "order2", + "Product B"), + Row.ofKind( + RowKind.UPDATE_AFTER, + "2", + "order2", + "Product C"), + Row.ofKind( + RowKind.UPDATE_AFTER, + "2", + "order2", + "Product C")) + .build()) + .setupTableSource(PAYMENTS_SOURCE) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("user_id STRING", "name STRING", "cnt BIGINT") + .testMaterializedData() + .consumedValues(Row.of("1", "Gus", 2), Row.of("2", "Bob", 1)) + .build()) + .runSql( + "INSERT INTO sink WITH " + + " order_details AS ( " + + " SELECT o.user_id " + + " FROM Orders o " + + " ), " + + " user_elements AS ( " + + " SELECT " + + " u.id AS user_id " + + " FROM ( " + + " SELECT '2' AS id, '2' AS order_user_id " + + " UNION ALL " + + " SELECT '1' AS id, '2' AS order_user_id " + + " UNION ALL " + + " SELECT '5' AS id, '5' AS order_user_id " + + " ) u " + + " LEFT JOIN order_details od " + + " ON od.user_id = u.order_user_id " + + " ) " + + "SELECT ue.user_id, us.name, COUNT(*) AS cnt " + + "FROM user_elements ue " + + "INNER JOIN Users us ON ue.user_id = us.user_id " + + "LEFT JOIN Payments p ON ue.user_id = p.user_id " + + "GROUP BY ue.user_id, us.name") + .build(); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java index 062f6852c7d..3764b886d00 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMultiJoinOperator.java @@ -777,12 +777,15 @@ public class StreamingMultiJoinOperator extends AbstractStreamOperatorV2<RowData } private void addRecordToState(RowData input, int inputId) throws Exception { + final boolean isUpsert = isUpsert(input); RowData joinKey = keyExtractor.getJoinKey(input, inputId); - if (isRetraction(input)) { - stateHandlers.get(inputId).retractRecord(joinKey, input); - } else { + // Always use insert so we store and retract records correctly from state + input.setRowKind(RowKind.INSERT); + if (isUpsert) { stateHandlers.get(inputId).addRecord(joinKey, input); + } else { + stateHandlers.get(inputId).retractRecord(joinKey, input); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java index a0a59922c51..6d3caba2ba3 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/MultiJoinStateViews.java @@ -32,7 +32,6 @@ import org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideS import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.types.RowKind; import org.apache.flink.util.IterableIterator; import javax.annotation.Nonnull; @@ -238,15 +237,15 @@ public final class MultiJoinStateViews { @Override public void addRecord(@Nullable RowData joinKey, RowData record) throws Exception { - RowData uniqueKey = uniqueKeySelector.getKey(record); - RowData stateKey = getStateKey(joinKey, uniqueKey); + final RowData uniqueKey = uniqueKeySelector.getKey(record); + final RowData stateKey = getStateKey(joinKey, uniqueKey); recordState.put(stateKey, record); } @Override public void retractRecord(@Nullable RowData joinKey, RowData record) throws Exception { - RowData uniqueKey = uniqueKeySelector.getKey(record); - RowData stateKey = getStateKey(joinKey, uniqueKey); + final RowData uniqueKey = uniqueKeySelector.getKey(record); + final RowData stateKey = getStateKey(joinKey, uniqueKey); recordState.remove(stateKey); } @@ -364,25 +363,18 @@ public final class MultiJoinStateViews { @Override public void addRecord(@Nullable RowData joinKey, RowData record) throws Exception { - // Normalize RowKind for consistent state representation - RowKind originalKind = record.getRowKind(); - record.setRowKind(RowKind.INSERT); // Normalize for key creation - RowData stateKey = getStateKey(joinKey, record); + final RowData stateKey = getStateKey(joinKey, record); Integer currentCount = recordState.get(stateKey); if (currentCount == null) { currentCount = 0; } recordState.put(stateKey, currentCount + 1); - record.setRowKind(originalKind); // Restore original RowKind } @Override public void retractRecord(@Nullable RowData joinKey, RowData record) throws Exception { - // Normalize RowKind for consistent state representation and lookup - RowKind originalKind = record.getRowKind(); - record.setRowKind(RowKind.INSERT); // Normalize for key lookup - RowData stateKey = getStateKey(joinKey, record); + final RowData stateKey = getStateKey(joinKey, record); Integer currentCount = recordState.get(stateKey); if (currentCount != null) { @@ -392,7 +384,6 @@ public final class MultiJoinStateViews { recordState.remove(stateKey); } } - record.setRowKind(originalKind); // Restore original RowKind } @Override