This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 788831b3f99 [FLINK-38217][table] ChangelogNormalize unnecessarily emits updates for equal rows 788831b3f99 is described below commit 788831b3f99c1328daff2eabb7e49ac019a4e924 Author: Sergey Nuyanzin <snuyan...@gmail.com> AuthorDate: Fri Aug 15 12:22:21 2025 +0200 [FLINK-38217][table] ChangelogNormalize unnecessarily emits updates for equal rows --- .../flink/table/test/program/TableTestStep.java | 29 ++ .../exec/stream/StreamExecChangelogNormalize.java | 8 +- .../ChangelogNormalizeSemanticTestPrograms.java | 352 +++++++++++++++++++++ .../stream/ChangelogNormalizeSemanticTests.java | 42 +++ .../runtime/stream/sql/DataStreamJavaITCase.java | 4 + .../utils/DeduplicateFunctionHelper.java | 22 +- 6 files changed, 453 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java index d63d11b0821..b6313bb7e44 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java @@ -23,12 +23,15 @@ import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.catalog.TableDistribution; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.types.RowKind; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.EnumMap; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -38,6 +41,18 @@ import java.util.stream.Collectors; /** Abstract class for {@link SourceTestStep} and {@link SinkTestStep}. 
*/ public abstract class TableTestStep implements TestStep { + private static final Map<RowKind, String> ROWKIND_TO_STRING_MAP = + new EnumMap<>( + Map.of( + RowKind.INSERT, + "I", + RowKind.UPDATE_BEFORE, + "UB", + RowKind.UPDATE_AFTER, + "UA", + RowKind.DELETE, + "D")); + public final String name; public final List<String> schemaComponents; public final @Nullable TableDistribution distribution; @@ -151,5 +166,19 @@ public abstract class TableTestStep implements TestStep { this.partitionKeys.addAll(Arrays.asList(partitionKeys)); return (SpecificBuilder) this; } + + public SpecificBuilder addMode(ChangelogMode mode) { + this.options.put("connector", "values"); + final String changelogsStr = + mode.getContainedKinds().stream() + .map(ROWKIND_TO_STRING_MAP::get) + .collect(Collectors.joining(",")); + this.options.put("changelog-mode", changelogsStr); + this.options.put("sink-changelog-mode-enforced", changelogsStr); + final String keyOnlyDeleteStr = Boolean.valueOf(mode.keyOnlyDeletes()).toString(); + this.options.put("source.produces-delete-by-key", keyOnlyDeleteStr); + this.options.put("sink.supports-delete-by-key", keyOnlyDeleteStr); + return (SpecificBuilder) this; + } } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java index 627b6ebad0d..4a26def549b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecChangelogNormalize.java @@ -166,9 +166,13 @@ public class StreamExecChangelogNormalize extends ExecNodeBase<RowData> new EqualiserCodeGenerator(inputType, classLoader) .generateRecordEqualiser("DeduplicateRowEqualiser"); + // ProcTimeMiniBatchDeduplicateKeepLastRowFunction#finishBundle applies different logic + // depending on whether filterCondition is null final GeneratedFilterCondition generatedFilterCondition = - FilterCodeGenerator.generateFilterCondition( - config, classLoader, filterCondition, inputType); + filterCondition == null + ? null + : FilterCodeGenerator.generateFilterCondition( + config, classLoader, filterCondition, inputType); if (isMiniBatchEnabled) { TypeSerializer<RowData> rowSerializer = diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTestPrograms.java new file mode 100644 index 00000000000..6160a5df22c --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTestPrograms.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +/** + * {@link TableTestProgram} definitions for semantic testing {@link StreamExecChangelogNormalize}. + */ +public class ChangelogNormalizeSemanticTestPrograms { + + private static final String[] SINK_SCHEMA = { + "a STRING", "b INT", "c STRING", "PRIMARY KEY(a) NOT ENFORCED" + }; + + private static final String[] SOURCE_SCHEMA = { + "a STRING", "b INT", "c STRING", "PRIMARY KEY(a) NOT ENFORCED" + }; + + private static final ChangelogMode KAFKA_WITH_PARTIAL_DELETES_CHANGELOG = + ChangelogMode.newBuilder() + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .keyOnlyDeletes(true) + .build(); + + static final TableTestProgram UPSERT_SOURCE_WITH_NON_KEY_FILTER = + TableTestProgram.of( + "upsert-with-non-key-filter", "validates upsert with non key filter") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addMode(ChangelogMode.upsert()) + .addSchema(SOURCE_SCHEMA) + .producedValues( + Row.ofKind(RowKind.INSERT, "one", 1, "a"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 2, "bb"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"), + Row.ofKind(RowKind.UPDATE_AFTER, "two", 0, "dd"), + Row.ofKind(RowKind.UPDATE_AFTER, "two", 1, "dd"), + Row.ofKind(RowKind.INSERT, "one", 4, "aaaa"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 4, "aaaa"), + Row.ofKind(RowKind.DELETE, "three", 5, "ccc")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addMode(ChangelogMode.all()) + .addSchema(SINK_SCHEMA) + .consumedValues( + "+I[one, 1, a]", + "-U[one, 1, a]", + "+U[one, 2, bb]", + "-D[one, 2, bb]", + "+I[two, 0, dd]", + "-U[two, 0, dd]", + "+U[two, 1, dd]") + .build()) + .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t WHERE b < 3") + .build(); + + static final TableTestProgram UPSERT_SOURCE_WITH_KEY_FILTER = + TableTestProgram.of("upsert-with-key-filter", "validates upsert with key filter") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addMode(ChangelogMode.upsert()) + .addSchema(SOURCE_SCHEMA) + .producedValues( + Row.ofKind(RowKind.INSERT, "one", 1, "a"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 2, "bb"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"), + Row.ofKind(RowKind.INSERT, "one", 4, "aaaa"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 4, "aaaa"), + Row.ofKind(RowKind.INSERT, "two", 1, "d"), + Row.ofKind(RowKind.UPDATE_AFTER, "two", 1, "d"), + Row.ofKind(RowKind.DELETE, "three", 3, "ccc"), + Row.ofKind(RowKind.DELETE, "two", 1, "d")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addMode(ChangelogMode.all()) + .addSchema(SINK_SCHEMA) + .consumedValues( + "+I[one, 1, a]", + "-U[one, 1, 
a]", + "+U[one, 2, bb]", + "-U[one, 2, bb]", + "+U[one, 4, aaaa]", + "+I[two, 1, d]", + "-D[two, 1, d]") + .build()) + .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t WHERE a <> 'three'") + .build(); + + static final TableTestProgram UPSERT_SOURCE_WITH_NO_FILTER = + TableTestProgram.of("upsert-with-no-filter", "validates upsert with no filter") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addMode(ChangelogMode.upsert()) + .addSchema(SOURCE_SCHEMA) + .producedValues( + Row.ofKind(RowKind.INSERT, "one", 1, "a"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 2, "bb"), + Row.ofKind(RowKind.INSERT, "three", 3, "ccc"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"), + Row.ofKind(RowKind.INSERT, "one", 4, "aaaa"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 4, "aaaa"), + Row.ofKind(RowKind.DELETE, "three", 3, "cc")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addMode(ChangelogMode.all()) + .addSchema(SINK_SCHEMA) + .consumedValues( + "+I[one, 1, a]", + "-U[one, 1, a]", + "+U[one, 2, bb]", + "-U[one, 2, bb]", + "+U[one, 4, aaaa]", + "+I[three, 3, ccc]", + "-D[three, 3, ccc]") + .build()) + .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t") + .build(); + + static final TableTestProgram KAFKA_SOURCE_WITH_NON_KEY_FILTER = + TableTestProgram.of( + "kafka-with-non-key-filter", + "validates kafka source (with partial deletes) with non key filter") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addMode(KAFKA_WITH_PARTIAL_DELETES_CHANGELOG) + .addSchema(SOURCE_SCHEMA) + .producedValues( + Row.ofKind(RowKind.INSERT, "one", 1, "a"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 2, "bb"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"), + Row.ofKind(RowKind.UPDATE_AFTER, "two", 0, "dd"), + Row.ofKind(RowKind.UPDATE_AFTER, "two", 1, "dd"), + Row.ofKind(RowKind.INSERT, "one", 4, "aaaa"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 4, "aaaa"), + Row.ofKind(RowKind.DELETE, "three", null, null)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addMode(ChangelogMode.all()) + .addSchema(SINK_SCHEMA) + .consumedValues( + "+I[one, 1, a]", + "-U[one, 1, a]", + "+U[one, 2, bb]", + "-D[one, 2, bb]", + "+I[two, 0, dd]", + "-U[two, 0, dd]", + "+U[two, 1, dd]") + .build()) + .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t WHERE b < 3") + .build(); + + static final TableTestProgram KAFKA_SOURCE_WITH_KEY_FILTER = + TableTestProgram.of( + "kafka-with-key-filter", + "validates kafka source (with partial deletes) with key filter") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addMode(KAFKA_WITH_PARTIAL_DELETES_CHANGELOG) + .addSchema(SOURCE_SCHEMA) + .producedValues( + Row.ofKind(RowKind.INSERT, "one", 1, "a"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 2, "bb"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"), + Row.ofKind(RowKind.INSERT, "one", 4, "aaaa"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 4, "aaaa"), + Row.ofKind(RowKind.INSERT, "two", 1, "d"), + Row.ofKind(RowKind.UPDATE_AFTER, "two", 1, "d"), + Row.ofKind(RowKind.DELETE, "three", null, null), + Row.ofKind(RowKind.DELETE, "two", null, null)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addMode(ChangelogMode.all()) + .addSchema(SINK_SCHEMA) + .consumedValues( + "+I[one, 1, a]", + "-U[one, 1, a]", + "+U[one, 2, bb]", + "-U[one, 2, bb]", + "+U[one, 4, aaaa]", + "+I[two, 1, d]", + "-D[two, 1, d]") + 
.build()) + .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t WHERE a <> 'three'") + .build(); + + static final TableTestProgram KAFKA_SOURCE_WITH_NO_FILTER = + TableTestProgram.of( + "kafka-with-no-filter", + "validates kafka source (with partial deletes) with no filter") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addMode(KAFKA_WITH_PARTIAL_DELETES_CHANGELOG) + .addSchema(SOURCE_SCHEMA) + .producedValues( + Row.ofKind(RowKind.INSERT, "one", 1, "a"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 2, "bb"), + Row.ofKind(RowKind.INSERT, "three", 3, "ccc"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"), + Row.ofKind(RowKind.INSERT, "one", 4, "aaaa"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 4, "aaaa"), + Row.ofKind(RowKind.DELETE, "three", null, null)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addMode(ChangelogMode.all()) + .addSchema(SINK_SCHEMA) + .consumedValues( + "+I[one, 1, a]", + "-U[one, 1, a]", + "+U[one, 2, bb]", + "-U[one, 2, bb]", + "+U[one, 4, aaaa]", + "+I[three, 3, ccc]", + "-D[three, 3, ccc]") + .build()) + .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t") + .build(); + + static final TableTestProgram RETRACT_SOURCE_WITH_NON_KEY_FILTER = + TableTestProgram.of( + "retract-source-with-non-key-filter", + "validates retract source with non key filter") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addMode(ChangelogMode.all()) + .addSchema(SOURCE_SCHEMA) + .producedValues( + Row.ofKind(RowKind.INSERT, "one", 1, "a"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 2, "bb"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"), + Row.ofKind(RowKind.INSERT, "one", 4, "aaaa"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 4, "aaaa"), + Row.ofKind(RowKind.DELETE, "three", 3, "ccc")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .addMode(ChangelogMode.all()) + .consumedValues( + "+I[one, 1, a]", + "+U[one, 2, bb]", + "+U[three, 3, ccc]", + "+U[three, 3, ccc]", + "+I[one, 4, aaaa]", + "+U[one, 4, aaaa]", + "-D[three, 3, ccc]") + .build()) + .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t WHERE b < 10") + .build(); + + static final TableTestProgram RETRACT_SOURCE_WITH_KEY_FILTER = + TableTestProgram.of( + "retract-source-with-key-filter", + "validates retract source with key filter") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addMode(ChangelogMode.all()) + .addSchema(SOURCE_SCHEMA) + .producedValues( + Row.ofKind(RowKind.INSERT, "one", 1, "a"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 2, "bb"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"), + Row.ofKind(RowKind.INSERT, "one", 4, "aaaa"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 4, "aaaa"), + Row.ofKind(RowKind.INSERT, "two", 1, "d"), + Row.ofKind(RowKind.UPDATE_AFTER, "two", 1, "d"), + Row.ofKind(RowKind.DELETE, "three", 3, "cc"), + Row.ofKind(RowKind.DELETE, "two", 1, "d")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .addMode(ChangelogMode.all()) + .consumedValues( + "+I[one, 1, a]", + "+U[one, 2, bb]", + "+I[one, 4, aaaa]", + "+U[one, 4, aaaa]", + "+I[two, 1, d]", + "+U[two, 1, d]", + "-D[two, 1, d]") + .build()) + .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t WHERE a <> 'three'") + .build(); + + static final TableTestProgram RETRACT_SOURCE_NO_FILTER = + TableTestProgram.of( +
"retract-source-with-no-filter", + "validates retract source with no filter") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addMode(ChangelogMode.all()) + .addSchema(SOURCE_SCHEMA) + .producedValues( + Row.ofKind(RowKind.INSERT, "one", 1, "a"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 2, "bb"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "ccc"), + Row.ofKind(RowKind.INSERT, "one", 4, "aaaa"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 4, "aaaa"), + Row.ofKind(RowKind.DELETE, "three", 3, "ccc")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addMode(ChangelogMode.all()) + .addSchema(SINK_SCHEMA) + .consumedValues( + "+I[one, 1, a]", + "+U[one, 2, bb]", + "+U[three, 3, ccc]", + "+I[one, 4, aaaa]", + "+U[one, 4, aaaa]", + "+U[three, 3, ccc]", + "-D[three, 3, ccc]") + .build()) + .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTests.java new file mode 100644 index 00000000000..eb856e56e50 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTests.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.List; + +/** Semantic tests for {@link StreamExecChangelogNormalize}. 
*/ +public class ChangelogNormalizeSemanticTests extends SemanticTestBase { + + @Override + public List<TableTestProgram> programs() { + return List.of( + ChangelogNormalizeSemanticTestPrograms.UPSERT_SOURCE_WITH_NON_KEY_FILTER, + ChangelogNormalizeSemanticTestPrograms.UPSERT_SOURCE_WITH_KEY_FILTER, + ChangelogNormalizeSemanticTestPrograms.UPSERT_SOURCE_WITH_NO_FILTER, + ChangelogNormalizeSemanticTestPrograms.KAFKA_SOURCE_WITH_NON_KEY_FILTER, + ChangelogNormalizeSemanticTestPrograms.KAFKA_SOURCE_WITH_KEY_FILTER, + ChangelogNormalizeSemanticTestPrograms.KAFKA_SOURCE_WITH_NO_FILTER, + ChangelogNormalizeSemanticTestPrograms.RETRACT_SOURCE_NO_FILTER, + ChangelogNormalizeSemanticTestPrograms.RETRACT_SOURCE_WITH_NON_KEY_FILTER, + ChangelogNormalizeSemanticTestPrograms.RETRACT_SOURCE_WITH_KEY_FILTER); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java index 2d95f55112c..9f09db2225c 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java @@ -531,6 +531,8 @@ class DataStreamJavaITCase { output(RowKind.INSERT, "alice", 1), // -- input(RowKind.INSERT, "alice", 1), // no impact + input(RowKind.UPDATE_AFTER, "alice", 1), // no impact + input(RowKind.UPDATE_AFTER, "alice", 1), // no impact // -- input(RowKind.UPDATE_AFTER, "alice", 2), output(RowKind.UPDATE_BEFORE, "alice", 1), @@ -569,6 +571,8 @@ class DataStreamJavaITCase { output(RowKind.INSERT, "alice", 1), // -- input(RowKind.INSERT, "alice", 1), // no impact + input(RowKind.UPDATE_AFTER, "alice", 1), // no impact + input(RowKind.UPDATE_AFTER, "alice", 1), // no impact // -- input(RowKind.UPDATE_AFTER, "alice", 2), output(RowKind.UPDATE_AFTER, "alice", 2), diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java index a9b79fc9f7b..95bed441ed0 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java @@ -117,7 +117,7 @@ public class DeduplicateFunctionHelper { currentRow.setRowKind(RowKind.INSERT); out.collect(currentRow); } else { - if (!isStateTtlEnabled && equaliser.equals(preRow, currentRow)) { + if (!isStateTtlEnabled && areRowsWithSameContent(equaliser, preRow, currentRow)) { // currentRow is the same as preRow and state cleaning is not enabled. // We do not emit retraction and update message. // If state cleaning is enabled, we have to emit messages to prevent too early @@ -176,7 +176,7 @@ public class DeduplicateFunctionHelper { return; } } else { - if (!isStateTtlEnabled && equaliser.equals(preRow, currentRow)) { + if (!isStateTtlEnabled && areRowsWithSameContent(equaliser, preRow, currentRow)) { // currentRow is the same as preRow and state cleaning is not enabled. // We do not emit retraction and update message. 
// If state cleaning is enabled, we have to emit messages to prevent too early @@ -292,6 +292,24 @@ public class DeduplicateFunctionHelper { } } + /** + * Important: the method assumes that {@code currentRow} comes either with {@code + * RowKind.UPDATE_AFTER} or with {@code RowKind.INSERT}. It is not designed to be used for other + * cases. + */ + private static boolean areRowsWithSameContent( + RecordEqualiser equaliser, RowData prevRow, RowData currentRow) { + final RowKind currentRowKind = currentRow.getRowKind(); + if (currentRowKind == RowKind.UPDATE_AFTER) { + // temporarily set the row kind to INSERT so that the equaliser compares only the row content + currentRow.setRowKind(RowKind.INSERT); + final boolean result = equaliser.equals(prevRow, currentRow); + currentRow.setRowKind(currentRowKind); + return result; + } + return equaliser.equals(prevRow, currentRow); + } + private static long getRowtime(RowData input, int rowtimeIndex) { return input.getLong(rowtimeIndex); }
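A note on the core of the fix, for readers skimming the diff: the generated RecordEqualiser may take the row kind into account when comparing rows (e.g. when it falls back to a binary comparison whose header byte encodes the kind), so an incoming UPDATE_AFTER row whose fields equal the stored INSERT row compared as "different", and ChangelogNormalize emitted a redundant -U/+U pair for unchanged content. The new DeduplicateFunctionHelper#areRowsWithSameContent normalizes the kind before comparing. Below is a minimal standalone sketch of that idea, assuming only that RecordEqualiser exposes a single equals(RowData, RowData) method; the class name RowKindEqualityDemo and the hand-written kind-sensitive equaliser are hypothetical stand-ins for planner-generated code, not part of this patch.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.types.RowKind;

public class RowKindEqualityDemo {

    // Hypothetical stand-in for a generated equaliser that is sensitive to the row kind.
    private static final RecordEqualiser KIND_SENSITIVE_EQUALISER =
            (row1, row2) ->
                    row1.getRowKind() == row2.getRowKind()
                            && row1.getString(0).equals(row2.getString(0))
                            && row1.getInt(1) == row2.getInt(1);

    // Same idea as DeduplicateFunctionHelper#areRowsWithSameContent: normalize an
    // UPDATE_AFTER kind to INSERT before comparing, then restore the original kind.
    private static boolean areRowsWithSameContent(
            RecordEqualiser equaliser, RowData prevRow, RowData currentRow) {
        final RowKind currentRowKind = currentRow.getRowKind();
        if (currentRowKind == RowKind.UPDATE_AFTER) {
            currentRow.setRowKind(RowKind.INSERT);
            final boolean result = equaliser.equals(prevRow, currentRow);
            currentRow.setRowKind(currentRowKind);
            return result;
        }
        return equaliser.equals(prevRow, currentRow);
    }

    public static void main(String[] args) {
        final RowData stored =
                GenericRowData.ofKind(RowKind.INSERT, StringData.fromString("one"), 1);
        final RowData incoming =
                GenericRowData.ofKind(RowKind.UPDATE_AFTER, StringData.fromString("one"), 1);

        // false: the kinds differ, which formerly triggered a redundant -U/+U pair
        System.out.println(KIND_SENSITIVE_EQUALISER.equals(stored, incoming));
        // true: with the kind normalized, the duplicate row is recognized and skipped
        System.out.println(areRowsWithSameContent(KIND_SENSITIVE_EQUALISER, stored, incoming));
    }
}

Mutating the row kind in place and restoring it afterwards, rather than copying the row, keeps the check allocation-free on the per-record path; that is also why the helper's javadoc restricts its inputs to INSERT and UPDATE_AFTER rows.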