This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 788831b3f99 [FLINK-38217][table] ChangelogNormalize unnecessarily 
emits updates for equal rows
788831b3f99 is described below

commit 788831b3f99c1328daff2eabb7e49ac019a4e924
Author: Sergey Nuyanzin <snuyan...@gmail.com>
AuthorDate: Fri Aug 15 12:22:21 2025 +0200

    [FLINK-38217][table] ChangelogNormalize unnecessarily emits updates for 
equal rows
---
 .../flink/table/test/program/TableTestStep.java    |  29 ++
 .../exec/stream/StreamExecChangelogNormalize.java  |   8 +-
 .../ChangelogNormalizeSemanticTestPrograms.java    | 352 +++++++++++++++++++++
 .../stream/ChangelogNormalizeSemanticTests.java    |  42 +++
 .../runtime/stream/sql/DataStreamJavaITCase.java   |   4 +
 .../utils/DeduplicateFunctionHelper.java           |  22 +-
 6 files changed, 453 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java
index d63d11b0821..b6313bb7e44 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java
@@ -23,12 +23,15 @@ import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.types.RowKind;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +41,18 @@ import java.util.stream.Collectors;
 /** Abstract class for {@link SourceTestStep} and {@link SinkTestStep}. */
 public abstract class TableTestStep implements TestStep {
 
+    /**
+     * Short string code for each {@link RowKind}. Looked up by the builder's {@code addMode} to
+     * render the row kinds of a {@link ChangelogMode} as a comma-separated option value.
+     */
+    private static final Map<RowKind, String> ROWKIND_TO_STRING_MAP =
+            new EnumMap<>(
+                    Map.of(
+                            RowKind.INSERT,
+                            "I",
+                            RowKind.UPDATE_BEFORE,
+                            "UB",
+                            RowKind.UPDATE_AFTER,
+                            "UA",
+                            RowKind.DELETE,
+                            "D"));
+
     public final String name;
     public final List<String> schemaComponents;
     public final @Nullable TableDistribution distribution;
@@ -151,5 +166,19 @@ public abstract class TableTestStep implements TestStep {
             this.partitionKeys.addAll(Arrays.asList(partitionKeys));
             return (SpecificBuilder) this;
         }
+
+        /**
+         * Configures this table to use the {@code values} connector with the given changelog mode.
+         *
+         * <p>Writes the comma-separated short codes of the contained row kinds to both the {@code
+         * changelog-mode} and {@code sink-changelog-mode-enforced} options, and writes {@link
+         * ChangelogMode#keyOnlyDeletes()} to both the {@code source.produces-delete-by-key} and
+         * {@code sink.supports-delete-by-key} options.
+         *
+         * @param mode changelog mode whose row kinds and key-only-delete flag are put into the options
+         * @return this builder
+         */
+        public SpecificBuilder addMode(ChangelogMode mode) {
+            this.options.put("connector", "values");
+            final String changelogsStr =
+                    mode.getContainedKinds().stream()
+                            .map(ROWKIND_TO_STRING_MAP::get)
+                            .collect(Collectors.joining(","));
+            this.options.put("changelog-mode", changelogsStr);
+            this.options.put("sink-changelog-mode-enforced", changelogsStr);
+            // String.valueOf yields the same "true"/"false" text without boxing first.
+            final String keyOnlyDeleteStr = String.valueOf(mode.keyOnlyDeletes());
+            this.options.put("source.produces-delete-by-key", keyOnlyDeleteStr);
+            this.options.put("sink.supports-delete-by-key", keyOnlyDeleteStr);
+            return (SpecificBuilder) this;
+        }
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
index 627b6ebad0d..4a26def549b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java
@@ -166,9 +166,13 @@ public class StreamExecChangelogNormalize extends 
ExecNodeBase<RowData>
                 new EqualiserCodeGenerator(inputType, classLoader)
                         .generateRecordEqualiser("DeduplicateRowEqualiser");
 
+        // Depending on whether filterCondition is null or not, there is 
different logic
+        // at ProcTimeMiniBatchDeduplicateKeepLastRowFunction#finishBundle
         final GeneratedFilterCondition generatedFilterCondition =
-                FilterCodeGenerator.generateFilterCondition(
-                        config, classLoader, filterCondition, inputType);
+                filterCondition == null
+                        ? null
+                        : FilterCodeGenerator.generateFilterCondition(
+                                config, classLoader, filterCondition, 
inputType);
 
         if (isMiniBatchEnabled) {
             TypeSerializer<RowData> rowSerializer =
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTestPrograms.java
new file mode 100644
index 00000000000..6160a5df22c
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTestPrograms.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+/**
+ * {@link TableTestProgram} definitions for semantic testing {@link 
StreamExecChangelogNormalize}.
+ */
+public class ChangelogNormalizeSemanticTestPrograms {
+
+    private static final String[] SINK_SCHEMA = {
+        "a STRING", "b INT", "c STRING", "PRIMARY KEY(a) NOT ENFORCED"
+    };
+
+    private static final String[] SOURCE_SCHEMA = {
+        "a STRING", "b INT", "c STRING", "PRIMARY KEY(a) NOT ENFORCED"
+    };
+
+    private static final ChangelogMode KAFKA_WITH_PARTIAL_DELETES_CHANGELOG =
+            ChangelogMode.newBuilder()
+                    .addContainedKind(RowKind.UPDATE_AFTER)
+                    .addContainedKind(RowKind.DELETE)
+                    .keyOnlyDeletes(true)
+                    .build();
+
+    static final TableTestProgram UPSERT_SOURCE_WITH_NON_KEY_FILTER =
+            TableTestProgram.of(
+                            "upsert-with-non-key-filter", "validates upsert 
with non key filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addMode(ChangelogMode.upsert())
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 0, "dd"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 1, "dd"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", 5, "ccc"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addMode(ChangelogMode.all())
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "-U[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "-D[one, 2, bb]",
+                                            "+I[two, 0, dd]",
+                                            "-U[two, 0, dd]",
+                                            "+U[two, 1, dd]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t 
WHERE b < 3")
+                    .build();
+
+    static final TableTestProgram UPSERT_SOURCE_WITH_KEY_FILTER =
+            TableTestProgram.of("upsert-with-key-filter", "validates upsert 
with key filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addMode(ChangelogMode.upsert())
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.INSERT, "two", 
1, "d"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 1, "d"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.DELETE, "two", 
1, "d"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addMode(ChangelogMode.all())
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "-U[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "-U[one, 2, bb]",
+                                            "+U[one, 4, aaaa]",
+                                            "+I[two, 1, d]",
+                                            "-D[two, 1, d]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t 
WHERE a <> 'three'")
+                    .build();
+
+    static final TableTestProgram UPSERT_SOURCE_WITH_NO_FILTER =
+            TableTestProgram.of("upsert-with-no-filter", "validates upsert 
with no filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addMode(ChangelogMode.upsert())
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.INSERT, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", 3, "cc"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addMode(ChangelogMode.all())
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "-U[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "-U[one, 2, bb]",
+                                            "+U[one, 4, aaaa]",
+                                            "+I[three, 3, ccc]",
+                                            "-D[three, 3, ccc]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t")
+                    .build();
+
+    static final TableTestProgram KAFKA_SOURCE_WITH_NON_KEY_FILTER =
+            TableTestProgram.of(
+                            "kafka-with-non-key-filter",
+                            "validates kafka source (with partial deletes) 
with non key filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    
.addMode(KAFKA_WITH_PARTIAL_DELETES_CHANGELOG)
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 0, "dd"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 1, "dd"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", null, null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addMode(ChangelogMode.all())
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "-U[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "-D[one, 2, bb]",
+                                            "+I[two, 0, dd]",
+                                            "-U[two, 0, dd]",
+                                            "+U[two, 1, dd]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t 
WHERE b < 3")
+                    .build();
+
+    static final TableTestProgram KAFKA_SOURCE_WITH_KEY_FILTER =
+            TableTestProgram.of(
+                            "kafka-with-key-filter",
+                            "validates kafka source (with partial deletes) 
with key filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    
.addMode(KAFKA_WITH_PARTIAL_DELETES_CHANGELOG)
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.INSERT, "two", 
1, "d"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 1, "d"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", null, null),
+                                            Row.ofKind(RowKind.DELETE, "two", 
null, null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addMode(ChangelogMode.all())
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "-U[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "-U[one, 2, bb]",
+                                            "+U[one, 4, aaaa]",
+                                            "+I[two, 1, d]",
+                                            "-D[two, 1, d]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t 
WHERE a <> 'three'")
+                    .build();
+
+    static final TableTestProgram KAFKA_SOURCE_WITH_NO_FILTER =
+            TableTestProgram.of(
+                            "kafka-with-no-filter",
+                            "validates kafka source (with partial deletes) 
with no filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    
.addMode(KAFKA_WITH_PARTIAL_DELETES_CHANGELOG)
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.INSERT, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", null, null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addMode(ChangelogMode.all())
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "-U[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "-U[one, 2, bb]",
+                                            "+U[one, 4, aaaa]",
+                                            "+I[three, 3, ccc]",
+                                            "-D[three, 3, ccc]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t")
+                    .build();
+
+    /**
+     * Retract source ({@link ChangelogMode#all()}) queried with the filter {@code b < 10}, i.e. a
+     * filter on a column that is not part of the declared primary key.
+     */
+    static final TableTestProgram RETRACT_SOURCE_WITH_NON_KEY_FILTER =
+            TableTestProgram.of(
+                            "retract-source-with-non-key-filter",
+                            "validates retract source with non key filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addMode(ChangelogMode.all())
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "one", 2, "bb"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"),
+                                            Row.ofKind(RowKind.INSERT, "one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.DELETE, "three", 3, "ccc"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .addMode(ChangelogMode.all())
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "+U[three, 3, ccc]",
+                                            "+U[three, 3, ccc]",
+                                            "+I[one, 4, aaaa]",
+                                            "+U[one, 4, aaaa]",
+                                            "-D[three, 3, ccc]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t WHERE b < 10")
+                    .build();
+
+    static final TableTestProgram RETRACT_SOURCE_WITH_KEY_FILTER =
+            TableTestProgram.of(
+                            "retract-source-with-key-filter",
+                            "validates retract source with key filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addMode(ChangelogMode.all())
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.INSERT, "two", 
1, "d"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 1, "d"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", 3, "cc"),
+                                            Row.ofKind(RowKind.DELETE, "two", 
1, "d"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .addMode(ChangelogMode.all())
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "+I[one, 4, aaaa]",
+                                            "+U[one, 4, aaaa]",
+                                            "+I[two, 1, d]",
+                                            "+U[two, 1, d]",
+                                            "-D[two, 1, d]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t 
WHERE a <> 'three'")
+                    .build();
+
+    static final TableTestProgram RETRACT_SOURCE_NO_FILTER =
+            TableTestProgram.of(
+                            "retract-source-with-no-filter",
+                            "validates retract source with no filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addMode(ChangelogMode.all())
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", 3, "ccc"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addMode(ChangelogMode.all())
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "+U[three, 3, ccc]",
+                                            "+I[one, 4, aaaa]",
+                                            "+U[one, 4, aaaa]",
+                                            "+U[three, 3, ccc]",
+                                            "-D[three, 3, ccc]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t")
+                    .build();
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTests.java
new file mode 100644
index 00000000000..eb856e56e50
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTests.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.List;
+
+/**
+ * Semantic tests for {@link StreamExecChangelogNormalize}.
+ *
+ * <p>Supplies the upsert, kafka (partial deletes) and retract source programs declared in {@link
+ * ChangelogNormalizeSemanticTestPrograms} to {@link SemanticTestBase}.
+ */
+public class ChangelogNormalizeSemanticTests extends SemanticTestBase {
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return List.of(
+                
ChangelogNormalizeSemanticTestPrograms.UPSERT_SOURCE_WITH_NON_KEY_FILTER,
+                
ChangelogNormalizeSemanticTestPrograms.UPSERT_SOURCE_WITH_KEY_FILTER,
+                
ChangelogNormalizeSemanticTestPrograms.UPSERT_SOURCE_WITH_NO_FILTER,
+                
ChangelogNormalizeSemanticTestPrograms.KAFKA_SOURCE_WITH_NON_KEY_FILTER,
+                
ChangelogNormalizeSemanticTestPrograms.KAFKA_SOURCE_WITH_KEY_FILTER,
+                
ChangelogNormalizeSemanticTestPrograms.KAFKA_SOURCE_WITH_NO_FILTER,
+                
ChangelogNormalizeSemanticTestPrograms.RETRACT_SOURCE_NO_FILTER,
+                
ChangelogNormalizeSemanticTestPrograms.RETRACT_SOURCE_WITH_NON_KEY_FILTER,
+                
ChangelogNormalizeSemanticTestPrograms.RETRACT_SOURCE_WITH_KEY_FILTER);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
index 2d95f55112c..9f09db2225c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
@@ -531,6 +531,8 @@ class DataStreamJavaITCase {
                         output(RowKind.INSERT, "alice", 1),
                         // --
                         input(RowKind.INSERT, "alice", 1), // no impact
+                        input(RowKind.UPDATE_AFTER, "alice", 1), // no impact
+                        input(RowKind.UPDATE_AFTER, "alice", 1), // no impact
                         // --
                         input(RowKind.UPDATE_AFTER, "alice", 2),
                         output(RowKind.UPDATE_BEFORE, "alice", 1),
@@ -569,6 +571,8 @@ class DataStreamJavaITCase {
                         output(RowKind.INSERT, "alice", 1),
                         // --
                         input(RowKind.INSERT, "alice", 1), // no impact
+                        input(RowKind.UPDATE_AFTER, "alice", 1), // no impact
+                        input(RowKind.UPDATE_AFTER, "alice", 1), // no impact
                         // --
                         input(RowKind.UPDATE_AFTER, "alice", 2),
                         output(RowKind.UPDATE_AFTER, "alice", 2),
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java
index a9b79fc9f7b..95bed441ed0 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java
@@ -117,7 +117,7 @@ public class DeduplicateFunctionHelper {
                 currentRow.setRowKind(RowKind.INSERT);
                 out.collect(currentRow);
             } else {
-                if (!isStateTtlEnabled && equaliser.equals(preRow, 
currentRow)) {
+                if (!isStateTtlEnabled && areRowsWithSameContent(equaliser, 
preRow, currentRow)) {
                     // currentRow is the same as preRow and state cleaning is 
not enabled.
                     // We do not emit retraction and update message.
                     // If state cleaning is enabled, we have to emit messages 
to prevent too early
@@ -176,7 +176,7 @@ public class DeduplicateFunctionHelper {
                     return;
                 }
             } else {
-                if (!isStateTtlEnabled && equaliser.equals(preRow, 
currentRow)) {
+                if (!isStateTtlEnabled && areRowsWithSameContent(equaliser, 
preRow, currentRow)) {
                     // currentRow is the same as preRow and state cleaning is 
not enabled.
                     // We do not emit retraction and update message.
                     // If state cleaning is enabled, we have to emit messages 
to prevent too early
@@ -292,6 +292,24 @@ public class DeduplicateFunctionHelper {
         }
     }
 
+    /**
+     * Compares {@code prevRow} and {@code currentRow} with {@code equaliser}, evaluating a {@code
+     * RowKind.UPDATE_AFTER} {@code currentRow} as if its kind were {@code RowKind.INSERT}.
+     *
+     * <p>Important: the method assumes that {@code currentRow} comes either with {@code
+     * RowKind.UPDATE_AFTER} or with {@code RowKind.INSERT}. It is not designed to be used for other
+     * cases.
+     *
+     * @param equaliser equaliser used to compare the two rows
+     * @param prevRow row to compare against
+     * @param currentRow incoming row; its row kind is the same on return as it was on entry
+     * @return the result of {@code equaliser.equals(prevRow, currentRow)}
+     */
+    private static boolean areRowsWithSameContent(
+            RecordEqualiser equaliser, RowData prevRow, RowData currentRow) {
+        final RowKind currentRowKind = currentRow.getRowKind();
+        if (currentRowKind != RowKind.UPDATE_AFTER) {
+            return equaliser.equals(prevRow, currentRow);
+        }
+        // Temporarily switch the kind to INSERT so that only the row content can differ.
+        // NOTE(review): presumably prevRow carries INSERT and the equaliser also compares the row
+        // kind -- confirm against the callers and the generated equaliser.
+        currentRow.setRowKind(RowKind.INSERT);
+        try {
+            return equaliser.equals(prevRow, currentRow);
+        } finally {
+            // Restore the caller-visible kind even if the comparison throws.
+            currentRow.setRowKind(currentRowKind);
+        }
+    }
+
     private static long getRowtime(RowData input, int rowtimeIndex) {
         return input.getLong(rowtimeIndex);
     }


Reply via email to