This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 743b9bd2b32 [FLINK-38928] Implement an operator for handling DO ERROR/NOTHING (#27502)
743b9bd2b32 is described below
commit 743b9bd2b32149559983eb983c9ec0391a02fb9e
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Feb 12 09:30:17 2026 +0100
[FLINK-38928] Implement an operator for handling DO ERROR/NOTHING (#27502)
---
.../plan/nodes/exec/common/CommonExecSink.java | 7 +-
.../plan/nodes/exec/stream/StreamExecSink.java | 76 +-
.../nodes/physical/stream/StreamPhysicalSink.scala | 35 +
.../FlinkChangelogModeInferenceProgram.scala | 38 +-
.../plan/nodes/exec/stream/SinkSemanticTests.java | 31 +-
.../plan/nodes/exec/stream/SinkTestPrograms.java | 35 +-
.../operators/sink/SortedLongSerializer.java | 115 +++
.../sink/WatermarkCompactingSinkMaterializer.java | 504 ++++++++++++++
.../operators/sink/WatermarkTimestampAssigner.java | 49 ++
.../operators/sink/SortedLongSerializerTest.java | 51 ++
.../WatermarkCompactingSinkMaterializerTest.java | 769 +++++++++++++++++++++
.../table/runtime/util/StreamRecordUtils.java | 20 +
.../TypeSerializerTestCoverageTest.java | 4 +-
13 files changed, 1668 insertions(+), 66 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 0fc273ca684..31c0bc4074f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -187,9 +187,6 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
Optional<LineageVertex> lineageVertexOpt =
TableLineageUtils.extractLineageDataset(outputObject);
- // only add materialization if input has change
- final boolean needMaterialization = !inputInsertOnly && upsertMaterialize;
-
Transformation<RowData> sinkTransform =
applyConstraintValidations(inputTransform, config, persistedRowType);
@@ -202,10 +199,10 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
primaryKeys,
sinkParallelism,
inputParallelism,
- needMaterialization);
+ upsertMaterialize);
}
- if (needMaterialization) {
+ if (upsertMaterialize) {
sinkTransform =
applyUpsertMaterialize(
sinkTransform,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
index 1726105dee8..ce95c8b65bf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
@@ -26,8 +26,8 @@ import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.ExecutionConfigOptions.RowtimeInserter;
import org.apache.flink.table.api.config.ExecutionConfigOptions.SinkUpsertMaterializeStrategy;
@@ -55,6 +55,8 @@ import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerV2;
+import org.apache.flink.table.runtime.operators.sink.WatermarkCompactingSinkMaterializer;
+import org.apache.flink.table.runtime.operators.sink.WatermarkTimestampAssigner;
import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
@@ -100,6 +102,7 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
producedTransformations = {
CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
CommonExecSink.PARTITIONER_TRANSFORMATION,
+ StreamExecSink.WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
CommonExecSink.SINK_TRANSFORMATION
@@ -124,6 +127,7 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
producedTransformations = {
CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
CommonExecSink.PARTITIONER_TRANSFORMATION,
+ StreamExecSink.WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
CommonExecSink.SINK_TRANSFORMATION
@@ -133,6 +137,9 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
public class StreamExecSink extends CommonExecSink implements StreamExecNode<Object> {
private static final Logger LOG = LoggerFactory.getLogger(StreamExecSink.class);
+ public static final String WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION =
+ "watermark-timestamp-assigner";
+
public static final String FIELD_NAME_INPUT_CHANGELOG_MODE = "inputChangelogMode";
public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = "requireUpsertMaterialize";
public static final String FIELD_NAME_UPSERT_MATERIALIZE_STRATEGY = "upsertMaterializeStrategy";
@@ -237,17 +244,6 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
@Override
protected Transformation<Object> translateToPlanInternal(
PlannerBase planner, ExecNodeConfig config) {
- // TODO: FLINK-38928 Remove this validation once runtime support for ERROR and NOTHING is
- // implemented
- if (InsertConflictStrategy.error().equals(conflictStrategy)
- || InsertConflictStrategy.nothing().equals(conflictStrategy)) {
- throw new ValidationException(
- "ON CONFLICT DO "
- + conflictStrategy
- + " is not yet supported. "
- + "Please use ON CONFLICT DO DEDUPLICATE instead.");
- }
-
final ExecEdge inputEdge = getInputEdges().get(0);
final Transformation<RowData> inputTransform =
(Transformation<RowData>) inputEdge.translateToPlan(planner);
@@ -341,10 +337,17 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
StateConfigUtil.createTtlConfig(
StateMetadata.getStateTtlForOneInputOperator(config, stateMetadataList));
+ final String[] pkFieldNames =
+ Arrays.stream(primaryKeys)
+ .mapToObj(idx -> physicalRowType.getFieldNames().get(idx))
+ .toArray(String[]::new);
+
final OneInputStreamOperator<RowData, RowData> operator =
createSumOperator(
config,
physicalRowType,
+ primaryKeys,
+ pkFieldNames,
inputUpsertKey,
upsertKeyEqualiser,
upsertKeyHashFunction,
@@ -352,15 +355,29 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
rowEqualiser,
rowHashFunction);
- final String[] fieldNames = physicalRowType.getFieldNames().toArray(new String[0]);
- final List<String> pkFieldNames =
- Arrays.stream(primaryKeys)
- .mapToObj(idx -> fieldNames[idx])
- .collect(Collectors.toList());
+ // For ERROR/NOTHING strategies, apply WatermarkTimestampAssigner first
+ // This assigns the current watermark as the timestamp to each record,
+ // which is required for the WatermarkCompactingSinkMaterializer to work correctly
+ Transformation<RowData> transformForMaterializer = inputTransform;
+ if (isErrorOrNothingConflictStrategy()) {
+ // Use input parallelism to preserve watermark semantics
+ transformForMaterializer =
+ ExecNodeUtil.createOneInputTransformation(
+ inputTransform,
createTransformationMeta(
+ WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
+ "WatermarkTimestampAssigner",
+ "WatermarkTimestampAssigner",
+ config),
+ new WatermarkTimestampAssigner(),
+ inputTransform.getOutputType(),
+ inputTransform.getParallelism(),
+ false);
+ }
OneInputTransformation<RowData, RowData> materializeTransform =
ExecNodeUtil.createOneInputTransformation(
- inputTransform,
+ transformForMaterializer,
createTransformationMeta(
UPSERT_MATERIALIZE_TRANSFORMATION,
String.format(
@@ -383,6 +400,8 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
private OneInputStreamOperator<RowData, RowData> createSumOperator(
ExecNodeConfig config,
RowType physicalRowType,
+ int[] primaryKeys,
+ String[] pkFieldNames,
int[] inputUpsertKey,
GeneratedRecordEqualiser upsertKeyEqualiser,
GeneratedHashFunction upsertKeyHashFunction,
@@ -390,6 +409,21 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
GeneratedRecordEqualiser rowEqualiser,
GeneratedHashFunction rowHashFunction) {
+ // Check if we should use the watermark-compacting materializer for ERROR/NOTHING strategies
+ if (isErrorOrNothingConflictStrategy()) {
+ RowType keyType = RowTypeUtils.projectRowType(physicalRowType, primaryKeys);
+
+ return WatermarkCompactingSinkMaterializer.create(
+ conflictStrategy,
+ physicalRowType,
+ rowEqualiser,
+ upsertKeyEqualiser,
+ inputUpsertKey,
+ keyType,
+ pkFieldNames);
+ }
+
+ // Use existing logic for DEDUPLICATE (legacy behavior)
SinkUpsertMaterializeStrategy sinkUpsertMaterializeStrategy =
Optional.ofNullable(upsertMaterializeStrategy)
.orElse(SinkUpsertMaterializeStrategy.LEGACY);
@@ -415,6 +449,12 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
config));
}
+ private boolean isErrorOrNothingConflictStrategy() {
+ return conflictStrategy != null
+ && (conflictStrategy.getBehavior() == ConflictBehavior.ERROR
+ || conflictStrategy.getBehavior() == ConflictBehavior.NOTHING);
+ }
+
private static SequencedMultiSetStateConfig createStateConfig(
SinkUpsertMaterializeStrategy strategy,
TimeDomain ttlTimeDomain,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
index c77d8312d52..cf2a7d53c9e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.physical.stream
import org.apache.flink.table.api.InsertConflictStrategy
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior
import org.apache.flink.table.api.config.ExecutionConfigOptions.{SinkUpsertMaterializeStrategy, TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY}
import org.apache.flink.table.catalog.ContextResolvedTable
import org.apache.flink.table.connector.sink.DynamicTableSink
@@ -34,9 +35,12 @@ import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rel.hint.RelHint
+import org.apache.calcite.util.ImmutableBitSet
import java.util
+import scala.collection.JavaConversions._
+
/**
* Stream physical RelNode to write data into an external sink defined by a [[DynamicTableSink]].
*/
@@ -134,4 +138,35 @@ class StreamPhysicalSink(
.itemIf("upsertMaterialize", "true", upsertMaterialize)
.itemIf("conflictStrategy", conflictStrategy, conflictStrategy != null)
}
+
+ def isDeduplicateConflictStrategy: Boolean = {
+ conflictStrategy != null && conflictStrategy.getBehavior == ConflictBehavior.DEDUPLICATE
+ }
+
+ def primaryKeysContainsUpsertKey: Boolean = {
+ val primaryKeys = contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes
+ val pks = ImmutableBitSet.of(primaryKeys: _*)
+ val fmq = FlinkRelMetadataQuery.reuseOrCreate(getCluster.getMetadataQuery)
+ val changeLogUpsertKeys = fmq.getUpsertKeys(getInput)
+ changeLogUpsertKeys != null && changeLogUpsertKeys.exists(pks.contains)
+ }
+
+ def getUpsertKeyNames: String = {
+ val fmq = FlinkRelMetadataQuery.reuseOrCreate(getCluster.getMetadataQuery)
+ val changeLogUpsertKeys = fmq.getUpsertKeys(getInput)
+ if (changeLogUpsertKeys == null) {
+ "none"
+ } else {
+ val fieldNames = contextResolvedTable.getResolvedSchema.getColumnNames
+ changeLogUpsertKeys
+ .map(uk => uk.toArray.map(fieldNames.get).mkString("[", ", ", "]"))
+ .mkString(", ")
+ }
+ }
+
+ def getPrimaryKeyNames: String = {
+ val fieldNames = contextResolvedTable.getResolvedSchema.getColumnNames
+ val primaryKeys = contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes
+ primaryKeys.map(fieldNames.get).mkString("[", ", ", "]")
+ }
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index baa9b25db02..9cc84db2b7b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -1060,38 +1060,30 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
case UpsertMaterialize.FORCE => primaryKeys.nonEmpty && !sinkIsRetract
case UpsertMaterialize.NONE => false
case UpsertMaterialize.AUTO =>
- if (
- (inputIsAppend && InsertConflictStrategy
- .deduplicate()
- .equals(sink.conflictStrategy)) || sinkIsAppend || sinkIsRetract
- ) {
+ // if the sink is not an UPSERT sink (has no PK, or is an APPEND or RETRACT sink)
+ // we don't need to materialize results
+ if (primaryKeys.isEmpty || sinkIsAppend || sinkIsRetract) {
return false
}
- if (primaryKeys.isEmpty) {
+
+ // For a DEDUPLICATE strategy and INSERT only input, we simply let the inserts be handled
+ // as UPSERT_AFTER and overwrite previous value
+ if (inputIsAppend && sink.isDeduplicateConflictStrategy) {
return false
}
- val pks = ImmutableBitSet.of(primaryKeys: _*)
- val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
- val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
- // if input has updates and primary key != upsert key (upsert key can be null) we should
- // enable upsertMaterialize. An optimize is: do not enable upsertMaterialize when sink
- // pk(s) contains input changeLogUpsertKeys
- val upsertKeyDiffersFromPk =
- changeLogUpsertKeys == null || !changeLogUpsertKeys.exists(pks.contains)
+
+ // if input has updates and primary key != upsert key we should enable upsertMaterialize.
+ //
+ // An optimize is: do not enable upsertMaterialize when sink pk(s) contains input
+ // changeLogUpsertKeys
+ val upsertKeyDiffersFromPk = !sink.primaryKeysContainsUpsertKey
// Validate that ON CONFLICT is specified when upsert key differs from primary key
val requireOnConflict =
tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT)
if (requireOnConflict && upsertKeyDiffersFromPk && sink.conflictStrategy == null) {
- val fieldNames = sink.contextResolvedTable.getResolvedSchema.getColumnNames
- val pkNames = primaryKeys.map(fieldNames.get(_)).mkString("[", ", ", "]")
- val upsertKeyNames = if (changeLogUpsertKeys == null) {
- "none"
- } else {
- changeLogUpsertKeys
- .map(uk => uk.toArray.map(fieldNames.get(_)).mkString("[", ", ", "]"))
- .mkString(", ")
- }
+ val pkNames = sink.getPrimaryKeyNames
+ val upsertKeyNames = sink.getUpsertKeyNames
throw new ValidationException(
"The query has an upsert key that differs from the primary key of the sink table " +
s"'${sink.contextResolvedTable.getIdentifier.asSummaryString}'. " +
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
index 7eaa6c06c7f..6dba8353f02 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
@@ -18,21 +18,48 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
+import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase;
+import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.table.test.program.TestStep;
+import org.apache.flink.table.test.program.TestStep.TestKind;
+import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
/** Semantic tests for {@link StreamExecSink}. */
public class SinkSemanticTests extends SemanticTestBase {
+ @Override
+ public EnumSet<TestKind> supportedSetupSteps() {
+ EnumSet<TestKind> steps = super.supportedSetupSteps();
+ steps.add(TestKind.SINK_WITHOUT_DATA);
+ return steps;
+ }
+
+ @Override
+ protected void runStep(TestStep testStep, TableEnvironment env) throws Exception {
+ if (testStep.getKind() == TestKind.SINK_WITHOUT_DATA) {
+ final SinkTestStep sinkTestStep = (SinkTestStep) testStep;
+ sinkTestStep.apply(
+ env,
+ Map.ofEntries(
+ Map.entry("connector", "values"),
+ Map.entry("sink-insert-only", "false")));
+ } else {
+ super.runStep(testStep, env);
+ }
+ }
+
@Override
public List<TableTestProgram> programs() {
return List.of(
SinkTestPrograms.INSERT_RETRACT_WITHOUT_PK,
SinkTestPrograms.INSERT_RETRACT_WITH_PK,
- SinkTestPrograms.ON_CONFLICT_DO_NOTHING_NOT_SUPPORTED,
- SinkTestPrograms.ON_CONFLICT_DO_ERROR_NOT_SUPPORTED,
+ SinkTestPrograms.ON_CONFLICT_DO_NOTHING_KEEPS_FIRST,
+ SinkTestPrograms.ON_CONFLICT_DO_ERROR_NO_CONFLICT,
SinkTestPrograms.UPSERT_KEY_DIFFERS_FROM_PK_WITHOUT_ON_CONFLICT,
SinkTestPrograms.UPSERT_KEY_DIFFERS_FROM_PK_WITH_ON_CONFLICT,
SinkTestPrograms.UPSERT_KEY_MATCHES_PK_WITHOUT_ON_CONFLICT,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
index c59fb88521d..6ddc55cc18b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
@@ -89,46 +89,47 @@ public class SinkTestPrograms {
"INSERT INTO sink_t SELECT UPPER(name), SUM(score)
FROM source_t GROUP BY name")
.build();
- // --- ON CONFLICT validation tests ---
+ // --- ON CONFLICT tests ---
- public static final TableTestProgram ON_CONFLICT_DO_NOTHING_NOT_SUPPORTED =
+ public static final TableTestProgram ON_CONFLICT_DO_NOTHING_KEEPS_FIRST =
TableTestProgram.of(
- "sink-on-conflict-do-nothing-not-supported",
- "ON CONFLICT DO NOTHING is not yet supported and
should throw ValidationException.")
+ "sink-on-conflict-do-nothing-keeps-first",
+ "ON CONFLICT DO NOTHING keeps the first record
when multiple records have the same PK.")
.setupTableSource(
SourceTestStep.newBuilder("source_t")
.addSchema("a INT", "b BIGINT")
.addOption("changelog-mode", "I")
- .producedValues(Row.ofKind(RowKind.INSERT, 1, 1L))
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, 1, 10L),
+ Row.ofKind(RowKind.INSERT, 1, 20L),
+ Row.ofKind(RowKind.INSERT, 2, 30L))
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink_t")
.addSchema("a INT PRIMARY KEY NOT
ENFORCED", "b BIGINT")
+ .consumedValues("+I[1, 10]", "+I[2, 30]")
.build())
- .runFailingSql(
- "INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO NOTHING",
- ValidationException.class,
- "ON CONFLICT DO NOTHING is not yet supported")
+ .runSql("INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO NOTHING")
.build();
- public static final TableTestProgram ON_CONFLICT_DO_ERROR_NOT_SUPPORTED =
+ public static final TableTestProgram ON_CONFLICT_DO_ERROR_NO_CONFLICT =
TableTestProgram.of(
- "sink-on-conflict-do-error-not-supported",
- "ON CONFLICT DO ERROR is not yet supported and
should throw ValidationException.")
+ "sink-on-conflict-do-error-no-conflict",
+ "ON CONFLICT DO ERROR with no conflicts passes
through all records.")
.setupTableSource(
SourceTestStep.newBuilder("source_t")
.addSchema("a INT", "b BIGINT")
.addOption("changelog-mode", "I")
- .producedValues(Row.ofKind(RowKind.INSERT, 1, 1L))
+ .producedValues(
+ Row.ofKind(RowKind.INSERT, 1, 10L),
+ Row.ofKind(RowKind.INSERT, 2, 20L))
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink_t")
.addSchema("a INT PRIMARY KEY NOT
ENFORCED", "b BIGINT")
+ .consumedValues("+I[1, 10]", "+I[2, 20]")
.build())
- .runFailingSql(
- "INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO ERROR",
- ValidationException.class,
- "ON CONFLICT DO ERROR is not yet supported")
+ .runSql("INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO ERROR")
.build();
public static final TableTestProgram UPSERT_KEY_DIFFERS_FROM_PK_WITHOUT_ON_CONFLICT =
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializer.java
new file mode 100644
index 00000000000..4a31a1d2da1
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.MathUtils;
+
+import java.io.IOException;
+
+/**
+ * A serializer for {@code Long} that produces a lexicographically sortable byte representation.
+ *
+ * <p>Standard big-endian long serialization does not maintain numeric ordering for negative values
+ * in lexicographic byte comparison because negative numbers have their sign bit set (1), which
+ * makes them appear greater than positive numbers (sign bit 0) in unsigned byte comparison.
+ *
+ * <p>This serializer flips the sign bit during serialization, converting from signed to unsigned
+ * ordering. This ensures that when iterating over keys in a sorted state backend (like RocksDB),
+ * the entries are returned in numeric order.
+ *
+ * @see org.apache.flink.streaming.api.operators.TimerSerializer
+ */
+@Internal
+public final class SortedLongSerializer extends TypeSerializerSingleton<Long> {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Sharable instance of the SortedLongSerializer. */
+ public static final SortedLongSerializer INSTANCE = new SortedLongSerializer();
+
+ private static final Long ZERO = 0L;
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public Long createInstance() {
+ return ZERO;
+ }
+
+ @Override
+ public Long copy(Long from) {
+ return from;
+ }
+
+ @Override
+ public Long copy(Long from, Long reuse) {
+ return from;
+ }
+
+ @Override
+ public int getLength() {
+ return Long.BYTES;
+ }
+
+ @Override
+ public void serialize(Long record, DataOutputView target) throws IOException {
+ target.writeLong(MathUtils.flipSignBit(record));
+ }
+
+ @Override
+ public Long deserialize(DataInputView source) throws IOException {
+ return MathUtils.flipSignBit(source.readLong());
+ }
+
+ @Override
+ public Long deserialize(Long reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ target.writeLong(source.readLong());
+ }
+
+ @Override
+ public TypeSerializerSnapshot<Long> snapshotConfiguration() {
+ return new SortedLongSerializerSnapshot();
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** Serializer configuration snapshot for compatibility and format evolution. */
+ @SuppressWarnings("WeakerAccess")
+ public static final class SortedLongSerializerSnapshot
+ extends SimpleTypeSerializerSnapshot<Long> {
+
+ public SortedLongSerializerSnapshot() {
+ super(() -> INSTANCE);
+ }
+ }
+}
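(Not part of the patch: a minimal, self-contained illustration of the sign-bit trick described in the serializer's javadoc. MathUtils.flipSignBit is equivalent to x ^ Long.MIN_VALUE; the demo class name is made up.)

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    /** Illustrative sketch: why flipping the sign bit makes big-endian longs sort numerically. */
    public class SortedLongDemo {

        private static byte[] bigEndian(long value) {
            return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
        }

        public static void main(String[] args) {
            // Plain big-endian: -1L serializes to 0xFF..FF and compares *greater* than 1L.
            System.out.println(Arrays.compareUnsigned(bigEndian(-1L), bigEndian(1L)) > 0); // true

            // With the sign bit flipped (x ^ Long.MIN_VALUE), byte order matches numeric order,
            // which is what lets a sorted state backend iterate buffer keys in timestamp order.
            long a = -1L ^ Long.MIN_VALUE;
            long b = 1L ^ Long.MIN_VALUE;
            System.out.println(Arrays.compareUnsigned(bigEndian(a), bigEndian(b)) < 0); // true
        }
    }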
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java
new file mode 100644
index 00000000000..fc975bda184
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData>
+ implements OneInputStreamOperator<RowData, RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+ private static final String STATE_CLEARED_WARN_MSG =
+ "The state is cleared because of state TTL. This will lead to incorrect results. "
+ + "You can increase the state TTL to avoid this.";
+ private static final Set<String> ORDERED_STATE_BACKENDS = Set.of("rocksdb", "forst");
+
+ private final InsertConflictStrategy conflictStrategy;
+ private final TypeSerializer<RowData> serializer;
+ private final GeneratedRecordEqualiser generatedRecordEqualiser;
+ private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+ private final int[] inputUpsertKey;
+ private final boolean hasUpsertKey;
+ private final RowType keyType;
+ private final String[] primaryKeyNames;
+
+ private transient MapStateDescriptor<Long, List<RowData>> bufferDescriptor;
+ // Buffers incoming changelog records (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) keyed by
+ // their timestamp. Watermarks act as compaction barriers: when a watermark arrives, we know
+ // that UPDATE_BEFORE and its corresponding UPDATE_AFTER have both been received and can be
+ // compacted together. This solves the out-of-order problem where a later UPDATE_AFTER may
+ // arrive before the UPDATE_BEFORE of a previous change.
+ private transient MapState<Long, List<RowData>> buffer;
+
+ // Stores the last emitted value for the current primary key. Used to detect duplicates
+ // and determine the correct RowKind (INSERT vs UPDATE_AFTER) on subsequent compactions.
+ private transient ValueState<RowData> currentValue;
+ private transient RecordEqualiser equaliser;
+ private transient RecordEqualiser upsertKeyEqualiser;
+ private transient TimestampedCollector<RowData> collector;
+ private transient boolean isOrderedStateBackend;
+
+ // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey.
+ private transient ProjectedRowData upsertKeyProjectedRow1;
+ private transient ProjectedRowData upsertKeyProjectedRow2;
+
+ // Field getters for formatting the primary key in error messages.
+ private transient RowData.FieldGetter[] keyFieldGetters;
+
+ // Tracks the current watermark. Used to detect in-flight records after restore.
+ private transient long currentWatermark = Long.MIN_VALUE;
+
+ public WatermarkCompactingSinkMaterializer(
+ InsertConflictStrategy conflictStrategy,
+ TypeSerializer<RowData> serializer,
+ GeneratedRecordEqualiser generatedRecordEqualiser,
+ @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
+ @Nullable int[] inputUpsertKey,
+ RowType keyType,
+ String[] primaryKeyNames) {
+ validateConflictStrategy(conflictStrategy);
+ this.conflictStrategy = conflictStrategy;
+ this.serializer = serializer;
+ this.generatedRecordEqualiser = generatedRecordEqualiser;
+ this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
+ this.inputUpsertKey = inputUpsertKey;
+ this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 0;
+ this.keyType = keyType;
+ this.primaryKeyNames = primaryKeyNames;
+ }
+
+ private static void validateConflictStrategy(InsertConflictStrategy strategy) {
+ Preconditions.checkArgument(
+ strategy.getBehavior() == ConflictBehavior.ERROR
+ || strategy.getBehavior() == ConflictBehavior.NOTHING,
+ "Only ERROR and NOTHING strategies are supported, got: %s",
+ strategy);
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ super.initializeState(context);
+
+ // Initialize state descriptors and handles
+ this.bufferDescriptor =
+ new MapStateDescriptor<>(
+ "watermark-buffer",
+ SortedLongSerializer.INSTANCE,
+ new ListSerializer<>(serializer));
+ this.buffer = context.getKeyedStateStore().getMapState(bufferDescriptor);
+
+ ValueStateDescriptor<RowData> currentValueDescriptor =
+ new ValueStateDescriptor<>("current-value", serializer);
+ this.currentValue = context.getKeyedStateStore().getState(currentValueDescriptor);
+
+ if (context.isRestored()) {
+ // Detect ordered state backend before consolidation
+ detectOrderedStateBackend();
+
+ // Consolidate all buffered records to MIN_VALUE for each key.
+ // This ensures they are compacted on the first watermark after restore.
+ getKeyedStateBackend()
+ .applyToAllKeys(
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ bufferDescriptor,
+ (key, state) -> consolidateBufferToMinValue());
+ }
+ }
+
+ private void consolidateBufferToMinValue() throws Exception {
+ List<RowData> consolidated = new ArrayList<>();
+
+ if (isOrderedStateBackend) {
+ // RocksDB/ForSt: entries are already sorted by timestamp
+ Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+ while (iterator.hasNext()) {
+ consolidated.addAll(iterator.next().getValue());
+ iterator.remove();
+ }
+ } else {
+ // Other backends: collect, sort by timestamp, then consolidate
+ List<Map.Entry<Long, List<RowData>>> entries = new ArrayList<>();
+ Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+ while (iterator.hasNext()) {
+ entries.add(iterator.next());
+ iterator.remove();
+ }
+
+ entries.sort(Map.Entry.comparingByKey());
+
+ for (Map.Entry<Long, List<RowData>> entry : entries) {
+ consolidated.addAll(entry.getValue());
+ }
+ }
+
+ if (!consolidated.isEmpty()) {
+ buffer.put(Long.MIN_VALUE, consolidated);
+ }
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ initializeEqualisers();
+ initializeKeyFieldGetters();
+ detectOrderedStateBackend();
+ this.collector = new TimestampedCollector<>(output);
+ }
+
+ private void initializeKeyFieldGetters() {
+ this.keyFieldGetters = new RowData.FieldGetter[primaryKeyNames.length];
+ for (int i = 0; i < primaryKeyNames.length; i++) {
+ LogicalType fieldType = keyType.getTypeAt(i);
+ keyFieldGetters[i] = RowData.createFieldGetter(fieldType, i);
+ }
+ }
+
+ private void initializeEqualisers() {
+ if (hasUpsertKey) {
+ this.upsertKeyEqualiser =
+ generatedUpsertKeyEqualiser.newInstance(
+ getRuntimeContext().getUserCodeClassLoader());
+ upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey);
+ upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey);
+ }
+ this.equaliser =
+ generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+ }
+
+ private void detectOrderedStateBackend() {
+ KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+ String backendType =
+ keyedStateBackend != null ? keyedStateBackend.getBackendTypeIdentifier() : "";
+ this.isOrderedStateBackend = ORDERED_STATE_BACKENDS.contains(backendType);
+
+ if (isOrderedStateBackend) {
+ LOG.info("Using ordered state backend optimization for {}
backend", backendType);
+ }
+ }
+
+ @Override
+ public void processElement(StreamRecord<RowData> element) throws Exception {
+ RowData row = element.getValue();
+ long assignedTimestamp = element.getTimestamp();
+
+ // If we haven't received any watermark yet (still at MIN_VALUE after restore)
+ // and the timestamp is beyond MIN_VALUE, it's from in-flight data that was
+ // checkpointed before restore. Assign to MIN_VALUE.
+ if (currentWatermark == Long.MIN_VALUE && assignedTimestamp > Long.MIN_VALUE) {
+ assignedTimestamp = Long.MIN_VALUE;
+ }
+
+ bufferRecord(assignedTimestamp, row);
+ }
+
+ private void bufferRecord(long timestamp, RowData row) throws Exception {
+ List<RowData> records = buffer.get(timestamp);
+ if (records == null) {
+ records = new ArrayList<>();
+ }
+ switch (row.getRowKind()) {
+ case INSERT:
+ case UPDATE_AFTER:
+ // Try to cancel out a pending retraction; if none, just append
+ if (!tryCancelRetraction(records, row)) {
+ records.add(row);
+ }
+ break;
+ case UPDATE_BEFORE:
+ case DELETE:
+ // Try to cancel out an existing addition; if none, keep for cross-bucket
+ if (!tryCancelAddition(records, row)) {
+ records.add(row);
+ }
+ break;
+ }
+ buffer.put(timestamp, records);
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ final long watermarkTimestamp = mark.getTimestamp();
+ this.currentWatermark = watermarkTimestamp;
+
+ // Iterate over all keys and compact their buffered records
+ this.<RowData>getKeyedStateBackend()
+ .applyToAllKeys(
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ bufferDescriptor,
+ (key, state) -> compactAndEmit(watermarkTimestamp));
+
+ super.processWatermark(mark);
+ }
+
+ private void compactAndEmit(long newWatermark) throws Exception {
+ RowData previousValue = currentValue.value();
+ List<RowData> pendingRecords = collectPendingRecords(previousValue, newWatermark);
+
+ if (pendingRecords.size() > 1) {
+ if (conflictStrategy.getBehavior() == ConflictBehavior.ERROR) {
+ RowData key = (RowData) getKeyedStateBackend().getCurrentKey();
+ throw new TableRuntimeException(
+ "Primary key constraint violation: multiple distinct records with "
+ + "the same primary key detected. Conflicting key: ["
+ + formatKey(key)
+ + "]. Use ON CONFLICT DO NOTHING to keep the first record.");
+ } else if (previousValue == null) {
+ final RowData newValue = pendingRecords.get(0);
+ emit(newValue, INSERT);
+ currentValue.update(newValue);
+ }
+ } else if (pendingRecords.isEmpty()) {
+ if (previousValue != null) {
+ emit(previousValue, DELETE);
+ currentValue.clear();
+ }
+ } else {
+ final RowData newValue = pendingRecords.get(0);
+ if (previousValue == null) {
+ emit(newValue, INSERT);
+ currentValue.update(newValue);
+ } else if (!recordEquals(previousValue, newValue)) {
+ emit(newValue, UPDATE_AFTER);
+ currentValue.update(newValue);
+ }
+ }
+ }
+
+ private List<RowData> collectPendingRecords(RowData previousValue, long newWatermark)
+ throws Exception {
+ List<RowData> records = new ArrayList<>();
+ if (previousValue != null) {
+ records.add(previousValue);
+ }
+ Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+
+ while (iterator.hasNext()) {
+ Map.Entry<Long, List<RowData>> entry = iterator.next();
+ if (entry.getKey() <= newWatermark) {
+ for (RowData pendingRecord : entry.getValue()) {
+ switch (pendingRecord.getRowKind()) {
+ case INSERT:
+ case UPDATE_AFTER:
+ addRow(records, pendingRecord);
+ break;
+
+ case UPDATE_BEFORE:
+ case DELETE:
+ retractRow(records, pendingRecord);
+ break;
+ }
+ }
+ iterator.remove();
+ } else if (isOrderedStateBackend) {
+ break;
+ }
+ }
+ return records;
+ }
+
+ private void addRow(List<RowData> values, RowData add) {
+ if (hasUpsertKey) {
+ int index = findFirst(values, add);
+ if (index == -1) {
+ values.add(add);
+ } else {
+ values.set(index, add);
+ }
+ } else {
+ values.add(add);
+ }
+ }
+
+ private void retractRow(List<RowData> values, RowData retract) {
+ final int index = findFirst(values, retract);
+ if (index == -1) {
+ LOG.info(STATE_CLEARED_WARN_MSG);
+ } else {
+ // Remove first found row
+ values.remove(index);
+ }
+ }
+
+ /**
+ * Attempts to cancel out a retraction by finding a matching retractive record
+ * (DELETE/UPDATE_BEFORE) with identical content.
+ *
+ * @return true if a matching retraction was found and removed, false otherwise
+ */
+ private boolean tryCancelRetraction(List<RowData> values, RowData addition) {
+ final Iterator<RowData> iterator = values.iterator();
+ while (iterator.hasNext()) {
+ RowData candidate = iterator.next();
+ RowKind kind = candidate.getRowKind();
+ if ((kind == DELETE || kind == RowKind.UPDATE_BEFORE)
+ && recordEquals(addition, candidate)) {
+ iterator.remove();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Attempts to cancel out an addition by finding a matching additive record
+ * (INSERT/UPDATE_AFTER) with identical content.
+ *
+ * @return true if a matching addition was found and removed, false otherwise
+ */
+ private boolean tryCancelAddition(List<RowData> values, RowData retraction) {
+ final Iterator<RowData> iterator = values.iterator();
+ while (iterator.hasNext()) {
+ RowData candidate = iterator.next();
+ RowKind kind = candidate.getRowKind();
+ if ((kind == INSERT || kind == UPDATE_AFTER) && recordEquals(retraction, candidate)) {
+ iterator.remove();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private int findFirst(List<RowData> values, RowData target) {
+ final Iterator<RowData> iterator = values.iterator();
+ int i = 0;
+ while (iterator.hasNext()) {
+ if (equalsIgnoreRowKind(target, iterator.next())) {
+ return i;
+ }
+ i++;
+ }
+ return -1;
+ }
+
+ private boolean equalsIgnoreRowKind(RowData newRow, RowData oldRow) {
+ newRow.setRowKind(oldRow.getRowKind());
+ if (hasUpsertKey) {
+ return this.upsertKeyEqualiser.equals(
+ upsertKeyProjectedRow1.replaceRow(newRow),
+ upsertKeyProjectedRow2.replaceRow(oldRow));
+ }
+ return equaliser.equals(newRow, oldRow);
+ }
+
+ private void emit(RowData row, RowKind kind) {
+ RowKind originalKind = row.getRowKind();
+ row.setRowKind(kind);
+ collector.collect(row);
+ row.setRowKind(originalKind);
+ }
+
+ private boolean recordEquals(RowData row1, RowData row2) {
+ RowKind kind1 = row1.getRowKind();
+ RowKind kind2 = row2.getRowKind();
+ row1.setRowKind(RowKind.INSERT);
+ row2.setRowKind(RowKind.INSERT);
+ boolean result = equaliser.equals(row1, row2);
+ row1.setRowKind(kind1);
+ row2.setRowKind(kind2);
+ return result;
+ }
+
+ private String formatKey(RowData key) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < primaryKeyNames.length; i++) {
+ if (i > 0) {
+ sb.append(", ");
+ }
+ sb.append(primaryKeyNames[i]).append("=");
+ if (key.isNullAt(i)) {
+ sb.append("null");
+ } else {
+ sb.append(keyFieldGetters[i].getFieldOrNull(key));
+ }
+ }
+ return sb.toString();
+ }
+
+ /** Factory method to create a new instance. */
+ public static WatermarkCompactingSinkMaterializer create(
+ InsertConflictStrategy conflictStrategy,
+ RowType physicalRowType,
+ GeneratedRecordEqualiser rowEqualiser,
+ @Nullable GeneratedRecordEqualiser upsertKeyEqualiser,
+ @Nullable int[] inputUpsertKey,
+ RowType keyType,
+ String[] primaryKeyNames) {
+ return new WatermarkCompactingSinkMaterializer(
+ conflictStrategy,
+ InternalSerializers.create(physicalRowType),
+ rowEqualiser,
+ upsertKeyEqualiser,
+ inputUpsertKey,
+ keyType,
+ primaryKeyNames);
+ }
+}
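(Not part of the patch: the cancellation performed in bufferRecord()/tryCancelRetraction/tryCancelAddition, reduced to a standalone sketch with strings standing in for RowData. It replays the FLIP-558 disorder example used in the tests below; removeIf is a simplification, the operator removes only the first match.)

    import java.util.ArrayList;
    import java.util.List;

    /** Illustrative sketch of per-bucket cancellation; "+"/"-" mark additions/retractions. */
    public class BucketCancellationDemo {

        public static void main(String[] args) {
            List<String> bucket = new ArrayList<>();
            // Disordered changelog for one primary key: +U(b1) overtakes +I(a1)/-U(a1).
            for (String event : new String[] {"+U:b1", "+I:a1", "-U:a1"}) {
                boolean addition = event.charAt(0) == '+';
                String payload = event.substring(2);
                // An incoming record first cancels a buffered opposite with equal content.
                boolean cancelled =
                        bucket.removeIf(
                                e -> e.charAt(0) == (addition ? '-' : '+')
                                        && e.substring(2).equals(payload));
                if (!cancelled) {
                    bucket.add(event);
                }
            }
            // At the watermark, at most one row per key may remain; with DO ERROR, two or
            // more remaining rows raise the "Primary key constraint violation" above.
            System.out.println(bucket); // [+U:b1]
        }
    }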
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkTimestampAssigner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkTimestampAssigner.java
new file mode 100644
index 00000000000..17598df521d
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkTimestampAssigner.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+
+/**
+ * Operator that assigns the current watermark as the timestamp to each incoming StreamRecord.
+ *
+ * <p>This is used in conjunction with {@link WatermarkCompactingSinkMaterializer} which buffers
+ * records by their timestamp. Without meaningful timestamps, all records would be buffered under
+ * the same key, breaking the watermark-based compaction logic.
+ *
+ * <p>If the current watermark is {@code Long.MIN_VALUE} (the initial state before any watermark
+ * arrives), records will be assigned that value and will be compacted when the first watermark
+ * arrives.
+ */
+@Internal
+public class WatermarkTimestampAssigner extends TableStreamOperator<RowData>
+ implements OneInputStreamOperator<RowData, RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement(StreamRecord<RowData> element) throws Exception {
+ element.setTimestamp(currentWatermark);
+ output.collect(element);
+ }
+}
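(Not part of the patch: a standalone sketch of the effect of this stamping. Because every record carries the last watermark seen, the downstream materializer's buffer naturally groups records into per-watermark buckets; the class and variable names below are made up.)

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    /** Illustrative sketch: stamping records with the last watermark buckets them. */
    public class WatermarkStampDemo {

        public static void main(String[] args) {
            long currentWatermark = Long.MIN_VALUE; // initial state, before any watermark
            Map<Long, List<String>> buffer = new LinkedHashMap<>();

            Object[] stream = {"r1", "r2", 100L, "r3", 200L, "r4"}; // Longs act as watermarks
            for (Object element : stream) {
                if (element instanceof Long) {
                    currentWatermark = (Long) element; // watermark advances the stamp
                } else {
                    // WatermarkTimestampAssigner: the record's timestamp becomes the watermark
                    buffer.computeIfAbsent(currentWatermark, k -> new ArrayList<>())
                            .add((String) element);
                }
            }
            // r1/r2 share the pre-watermark bucket, r3 lands under 100, r4 under 200
            System.out.println(buffer); // {-9223372036854775808=[r1, r2], 100=[r3], 200=[r4]}
        }
    }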
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializerTest.java
new file mode 100644
index 00000000000..d331eb18c94
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Random;
+
+/** A test for the {@link SortedLongSerializer}. */
+class SortedLongSerializerTest extends SerializerTestBase<Long> {
+
+ @Override
+ protected TypeSerializer<Long> createSerializer() {
+ return SortedLongSerializer.INSTANCE;
+ }
+
+ @Override
+ protected int getLength() {
+ return 8;
+ }
+
+ @Override
+ protected Class<Long> getTypeClass() {
+ return Long.class;
+ }
+
+ @Override
+ protected Long[] getTestData() {
+ Random rnd = new Random(874597969123412341L);
+ long rndLong = rnd.nextLong();
+
+ return new Long[] {0L, 1L, -1L, Long.MAX_VALUE, Long.MIN_VALUE, rndLong, -rndLong};
+ }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java
new file mode 100644
index 00000000000..adb701a58e9
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java
@@ -0,0 +1,769 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.delete;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insert;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfter;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link WatermarkCompactingSinkMaterializer}. */
+class WatermarkCompactingSinkMaterializerTest {
+
+ private static final int PRIMARY_KEY_INDEX = 1;
+ private static final String PRIMARY_KEY_NAME = "pk";
+
+ private static final LogicalType[] LOGICAL_TYPES =
+ new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()};
+
+ private static final GeneratedRecordEqualiser RECORD_EQUALISER =
+ new GeneratedRecordEqualiser("", "", new Object[0]) {
+ @Override
+ public RecordEqualiser newInstance(ClassLoader classLoader) {
+ return new TestRecordEqualiser();
+ }
+ };
+
+ private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
+ new GeneratedRecordEqualiser("", "", new Object[0]) {
+ @Override
+ public RecordEqualiser newInstance(ClassLoader classLoader) {
+ return new TestUpsertKeyEqualiser();
+ }
+ };
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ConflictBehavior.class,
+ names = {"ERROR", "NOTHING"})
+ void testBasicInsertWithWatermarkProgression(ConflictBehavior behavior) throws Exception {
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarness(behavior)) {
+ harness.open();
+
+ // Insert first record (watermark is MIN_VALUE)
+ harness.processElement(insertRecord(1L, 1, "a1"));
+ assertEmitsNothing(harness); // Buffered, waiting for watermark
+
+ // Advance watermark to trigger compaction
+ harness.processWatermark(100L);
+ assertEmits(harness, insert(1L, 1, "a1"));
+
+ // Update with same upsert key (this is the expected pattern for single-source updates)
+ harness.processElement(updateAfterRecord(1L, 1, "a2"));
+ assertEmitsNothing(harness);
+
+ // Advance watermark again
+ harness.processWatermark(200L);
+ assertEmits(harness, updateAfter(1L, 1, "a2"));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ConflictBehavior.class,
+ names = {"ERROR", "NOTHING"})
+ void testDeleteAfterInsert(ConflictBehavior behavior) throws Exception {
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarness(behavior)) {
+ harness.open();
+
+ // Insert and compact
+ harness.processElement(insertRecord(1L, 1, "a1"));
+ harness.processWatermark(100L);
+ assertEmits(harness, insert(1L, 1, "a1"));
+
+ // Delete and compact
+ harness.processElement(deleteRecord(1L, 1, "a1"));
+ harness.processWatermark(200L);
+ assertEmits(harness, delete(1L, 1, "a1"));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ConflictBehavior.class,
+ names = {"ERROR", "NOTHING"})
+ void testInsertAndDeleteInSameWindow(ConflictBehavior behavior) throws Exception {
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarness(behavior)) {
+ harness.open();
+
+ // Insert and delete before watermark advances
+ harness.processElement(insertRecord(1L, 1, "a1"));
+ harness.processElement(deleteRecord(1L, 1, "a1"));
+
+ // Compact - should emit nothing since insert and delete cancel out
+ harness.processWatermark(100L);
+ assertEmitsNothing(harness);
+ }
+ }
+
+ @Test
+ void testDoNothingKeepsFirstRecord() throws Exception {
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarness(ConflictBehavior.NOTHING)) {
+ harness.open();
+
+ // Insert two records with different upsert keys but same primary key
+ harness.processElement(insertRecord(1L, 1, "first"));
+ harness.processElement(insertRecord(2L, 1, "second"));
+
+ // Compact - should keep the first record
+ harness.processWatermark(100L);
+ assertEmits(harness, insert(1L, 1, "first"));
+ }
+ }
+
+ @Test
+ void testDoErrorThrowsOnConflict() throws Exception {
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarness(ConflictBehavior.ERROR)) {
+ harness.open();
+
+ // Insert two records with different upsert keys but same primary key
+ harness.processElement(insertRecord(1L, 1, "first"));
+ harness.processElement(insertRecord(2L, 1, "second"));
+
+ // Compact - should throw exception with key info
+ assertThatThrownBy(() -> harness.processWatermark(100L))
+ .isInstanceOf(TableRuntimeException.class)
+ .hasMessageContaining("Primary key constraint violation")
+ .hasMessageContaining("[pk=1]");
+ }
+ }
+
+ @Test
+ void testDoErrorAllowsSameUpsertKey() throws Exception {
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarness(ConflictBehavior.ERROR)) {
+ harness.open();
+
+ // Insert two records with same upsert key (updates to same source)
+ harness.processElement(insertRecord(1L, 1, "v1"));
+ harness.processElement(updateAfterRecord(1L, 1, "v2"));
+
+ // Compact - should not throw, just keep the latest
+ harness.processWatermark(100L);
+ assertEmits(harness, insert(1L, 1, "v2"));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ConflictBehavior.class,
+ names = {"ERROR", "NOTHING"})
+ void testChangelogDisorderHandling(ConflictBehavior behavior) throws Exception {
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarness(behavior)) {
+ harness.open();
+
+ // Simulate changelog disorder from FLIP-558 example:
+ // Records from different sources (different upsert keys: 1L and 2L) map to same PK (1)
+ // Ideal order: +I(1,1,a1), -U(1,1,a1), +U(2,1,b1)
+ // Disordered: +U(2,1,b1), +I(1,1,a1), -U(1,1,a1)
+
+ // The +U from source 2 arrives first (upsert key = 2L)
+ harness.processElement(updateAfterRecord(2L, 1, "b1"));
+ // Then +I and -U from source 1 arrive (upsert key = 1L)
+ harness.processElement(insertRecord(1L, 1, "a1"));
+ harness.processElement(updateBeforeRecord(1L, 1, "a1"));
+
+ // Net result: only (2L, 1, "b1") remains after cancellation, no conflict
+ harness.processWatermark(100L);
+ assertEmits(harness, insert(2L, 1, "b1"));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ConflictBehavior.class,
+ names = {"ERROR", "NOTHING"})
+ void testChangelogDisorderUpdateBeforeAfterInsert(ConflictBehavior behavior) throws Exception {
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarness(behavior)) {
+ harness.open();
+
+ // Simulate changelog disorder where UPDATE_AFTER arrives before UPDATE_BEFORE:
+ // Ideal order: +I(1,1,a1), -U(1,1,a1), +U(1,1,a2)
+ // Disordered: +I(1,1,a1), +U(1,1,a2), -U(1,1,a1)
+
+ // INSERT the initial value
+ harness.processElement(insertRecord(1L, 1, "a1"));
+ // UPDATE_AFTER arrives before the UPDATE_BEFORE
+ harness.processElement(updateAfterRecord(1L, 1, "a2"));
+ // UPDATE_BEFORE (retraction of the original INSERT) arrives last
+ harness.processElement(updateBeforeRecord(1L, 1, "a1"));
+
+ // Net result: +I and -U cancel out, only +U(1,1,a2) remains
+ harness.processWatermark(100L);
+ assertEmits(harness, insert(1L, 1, "a2"));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ConflictBehavior.class,
+ names = {"ERROR", "NOTHING"})
+ void testNoEmissionWhenValueUnchanged(ConflictBehavior behavior) throws Exception {
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarness(behavior)) {
+ harness.open();
+
+ // Insert and compact
+ harness.processElement(insertRecord(1L, 1, "value"));
+ harness.processWatermark(100L);
+ assertEmits(harness, insert(1L, 1, "value"));
+
+ // Insert same value again (same upsert key)
+ harness.processElement(updateAfterRecord(1L, 1, "value"));
+ harness.processWatermark(200L);
+ // Should not emit since value is the same
+ assertEmitsNothing(harness);
+ }
+ }
+
+ /**
+ * Tests that record timestamps are handled correctly when multiple inputs send records that
+ * arrive out of timestamp order. This simulates the case where records from different upstream
+ * tasks have different timestamps and arrive interleaved.
+ *
+ * <p>Input 1 uses upsert key = 1L, Input 2 uses upsert key = 2L. All records have same primary
+ * key (1).
+ *
+ * <p>Sequence:
+ *
+ * <ol>
+ * <li>INSERT(input=1, t=2)
+ * <li>watermark=3 -> emits INSERT
+ * <li>UPDATE_BEFORE(input=1, t=4)
+ * <li>UPDATE_AFTER(input=1, t=6)
+ * <li>UPDATE_AFTER(input=2, t=4) - arrives after t=6 record but has smaller timestamp
+ * <li>watermark=5 -> compacts t<=5 records
+ * <li>UPDATE_BEFORE(input=2, t=6)
+ * <li>watermark=10 -> compacts remaining t=6 records
+ * </ol>
+ */
+ @Test
+ void testTwoUpstreamTasksWithDisorderedWatermarks() throws Exception {
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarness(ConflictBehavior.NOTHING)) {
+ harness.open();
+
+ // INSERT from input 1 with timestamp 2
+ harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 2L));
+ assertEmitsNothing(harness);
+
+ // watermark=3: compacts records with t<=3, emits INSERT(t=2)
+ harness.processWatermark(3L);
+ assertEmits(harness, insert(1L, 1, "v1"));
+
+ // UPDATE_BEFORE from input 1 with timestamp 4
+ harness.processElement(recordWithTimestamp(RowKind.UPDATE_BEFORE, 1L, 1, "v1", 4L));
+ assertEmitsNothing(harness);
+
+ // UPDATE_AFTER from input 1 with timestamp 6
+ harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v4", 6L));
+ assertEmitsNothing(harness);
+
+ // UPDATE_AFTER from input 2 with timestamp 4
+ // This record arrives after the t=6 record but has a smaller timestamp
+ harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 2L, 1, "v3", 4L));
+
+ // watermark=5: compacts records with t<=5
+ // Buffered: UPDATE_BEFORE(1L, t=4) retracts the previously emitted INSERT from input 1,
+ // so UPDATE_AFTER(2L, t=4) becomes the current value and is emitted
+ harness.processWatermark(5L);
+ assertEmits(harness, updateAfter(2L, 1, "v3"));
+
+ // UPDATE_BEFORE from input 2 with timestamp 6
+ harness.processElement(recordWithTimestamp(RowKind.UPDATE_BEFORE, 2L, 1, "v3", 6L));
+ assertEmitsNothing(harness);
+
+ // Final watermark to flush all remaining buffered records (t=6)
+ // Buffered: UPDATE_AFTER(1L, t=6) and UPDATE_BEFORE(2L, t=6)
+ // After compaction: UPDATE_AFTER(1L, "v4") remains as final value
+ harness.processWatermark(10L);
+ assertEmits(harness, updateAfter(1L, 1, "v4"));
+ }
+ }
+
+ // --- Tests without upsert key ---
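+ // Without an upsert key the materializer compares full rows, so any second live row
+ // for the same primary key counts as a conflict.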
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ConflictBehavior.class,
+ names = {"ERROR", "NOTHING"})
+ void testBasicInsertWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarnessWithoutUpsertKey(behavior)) {
+ harness.open();
+
+ // Insert first record
+ harness.processElement(insertRecord(1L, 1, "a1"));
+ assertEmitsNothing(harness);
+
+ // Advance watermark to trigger compaction
+ harness.processWatermark(100L);
+ assertEmits(harness, insert(1L, 1, "a1"));
+ }
+ }
+
+ @Test
+ void testUpdateWithoutUpsertKeyNothingStrategy() throws Exception {
+ // Without upsert key, UPDATE_AFTER on existing value causes conflict (two rows accumulate)
+ // NOTHING strategy keeps the first value
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarnessWithoutUpsertKey(ConflictBehavior.NOTHING)) {
+ harness.open();
+
+ harness.processElement(insertRecord(1L, 1, "a1"));
+ harness.processWatermark(100L);
+ assertEmits(harness, insert(1L, 1, "a1"));
+
+ // UPDATE_AFTER with different value - creates conflict, NOTHING keeps first
+ harness.processElement(updateAfterRecord(2L, 1, "a2"));
+ harness.processWatermark(200L);
+ // NOTHING keeps the previous value (1L, 1, "a1"), no emission since unchanged
+ assertEmitsNothing(harness);
+ }
+ }
+
+ @Test
+ void testUpdateWithoutUpsertKeyErrorStrategy() throws Exception {
+ // Without upsert key, UPDATE_AFTER on existing value causes conflict (two rows accumulate)
+ // ERROR strategy throws
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarnessWithoutUpsertKey(ConflictBehavior.ERROR)) {
+ harness.open();
+
+ harness.processElement(insertRecord(1L, 1, "a1"));
+ harness.processWatermark(100L);
+ assertEmits(harness, insert(1L, 1, "a1"));
+
+ // UPDATE_AFTER with different value - creates conflict, ERROR throws with key info
+ harness.processElement(updateAfterRecord(2L, 1, "a2"));
+ assertThatThrownBy(() -> harness.processWatermark(200L))
+ .isInstanceOf(TableRuntimeException.class)
+ .hasMessageContaining("Primary key constraint violation")
+ .hasMessageContaining("[pk=1]");
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ConflictBehavior.class,
+ names = {"ERROR", "NOTHING"})
+ void testDeleteAfterInsertWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarnessWithoutUpsertKey(behavior)) {
+ harness.open();
+
+ // Insert and compact
+ harness.processElement(insertRecord(1L, 1, "a1"));
+ harness.processWatermark(100L);
+ assertEmits(harness, insert(1L, 1, "a1"));
+
+ // Delete with exact same row and compact
+ harness.processElement(deleteRecord(1L, 1, "a1"));
+ harness.processWatermark(200L);
+ assertEmits(harness, delete(1L, 1, "a1"));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ConflictBehavior.class,
+ names = {"ERROR", "NOTHING"})
+ void testInsertAndDeleteInSameWindowWithoutUpsertKey(ConflictBehavior behavior)
+ throws Exception {
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarnessWithoutUpsertKey(behavior)) {
+ harness.open();
+
+ // Insert and delete with exact same row before watermark advances
+ harness.processElement(insertRecord(1L, 1, "a1"));
+ harness.processElement(deleteRecord(1L, 1, "a1"));
+
+ // Compact - should emit nothing since insert and delete cancel out
+ harness.processWatermark(100L);
+ assertEmitsNothing(harness);
+ }
+ }
+
+ @Test
+ void testIdenticalInsertsWithoutUpsertKeyNothingKeepsFirst() throws Exception {
+ // Without upsert key, even identical inserts are separate entries
+ // NOTHING strategy just keeps the first one
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarnessWithoutUpsertKey(ConflictBehavior.NOTHING)) {
+ harness.open();
+
+ // Insert two identical records (same full row)
+ harness.processElement(insertRecord(1L, 1, "same"));
+ harness.processElement(insertRecord(1L, 1, "same"));
+
+ // Compact - NOTHING keeps first record
+ harness.processWatermark(100L);
+ assertEmits(harness, insert(1L, 1, "same"));
+ }
+ }
+
+ @Test
+ void testIdenticalInsertsWithoutUpsertKeyErrorThrows() throws Exception {
+ // Without upsert key, even identical inserts are separate entries
+ // ERROR strategy throws because there are multiple records
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarnessWithoutUpsertKey(ConflictBehavior.ERROR)) {
+ harness.open();
+
+ // Insert two identical records (same full row)
+ harness.processElement(insertRecord(1L, 1, "same"));
+ harness.processElement(insertRecord(1L, 1, "same"));
+
+ // Compact - ERROR throws because there are multiple pending records, includes key info
+ assertThatThrownBy(() -> harness.processWatermark(100L))
+ .isInstanceOf(TableRuntimeException.class)
+ .hasMessageContaining("Primary key constraint violation")
+ .hasMessageContaining("[pk=1]");
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = ConflictBehavior.class,
+ names = {"ERROR", "NOTHING"})
+ void testInsertUpdateDeleteWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarnessWithoutUpsertKey(behavior)) {
+ harness.open();
+
+ // Insert, then update_before + update_after sequence
+ harness.processElement(insertRecord(1L, 1, "v1"));
+ harness.processWatermark(100L);
+ assertEmits(harness, insert(1L, 1, "v1"));
+
+ // Update: retract old value, insert new value
+ harness.processElement(updateBeforeRecord(1L, 1, "v1"));
+ harness.processElement(updateAfterRecord(2L, 1, "v2"));
+ harness.processWatermark(200L);
+ assertEmits(harness, updateAfter(2L, 1, "v2"));
+
+ // Delete the current value
+ harness.processElement(deleteRecord(2L, 1, "v2"));
+ harness.processWatermark(300L);
+ assertEmits(harness, delete(2L, 1, "v2"));
+ }
+ }
+
+ // --- Restore Tests ---
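+ // After restore, watermarks restart from MIN_VALUE, so buffered timestamps are
+ // consolidated to MIN_VALUE and flushed on the first watermark.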
+
+ /**
+ * Tests that buffered records at different timestamps before checkpoint are consolidated to
+ * MIN_VALUE on restore and compacted on the first watermark.
+ */
+ @ParameterizedTest
+ @EnumSource(
+ value = ConflictBehavior.class,
+ names = {"ERROR", "NOTHING"})
+ void testBufferedRecordsConsolidatedOnRestore(ConflictBehavior behavior) throws Exception {
+ OperatorSubtaskState snapshot;
+
+ // First harness: buffer records at different timestamps, then take snapshot
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarness(behavior)) {
+ harness.open();
+
+ // Buffer records at different timestamps (simulating records before checkpoint)
+ harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 1000L));
+ harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v2", 2000L));
+ harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v3", 3000L));
+
+ // No watermark yet, so nothing emitted
+ assertEmitsNothing(harness);
+
+ // Take snapshot with buffered records
+ snapshot = harness.snapshot(1L, 1L);
+ }
+
+ // Second harness: restore from snapshot
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarness(behavior)) {
+ harness.initializeState(snapshot);
+ harness.open();
+
+ // After restore, watermarks restart from MIN_VALUE.
+ // The buffered records should have been consolidated to MIN_VALUE.
+ // First watermark (even a small one) should trigger compaction of all consolidated
+ // records.
+ harness.processWatermark(100L);
+
+ // All records were from same upsert key, so only final value is emitted
+ assertEmits(harness, insert(1L, 1, "v3"));
+ }
+ }
+
+ /**
+ * Tests that in-flight records from unaligned checkpoints (records with timestamps > MIN_VALUE
+ * arriving before first watermark after restore) are correctly handled.
+ */
+ @ParameterizedTest
+ @EnumSource(
+ value = ConflictBehavior.class,
+ names = {"ERROR", "NOTHING"})
+ void testInFlightRecordsAfterRestore(ConflictBehavior behavior) throws Exception {
+ OperatorSubtaskState snapshot;
+
+ // First harness: take empty snapshot
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarness(behavior)) {
+ harness.open();
+ snapshot = harness.snapshot(1L, 1L);
+ }
+
+ // Second harness: restore and simulate in-flight records
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarness(behavior)) {
+ harness.initializeState(snapshot);
+ harness.open();
+
+ // Simulate in-flight records that were checkpointed with their old timestamps.
+ // These arrive after restore but before any watermark is received.
+ // They have timestamps > MIN_VALUE from before the checkpoint.
+ harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 5000L));
+ harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v2", 5100L));
+
+ // No watermark yet, nothing emitted
+ assertEmitsNothing(harness);
+
+ // First watermark after restore. Since currentWatermark was MIN_VALUE,
+ // in-flight records should have been assigned to MIN_VALUE and will be compacted.
+ harness.processWatermark(100L);
+
+ // Both records had same upsert key, so only final value is emitted
+ assertEmits(harness, insert(1L, 1, "v2"));
+ }
+ }
+
+ /**
+ * Tests restore with multiple keys having buffered records at different timestamps. Verifies
+ * that each key's records are correctly consolidated and compacted.
+ */
+ @Test
+ void testRestoreWithMultipleKeys() throws Exception {
+ OperatorSubtaskState snapshot;
+
+ // First harness: buffer records for multiple keys
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarness(ConflictBehavior.NOTHING)) {
+ harness.open();
+
+ // Key 1: multiple updates
+ harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "k1v1", 1000L));
+ harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "k1v2", 2000L));
+
+ // Key 2: single insert
+ harness.processElement(recordWithTimestamp(RowKind.INSERT, 2L, 2, "k2v1", 1500L));
+
+ // Key 3: insert then delete (should result in nothing)
+ harness.processElement(recordWithTimestamp(RowKind.INSERT, 3L, 3, "k3v1", 1000L));
+ harness.processElement(recordWithTimestamp(RowKind.DELETE, 3L, 3, "k3v1", 2500L));
+
+ snapshot = harness.snapshot(1L, 1L);
+ }
+
+ // Second harness: restore and verify
+ try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+ createHarness(ConflictBehavior.NOTHING)) {
+ harness.initializeState(snapshot);
+ harness.open();
+
+ // First watermark compacts all consolidated records
+ harness.processWatermark(100L);
+
+ // Extract and verify results (order depends on key processing order)
+ List<RowData> emitted = extractRecords(harness);
+ assertThat(emitted).hasSize(2);
+ // Key 1 should have final value "k1v2", Key 2 should have "k2v1", Key 3 cancelled out
+ assertThat(emitted)
+ .anySatisfy(
+ row -> {
+ assertThat(row.getInt(1)).isEqualTo(1);
+ assertThat(row.getString(2).toString()).isEqualTo("k1v2");
+ })
+ .anySatisfy(
+ row -> {
+ assertThat(row.getInt(1)).isEqualTo(2);
+ assertThat(row.getString(2).toString()).isEqualTo("k2v1");
+ });
+ }
+ }
+
+ // --- Helper Methods ---
+
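+ /** Wraps a row of the given kind in a {@link StreamRecord} with an explicit event timestamp. */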
+ private StreamRecord<RowData> recordWithTimestamp(
+ RowKind kind, long upsertKey, int pk, String value, long timestamp) {
+ return new StreamRecord<>(rowOfKind(kind, upsertKey, pk, value), timestamp);
+ }
+
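+ /**
+ * Creates a harness whose operator treats the first column as the upsert key and is keyed by
+ * the primary key column.
+ */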
+ private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(
+ ConflictBehavior behavior) throws Exception {
+ RowType keyType = RowType.of(new LogicalType[] {LOGICAL_TYPES[PRIMARY_KEY_INDEX]});
+ WatermarkCompactingSinkMaterializer operator =
+ WatermarkCompactingSinkMaterializer.create(
+ toStrategy(behavior),
+ RowType.of(LOGICAL_TYPES),
+ RECORD_EQUALISER,
+ UPSERT_KEY_EQUALISER,
+ new int[] {0}, // upsert key is first column
+ keyType,
+ new String[] {PRIMARY_KEY_NAME});
+
+ return new KeyedOneInputStreamOperatorTestHarness<>(
+ operator,
+ HandwrittenSelectorUtil.getRowDataSelector(
+ new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES),
+ HandwrittenSelectorUtil.getRowDataSelector(
+ new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES)
+ .getProducedType());
+ }
+
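+ /**
+ * Same as {@link #createHarness(ConflictBehavior)} but without an upsert key, so every
+ * record is treated as a separate entry.
+ */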
+ private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>
+ createHarnessWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
+ RowType keyType = RowType.of(new LogicalType[] {LOGICAL_TYPES[PRIMARY_KEY_INDEX]});
+ WatermarkCompactingSinkMaterializer operator =
+ WatermarkCompactingSinkMaterializer.create(
+ toStrategy(behavior),
+ RowType.of(LOGICAL_TYPES),
+ RECORD_EQUALISER,
+ null, // no upsert key equaliser
+ null, // no upsert key
+ keyType,
+ new String[] {PRIMARY_KEY_NAME});
+
+ return new KeyedOneInputStreamOperatorTestHarness<>(
+ operator,
+ HandwrittenSelectorUtil.getRowDataSelector(
+ new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES),
+ HandwrittenSelectorUtil.getRowDataSelector(
+ new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES)
+ .getProducedType());
+ }
+
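+ /** Maps the {@link ConflictBehavior} test parameter to the corresponding {@link InsertConflictStrategy}. */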
+ private static InsertConflictStrategy toStrategy(ConflictBehavior behavior) {
+ switch (behavior) {
+ case ERROR:
+ return InsertConflictStrategy.error();
+ case NOTHING:
+ return InsertConflictStrategy.nothing();
+ case DEDUPLICATE:
+ return InsertConflictStrategy.deduplicate();
+ default:
+ throw new IllegalArgumentException("Unknown behavior: " + behavior);
+ }
+ }
+
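+ /** Asserts that the harness has emitted no records since the last check. */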
+ private void assertEmitsNothing(
+ KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness) {
+ assertThat(extractRecords(harness)).isEmpty();
+ }
+
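+ /** Asserts that the harness emitted exactly the given records, in order. */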
+ private void assertEmits(
+ KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness,
+ RowData... expected) {
+ List<RowData> emitted = extractRecords(harness);
+ assertThat(emitted).containsExactly(expected);
+ }
+
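+ /**
+ * Drains the harness output, skipping watermarks and copying each emitted row (including
+ * its {@link RowKind}) into a {@link GenericRowData} for comparison.
+ */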
+ private List<RowData> extractRecords(
+ KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness) {
+ final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[LOGICAL_TYPES.length];
+ for (int i = 0; i < LOGICAL_TYPES.length; i++) {
+ fieldGetters[i] = RowData.createFieldGetter(LOGICAL_TYPES[i], i);
+ }
+
+ final List<RowData> rows = new ArrayList<>();
+ Object o;
+ while ((o = harness.getOutput().poll()) != null) {
+ // Skip watermarks, only process StreamRecords
+ if (o instanceof StreamRecord) {
+ RowData value = (RowData) ((StreamRecord<?>) o).getValue();
+ Object[] row = new Object[LOGICAL_TYPES.length];
+ for (int i = 0; i < LOGICAL_TYPES.length; i++) {
+ row[i] = fieldGetters[i].getFieldOrNull(value);
+ }
+ GenericRowData newRow = GenericRowData.of(row);
+ newRow.setRowKind(value.getRowKind());
+ rows.add(newRow);
+ }
+ }
+ return rows;
+ }
+
+ /** Test equaliser that compares all fields. */
+ private static class TestRecordEqualiser implements RecordEqualiser {
+ @Override
+ public boolean equals(RowData row1, RowData row2) {
+ return row1.getRowKind() == row2.getRowKind()
+ && row1.getLong(0) == row2.getLong(0)
+ && row1.getInt(1) == row2.getInt(1)
+ && Objects.equals(row1.getString(2), row2.getString(2));
+ }
+ }
+
+ /** Test equaliser that only compares the upsert key (first column). */
+ private static class TestUpsertKeyEqualiser implements RecordEqualiser {
+ @Override
+ public boolean equals(RowData row1, RowData row2) {
+ return row1.getLong(0) == row2.getLong(0);
+ }
+ }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
index 40929ed6ac6..99a5deb41e9 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
@@ -104,6 +104,26 @@ public class StreamRecordUtils {
return new StreamRecord<>(row);
}
+ /** Creates a new {@link RowData} with INSERT kind. Alias for {@link #row(Object...)}. */
+ public static RowData insert(Object... fields) {
+ return rowOfKind(RowKind.INSERT, fields);
+ }
+
+ /** Creates a new {@link RowData} with UPDATE_BEFORE kind. */
+ public static RowData updateBefore(Object... fields) {
+ return rowOfKind(RowKind.UPDATE_BEFORE, fields);
+ }
+
+ /** Creates a new {@link RowData} with UPDATE_AFTER kind. */
+ public static RowData updateAfter(Object... fields) {
+ return rowOfKind(RowKind.UPDATE_AFTER, fields);
+ }
+
+ /** Creates a new {@link RowData} with DELETE kind. */
+ public static RowData delete(Object... fields) {
+ return rowOfKind(RowKind.DELETE, fields);
+ }
+
/** Receives a object array, generates a RowData based on the array. */
public static RowData rowOfKind(RowKind rowKind, Object... fields) {
Object[] objects = new Object[fields.length];
diff --git a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
index 976cef6349b..9122d74cb79 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
@@ -73,6 +73,7 @@ import org.apache.flink.table.dataview.ListViewSerializer;
import org.apache.flink.table.dataview.MapViewSerializer;
import org.apache.flink.table.dataview.NullAwareMapSerializer;
import org.apache.flink.table.dataview.NullSerializer;
+import org.apache.flink.table.runtime.operators.sink.SortedLongSerializer;
import org.apache.flink.table.runtime.operators.window.CountWindow;
import org.apache.flink.table.runtime.sequencedmultisetstate.linked.MetaSqnInfoSerializer;
import org.apache.flink.table.runtime.sequencedmultisetstate.linked.NodeSerializer;
@@ -263,7 +264,8 @@ public class TypeSerializerTestCoverageTest extends TestLogger {
NodeSerializer.class.getName(),
RowSqnInfoSerializer.class.getName(),
MetaSqnInfoSerializer.class.getName(),
- SetSerializer.class.getName());
+ SetSerializer.class.getName(),
+ SortedLongSerializer.class.getName());
// check if a test exists for each type serializer
for (Class<? extends TypeSerializer> typeSerializer : typeSerializers) {