This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 743b9bd2b32 [FLINK-38928] Implement an operator for handling DO ERROR/NOTHING (#27502)
743b9bd2b32 is described below

commit 743b9bd2b32149559983eb983c9ec0391a02fb9e
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Feb 12 09:30:17 2026 +0100

    [FLINK-38928] Implement an operator for handling DO ERROR/NOTHING (#27502)
---
 .../plan/nodes/exec/common/CommonExecSink.java     |   7 +-
 .../plan/nodes/exec/stream/StreamExecSink.java     |  76 +-
 .../nodes/physical/stream/StreamPhysicalSink.scala |  35 +
 .../FlinkChangelogModeInferenceProgram.scala       |  38 +-
 .../plan/nodes/exec/stream/SinkSemanticTests.java  |  31 +-
 .../plan/nodes/exec/stream/SinkTestPrograms.java   |  35 +-
 .../operators/sink/SortedLongSerializer.java       | 115 +++
 .../sink/WatermarkCompactingSinkMaterializer.java  | 504 ++++++++++++++
 .../operators/sink/WatermarkTimestampAssigner.java |  49 ++
 .../operators/sink/SortedLongSerializerTest.java   |  51 ++
 .../WatermarkCompactingSinkMaterializerTest.java   | 769 +++++++++++++++++++++
 .../table/runtime/util/StreamRecordUtils.java      |  20 +
 .../TypeSerializerTestCoverageTest.java            |   4 +-
 13 files changed, 1668 insertions(+), 66 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 0fc273ca684..31c0bc4074f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -187,9 +187,6 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
         Optional<LineageVertex> lineageVertexOpt =
                 TableLineageUtils.extractLineageDataset(outputObject);
 
-        // only add materialization if input has change
-        final boolean needMaterialization = !inputInsertOnly && upsertMaterialize;
-
         Transformation<RowData> sinkTransform =
                applyConstraintValidations(inputTransform, config, persistedRowType);
 
@@ -202,10 +199,10 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
                             primaryKeys,
                             sinkParallelism,
                             inputParallelism,
-                            needMaterialization);
+                            upsertMaterialize);
         }
 
-        if (needMaterialization) {
+        if (upsertMaterialize) {
             sinkTransform =
                     applyUpsertMaterialize(
                             sinkTransform,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
index 1726105dee8..ce95c8b65bf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
@@ -26,8 +26,8 @@ import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.ExecutionConfigOptions.RowtimeInserter;
 import org.apache.flink.table.api.config.ExecutionConfigOptions.SinkUpsertMaterializeStrategy;
@@ -55,6 +55,8 @@ import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
 import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerV2;
+import org.apache.flink.table.runtime.operators.sink.WatermarkCompactingSinkMaterializer;
+import org.apache.flink.table.runtime.operators.sink.WatermarkTimestampAssigner;
 import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
@@ -100,6 +102,7 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
         producedTransformations = {
             CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
             CommonExecSink.PARTITIONER_TRANSFORMATION,
+            StreamExecSink.WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
             CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
             CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
             CommonExecSink.SINK_TRANSFORMATION
@@ -124,6 +127,7 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
         producedTransformations = {
             CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
             CommonExecSink.PARTITIONER_TRANSFORMATION,
+            StreamExecSink.WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
             CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
             CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
             CommonExecSink.SINK_TRANSFORMATION
@@ -133,6 +137,9 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
 public class StreamExecSink extends CommonExecSink implements StreamExecNode<Object> {
     private static final Logger LOG = LoggerFactory.getLogger(StreamExecSink.class);
 
+    public static final String WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION =
+            "watermark-timestamp-assigner";
+
     public static final String FIELD_NAME_INPUT_CHANGELOG_MODE = "inputChangelogMode";
     public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = "requireUpsertMaterialize";
     public static final String FIELD_NAME_UPSERT_MATERIALIZE_STRATEGY = "upsertMaterializeStrategy";
@@ -237,17 +244,6 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
     @Override
     protected Transformation<Object> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
-        // TODO: FLINK-38928 Remove this validation once runtime support for ERROR and NOTHING is
-        // implemented
-        if (InsertConflictStrategy.error().equals(conflictStrategy)
-                || InsertConflictStrategy.nothing().equals(conflictStrategy)) {
-            throw new ValidationException(
-                    "ON CONFLICT DO "
-                            + conflictStrategy
-                            + " is not yet supported. "
-                            + "Please use ON CONFLICT DO DEDUPLICATE 
instead.");
-        }
-
         final ExecEdge inputEdge = getInputEdges().get(0);
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
@@ -341,10 +337,17 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
                 StateConfigUtil.createTtlConfig(
                        StateMetadata.getStateTtlForOneInputOperator(config, stateMetadataList));
 
+        final String[] pkFieldNames =
+                Arrays.stream(primaryKeys)
+                        .mapToObj(idx -> physicalRowType.getFieldNames().get(idx))
+                        .toArray(String[]::new);
+
         final OneInputStreamOperator<RowData, RowData> operator =
                 createSumOperator(
                         config,
                         physicalRowType,
+                        primaryKeys,
+                        pkFieldNames,
                         inputUpsertKey,
                         upsertKeyEqualiser,
                         upsertKeyHashFunction,
@@ -352,15 +355,29 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
                         rowEqualiser,
                         rowHashFunction);
 
-        final String[] fieldNames = physicalRowType.getFieldNames().toArray(new String[0]);
-        final List<String> pkFieldNames =
-                Arrays.stream(primaryKeys)
-                        .mapToObj(idx -> fieldNames[idx])
-                        .collect(Collectors.toList());
+        // For ERROR/NOTHING strategies, apply WatermarkTimestampAssigner first.
+        // This assigns the current watermark as the timestamp to each record,
+        // which is required for the WatermarkCompactingSinkMaterializer to work correctly.
+        Transformation<RowData> transformForMaterializer = inputTransform;
+        if (isErrorOrNothingConflictStrategy()) {
+            // Use input parallelism to preserve watermark semantics
+            transformForMaterializer =
+                    ExecNodeUtil.createOneInputTransformation(
+                            inputTransform,
+                            createTransformationMeta(
+                                    WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
+                                    "WatermarkTimestampAssigner",
+                                    "WatermarkTimestampAssigner",
+                                    config),
+                            new WatermarkTimestampAssigner(),
+                            inputTransform.getOutputType(),
+                            inputTransform.getParallelism(),
+                            false);
+        }
 
         OneInputTransformation<RowData, RowData> materializeTransform =
                 ExecNodeUtil.createOneInputTransformation(
-                        inputTransform,
+                        transformForMaterializer,
                         createTransformationMeta(
                                 UPSERT_MATERIALIZE_TRANSFORMATION,
                                 String.format(
@@ -383,6 +400,8 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
     private OneInputStreamOperator<RowData, RowData> createSumOperator(
             ExecNodeConfig config,
             RowType physicalRowType,
+            int[] primaryKeys,
+            String[] pkFieldNames,
             int[] inputUpsertKey,
             GeneratedRecordEqualiser upsertKeyEqualiser,
             GeneratedHashFunction upsertKeyHashFunction,
@@ -390,6 +409,21 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
             GeneratedRecordEqualiser rowEqualiser,
             GeneratedHashFunction rowHashFunction) {
 
+        // Check if we should use the watermark-compacting materializer for ERROR/NOTHING strategies
+        if (isErrorOrNothingConflictStrategy()) {
+            RowType keyType = RowTypeUtils.projectRowType(physicalRowType, primaryKeys);
+
+            return WatermarkCompactingSinkMaterializer.create(
+                    conflictStrategy,
+                    physicalRowType,
+                    rowEqualiser,
+                    upsertKeyEqualiser,
+                    inputUpsertKey,
+                    keyType,
+                    pkFieldNames);
+        }
+
+        // Use existing logic for DEDUPLICATE (legacy behavior)
         SinkUpsertMaterializeStrategy sinkUpsertMaterializeStrategy =
                 Optional.ofNullable(upsertMaterializeStrategy)
                         .orElse(SinkUpsertMaterializeStrategy.LEGACY);
@@ -415,6 +449,12 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
                                 config));
     }
 
+    private boolean isErrorOrNothingConflictStrategy() {
+        return conflictStrategy != null
+                && (conflictStrategy.getBehavior() == ConflictBehavior.ERROR
+                        || conflictStrategy.getBehavior() == ConflictBehavior.NOTHING);
+    }
+
     private static SequencedMultiSetStateConfig createStateConfig(
             SinkUpsertMaterializeStrategy strategy,
             TimeDomain ttlTimeDomain,
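
A note for reviewers on how the two new operators fit together: the timestamp stamped by WatermarkTimestampAssigner is what buckets records inside the materializer's buffer state, which is why the assigner must run before the materializer. A minimal stamp-then-compact sketch in plain Java (illustrative names, ordinary collections instead of Flink state; the real operators keep this buffer per key and apply the conflict strategy during compaction):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    // Toy model: elements are stamped with the last seen watermark; each
    // watermark closes every bucket whose stamp is <= its timestamp.
    public class WatermarkBucketingSketch {
        private long watermark = Long.MIN_VALUE; // initial watermark
        private final Map<Long, List<String>> buffer = new TreeMap<>();

        void onElement(String record) {
            // WatermarkTimestampAssigner + buffering collapsed into one step.
            buffer.computeIfAbsent(watermark, k -> new ArrayList<>()).add(record);
        }

        void onWatermark(long wm) {
            // Drain the buckets the new watermark closes, in stamp order.
            buffer.entrySet().removeIf(e -> {
                if (e.getKey() <= wm) {
                    System.out.println("compact bucket " + e.getKey() + " -> " + e.getValue());
                    return true;
                }
                return false;
            });
            watermark = wm;
        }

        public static void main(String[] args) {
            WatermarkBucketingSketch sketch = new WatermarkBucketingSketch();
            sketch.onElement("r1");
            sketch.onElement("r2");
            sketch.onWatermark(10L); // compacts bucket MIN_VALUE -> [r1, r2]
            sketch.onElement("r3");
            sketch.onWatermark(20L); // compacts bucket 10 -> [r3]
        }
    }

The TreeMap stands in for the sorted key iteration that SortedLongSerializer provides on RocksDB/ForSt.
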
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
index c77d8312d52..cf2a7d53c9e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
 import org.apache.flink.table.api.InsertConflictStrategy
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior
 import org.apache.flink.table.api.config.ExecutionConfigOptions.{SinkUpsertMaterializeStrategy, TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY}
 import org.apache.flink.table.catalog.ContextResolvedTable
 import org.apache.flink.table.connector.sink.DynamicTableSink
@@ -34,9 +35,12 @@ import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rel.hint.RelHint
+import org.apache.calcite.util.ImmutableBitSet
 
 import java.util
 
+import scala.collection.JavaConversions._
+
 /**
  * Stream physical RelNode to write data into an external sink defined by a [[DynamicTableSink]].
  */
@@ -134,4 +138,35 @@ class StreamPhysicalSink(
       .itemIf("upsertMaterialize", "true", upsertMaterialize)
       .itemIf("conflictStrategy", conflictStrategy, conflictStrategy != null)
   }
+
+  def isDeduplicateConflictStrategy: Boolean = {
+    conflictStrategy != null && conflictStrategy.getBehavior == ConflictBehavior.DEDUPLICATE
+  }
+
+  def primaryKeysContainsUpsertKey: Boolean = {
+    val primaryKeys = contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes
+    val pks = ImmutableBitSet.of(primaryKeys: _*)
+    val fmq = FlinkRelMetadataQuery.reuseOrCreate(getCluster.getMetadataQuery)
+    val changeLogUpsertKeys = fmq.getUpsertKeys(getInput)
+    changeLogUpsertKeys != null && changeLogUpsertKeys.exists(pks.contains)
+  }
+
+  def getUpsertKeyNames: String = {
+    val fmq = FlinkRelMetadataQuery.reuseOrCreate(getCluster.getMetadataQuery)
+    val changeLogUpsertKeys = fmq.getUpsertKeys(getInput)
+    if (changeLogUpsertKeys == null) {
+      "none"
+    } else {
+      val fieldNames = contextResolvedTable.getResolvedSchema.getColumnNames
+      changeLogUpsertKeys
+        .map(uk => uk.toArray.map(fieldNames.get).mkString("[", ", ", "]"))
+        .mkString(", ")
+    }
+  }
+
+  def getPrimaryKeyNames: String = {
+    val fieldNames = contextResolvedTable.getResolvedSchema.getColumnNames
+    val primaryKeys = contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes
+    primaryKeys.map(fieldNames.get).mkString("[", ", ", "]")
+  }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index baa9b25db02..9cc84db2b7b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -1060,38 +1060,30 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
         case UpsertMaterialize.FORCE => primaryKeys.nonEmpty && !sinkIsRetract
         case UpsertMaterialize.NONE => false
         case UpsertMaterialize.AUTO =>
-          if (
-            (inputIsAppend && InsertConflictStrategy
-              .deduplicate()
-              .equals(sink.conflictStrategy)) || sinkIsAppend || sinkIsRetract
-          ) {
+          // if the sink is not an UPSERT sink (has no PK, or is an APPEND or RETRACT sink)
+          // we don't need to materialize results
+          if (primaryKeys.isEmpty || sinkIsAppend || sinkIsRetract) {
             return false
           }
-          if (primaryKeys.isEmpty) {
+
+          // For a DEDUPLICATE strategy and INSERT only input, we simply let the inserts be handled
+          // as UPSERT_AFTER and overwrite previous value
+          if (inputIsAppend && sink.isDeduplicateConflictStrategy) {
             return false
           }
-          val pks = ImmutableBitSet.of(primaryKeys: _*)
-          val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
-          val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
-          // if input has updates and primary key != upsert key (upsert key can be null) we should
-          // enable upsertMaterialize. An optimize is: do not enable upsertMaterialize when sink
-          // pk(s) contains input changeLogUpsertKeys
-          val upsertKeyDiffersFromPk =
-            changeLogUpsertKeys == null || !changeLogUpsertKeys.exists(pks.contains)
+
+          // if input has updates and primary key != upsert key we should enable upsertMaterialize.
+          //
+          // An optimization: do not enable upsertMaterialize when sink pk(s) contains input
+          // changeLogUpsertKeys
+          val upsertKeyDiffersFromPk = !sink.primaryKeysContainsUpsertKey
 
           // Validate that ON CONFLICT is specified when upsert key differs from primary key
           val requireOnConflict =
             tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT)
           if (requireOnConflict && upsertKeyDiffersFromPk && sink.conflictStrategy == null) {
-            val fieldNames = sink.contextResolvedTable.getResolvedSchema.getColumnNames
-            val pkNames = primaryKeys.map(fieldNames.get(_)).mkString("[", ", ", "]")
-            val upsertKeyNames = if (changeLogUpsertKeys == null) {
-              "none"
-            } else {
-              changeLogUpsertKeys
-                .map(uk => uk.toArray.map(fieldNames.get(_)).mkString("[", ", ", "]"))
-                .mkString(", ")
-            }
+            val pkNames = sink.getPrimaryKeyNames
+            val upsertKeyNames = sink.getUpsertKeyNames
             throw new ValidationException(
               "The query has an upsert key that differs from the primary key 
of the sink table " +
                 
s"'${sink.contextResolvedTable.getIdentifier.asSummaryString}'. " +
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
index 7eaa6c06c7f..6dba8353f02 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
@@ -18,21 +18,48 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase;
+import org.apache.flink.table.test.program.SinkTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.table.test.program.TestStep;
+import org.apache.flink.table.test.program.TestStep.TestKind;
 
+import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 
 /** Semantic tests for {@link StreamExecSink}. */
 public class SinkSemanticTests extends SemanticTestBase {
 
+    @Override
+    public EnumSet<TestKind> supportedSetupSteps() {
+        EnumSet<TestKind> steps = super.supportedSetupSteps();
+        steps.add(TestKind.SINK_WITHOUT_DATA);
+        return steps;
+    }
+
+    @Override
+    protected void runStep(TestStep testStep, TableEnvironment env) throws Exception {
+        if (testStep.getKind() == TestKind.SINK_WITHOUT_DATA) {
+            final SinkTestStep sinkTestStep = (SinkTestStep) testStep;
+            sinkTestStep.apply(
+                    env,
+                    Map.ofEntries(
+                            Map.entry("connector", "values"),
+                            Map.entry("sink-insert-only", "false")));
+        } else {
+            super.runStep(testStep, env);
+        }
+    }
+
     @Override
     public List<TableTestProgram> programs() {
         return List.of(
                 SinkTestPrograms.INSERT_RETRACT_WITHOUT_PK,
                 SinkTestPrograms.INSERT_RETRACT_WITH_PK,
-                SinkTestPrograms.ON_CONFLICT_DO_NOTHING_NOT_SUPPORTED,
-                SinkTestPrograms.ON_CONFLICT_DO_ERROR_NOT_SUPPORTED,
+                SinkTestPrograms.ON_CONFLICT_DO_NOTHING_KEEPS_FIRST,
+                SinkTestPrograms.ON_CONFLICT_DO_ERROR_NO_CONFLICT,
                 SinkTestPrograms.UPSERT_KEY_DIFFERS_FROM_PK_WITHOUT_ON_CONFLICT,
                 SinkTestPrograms.UPSERT_KEY_DIFFERS_FROM_PK_WITH_ON_CONFLICT,
                 SinkTestPrograms.UPSERT_KEY_MATCHES_PK_WITHOUT_ON_CONFLICT,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
index c59fb88521d..6ddc55cc18b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
@@ -89,46 +89,47 @@ public class SinkTestPrograms {
                             "INSERT INTO sink_t SELECT UPPER(name), SUM(score) 
FROM source_t GROUP BY name")
                     .build();
 
-    // --- ON CONFLICT validation tests ---
+    // --- ON CONFLICT tests ---
 
-    public static final TableTestProgram ON_CONFLICT_DO_NOTHING_NOT_SUPPORTED =
+    public static final TableTestProgram ON_CONFLICT_DO_NOTHING_KEEPS_FIRST =
             TableTestProgram.of(
-                            "sink-on-conflict-do-nothing-not-supported",
-                            "ON CONFLICT DO NOTHING is not yet supported and 
should throw ValidationException.")
+                            "sink-on-conflict-do-nothing-keeps-first",
+                            "ON CONFLICT DO NOTHING keeps the first record 
when multiple records have the same PK.")
                     .setupTableSource(
                             SourceTestStep.newBuilder("source_t")
                                     .addSchema("a INT", "b BIGINT")
                                     .addOption("changelog-mode", "I")
-                                    .producedValues(Row.ofKind(RowKind.INSERT, 1, 1L))
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 10L),
+                                            Row.ofKind(RowKind.INSERT, 1, 20L),
+                                            Row.ofKind(RowKind.INSERT, 2, 30L))
                                     .build())
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink_t")
                                     .addSchema("a INT PRIMARY KEY NOT 
ENFORCED", "b BIGINT")
+                                    .consumedValues("+I[1, 10]", "+I[2, 30]")
                                     .build())
-                    .runFailingSql(
-                            "INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO NOTHING",
-                            ValidationException.class,
-                            "ON CONFLICT DO NOTHING is not yet supported")
+                    .runSql("INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO NOTHING")
                     .build();
 
-    public static final TableTestProgram ON_CONFLICT_DO_ERROR_NOT_SUPPORTED =
+    public static final TableTestProgram ON_CONFLICT_DO_ERROR_NO_CONFLICT =
             TableTestProgram.of(
-                            "sink-on-conflict-do-error-not-supported",
-                            "ON CONFLICT DO ERROR is not yet supported and 
should throw ValidationException.")
+                            "sink-on-conflict-do-error-no-conflict",
+                            "ON CONFLICT DO ERROR with no conflicts passes 
through all records.")
                     .setupTableSource(
                             SourceTestStep.newBuilder("source_t")
                                     .addSchema("a INT", "b BIGINT")
                                     .addOption("changelog-mode", "I")
-                                    .producedValues(Row.ofKind(RowKind.INSERT, 1, 1L))
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 10L),
+                                            Row.ofKind(RowKind.INSERT, 2, 20L))
                                     .build())
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink_t")
                                     .addSchema("a INT PRIMARY KEY NOT 
ENFORCED", "b BIGINT")
+                                    .consumedValues("+I[1, 10]", "+I[2, 20]")
                                     .build())
-                    .runFailingSql(
-                            "INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO ERROR",
-                            ValidationException.class,
-                            "ON CONFLICT DO ERROR is not yet supported")
+                    .runSql("INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO ERROR")
                     .build();
 
     public static final TableTestProgram UPSERT_KEY_DIFFERS_FROM_PK_WITHOUT_ON_CONFLICT =
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializer.java
new file mode 100644
index 00000000000..4a31a1d2da1
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializer.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.MathUtils;
+
+import java.io.IOException;
+
+/**
+ * A serializer for {@code Long} that produces a lexicographically sortable byte representation.
+ *
+ * <p>Standard big-endian long serialization does not maintain numeric ordering for negative values
+ * in lexicographic byte comparison because negative numbers have their sign bit set (1), which
+ * makes them appear greater than positive numbers (sign bit 0) in unsigned byte comparison.
+ *
+ * <p>This serializer flips the sign bit during serialization, converting from signed to unsigned
+ * ordering. This ensures that when iterating over keys in a sorted state backend (like RocksDB),
+ * the entries are returned in numeric order.
+ *
+ * @see org.apache.flink.streaming.api.operators.TimerSerializer
+ */
+@Internal
+public final class SortedLongSerializer extends TypeSerializerSingleton<Long> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Sharable instance of the SortedLongSerializer. */
+    public static final SortedLongSerializer INSTANCE = new SortedLongSerializer();
+
+    private static final Long ZERO = 0L;
+
+    @Override
+    public boolean isImmutableType() {
+        return true;
+    }
+
+    @Override
+    public Long createInstance() {
+        return ZERO;
+    }
+
+    @Override
+    public Long copy(Long from) {
+        return from;
+    }
+
+    @Override
+    public Long copy(Long from, Long reuse) {
+        return from;
+    }
+
+    @Override
+    public int getLength() {
+        return Long.BYTES;
+    }
+
+    @Override
+    public void serialize(Long record, DataOutputView target) throws IOException {
+        target.writeLong(MathUtils.flipSignBit(record));
+    }
+
+    @Override
+    public Long deserialize(DataInputView source) throws IOException {
+        return MathUtils.flipSignBit(source.readLong());
+    }
+
+    @Override
+    public Long deserialize(Long reuse, DataInputView source) throws IOException {
+        return deserialize(source);
+    }
+
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        target.writeLong(source.readLong());
+    }
+
+    @Override
+    public TypeSerializerSnapshot<Long> snapshotConfiguration() {
+        return new SortedLongSerializerSnapshot();
+    }
+
+    // ------------------------------------------------------------------------
+
+    /** Serializer configuration snapshot for compatibility and format evolution. */
+    @SuppressWarnings("WeakerAccess")
+    public static final class SortedLongSerializerSnapshot
+            extends SimpleTypeSerializerSnapshot<Long> {
+
+        public SortedLongSerializerSnapshot() {
+            super(() -> INSTANCE);
+        }
+    }
+}
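
To see the sign-bit trick concretely: flipping the sign bit is an XOR with Long.MIN_VALUE (which is what org.apache.flink.util.MathUtils.flipSignBit computes), after which unsigned comparison of the encoded values agrees with signed numeric order. Unsigned comparison is exactly what byte-wise comparison of the big-endian encoding amounts to. A self-contained check:

    public class SignBitOrderingDemo {
        // Same transformation as MathUtils.flipSignBit: XOR the sign bit.
        static long flipSignBit(long v) {
            return v ^ Long.MIN_VALUE;
        }

        public static void main(String[] args) {
            long[] ascending = {Long.MIN_VALUE, -42L, -1L, 0L, 1L, 42L, Long.MAX_VALUE};
            for (int i = 1; i < ascending.length; i++) {
                long prev = flipSignBit(ascending[i - 1]);
                long curr = flipSignBit(ascending[i]);
                // Unsigned comparison stands in for comparing big-endian bytes.
                if (Long.compareUnsigned(prev, curr) >= 0) {
                    throw new AssertionError("ordering broken at index " + i);
                }
            }
            System.out.println("flipped encoding preserves numeric order");
        }
    }
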
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java
new file mode 100644
index 00000000000..fc975bda184
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state TTL. This will lead to 
incorrect results. "
+                    + "You can increase the state TTL to avoid this.";
+    private static final Set<String> ORDERED_STATE_BACKENDS = 
Set.of("rocksdb", "forst");
+
+    private final InsertConflictStrategy conflictStrategy;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
+    private final RowType keyType;
+    private final String[] primaryKeyNames;
+
+    private transient MapStateDescriptor<Long, List<RowData>> bufferDescriptor;
+    // Buffers incoming changelog records (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) keyed by
+    // their timestamp. Watermarks act as compaction barriers: when a watermark arrives, we know
+    // that UPDATE_BEFORE and its corresponding UPDATE_AFTER have both been received and can be
+    // compacted together. This solves the out-of-order problem where a later UPDATE_AFTER may
+    // arrive before the UPDATE_BEFORE of a previous change.
+    private transient MapState<Long, List<RowData>> buffer;
+
+    // Stores the last emitted value for the current primary key. Used to detect duplicates
+    // and determine the correct RowKind (INSERT vs UPDATE_AFTER) on subsequent compactions.
+    private transient ValueState<RowData> currentValue;
+    private transient RecordEqualiser equaliser;
+    private transient RecordEqualiser upsertKeyEqualiser;
+    private transient TimestampedCollector<RowData> collector;
+    private transient boolean isOrderedStateBackend;
+
+    // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey.
+    private transient ProjectedRowData upsertKeyProjectedRow1;
+    private transient ProjectedRowData upsertKeyProjectedRow2;
+
+    // Field getters for formatting the primary key in error messages.
+    private transient RowData.FieldGetter[] keyFieldGetters;
+
+    // Tracks the current watermark. Used to detect in-flight records after restore.
+    private transient long currentWatermark = Long.MIN_VALUE;
+
+    public WatermarkCompactingSinkMaterializer(
+            InsertConflictStrategy conflictStrategy,
+            TypeSerializer<RowData> serializer,
+            GeneratedRecordEqualiser generatedRecordEqualiser,
+            @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
+            @Nullable int[] inputUpsertKey,
+            RowType keyType,
+            String[] primaryKeyNames) {
+        validateConflictStrategy(conflictStrategy);
+        this.conflictStrategy = conflictStrategy;
+        this.serializer = serializer;
+        this.generatedRecordEqualiser = generatedRecordEqualiser;
+        this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
+        this.inputUpsertKey = inputUpsertKey;
+        this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 0;
+        this.keyType = keyType;
+        this.primaryKeyNames = primaryKeyNames;
+    }
+
+    private static void validateConflictStrategy(InsertConflictStrategy strategy) {
+        Preconditions.checkArgument(
+                strategy.getBehavior() == ConflictBehavior.ERROR
+                        || strategy.getBehavior() == ConflictBehavior.NOTHING,
+                "Only ERROR and NOTHING strategies are supported, got: %s",
+                strategy);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+
+        // Initialize state descriptors and handles
+        this.bufferDescriptor =
+                new MapStateDescriptor<>(
+                        "watermark-buffer",
+                        SortedLongSerializer.INSTANCE,
+                        new ListSerializer<>(serializer));
+        this.buffer = context.getKeyedStateStore().getMapState(bufferDescriptor);
+
+        ValueStateDescriptor<RowData> currentValueDescriptor =
+                new ValueStateDescriptor<>("current-value", serializer);
+        this.currentValue = context.getKeyedStateStore().getState(currentValueDescriptor);
+
+        if (context.isRestored()) {
+            // Detect ordered state backend before consolidation
+            detectOrderedStateBackend();
+
+            // Consolidate all buffered records to MIN_VALUE for each key.
+            // This ensures they are compacted on the first watermark after restore.
+            getKeyedStateBackend()
+                    .applyToAllKeys(
+                            VoidNamespace.INSTANCE,
+                            VoidNamespaceSerializer.INSTANCE,
+                            bufferDescriptor,
+                            (key, state) -> consolidateBufferToMinValue());
+        }
+    }
+
+    private void consolidateBufferToMinValue() throws Exception {
+        List<RowData> consolidated = new ArrayList<>();
+
+        if (isOrderedStateBackend) {
+            // RocksDB/ForSt: entries are already sorted by timestamp
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = 
buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                consolidated.addAll(iterator.next().getValue());
+                iterator.remove();
+            }
+        } else {
+            // Other backends: collect, sort by timestamp, then consolidate
+            List<Map.Entry<Long, List<RowData>>> entries = new ArrayList<>();
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = 
buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                entries.add(iterator.next());
+                iterator.remove();
+            }
+
+            entries.sort(Map.Entry.comparingByKey());
+
+            for (Map.Entry<Long, List<RowData>> entry : entries) {
+                consolidated.addAll(entry.getValue());
+            }
+        }
+
+        if (!consolidated.isEmpty()) {
+            buffer.put(Long.MIN_VALUE, consolidated);
+        }
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        initializeEqualisers();
+        initializeKeyFieldGetters();
+        detectOrderedStateBackend();
+        this.collector = new TimestampedCollector<>(output);
+    }
+
+    private void initializeKeyFieldGetters() {
+        this.keyFieldGetters = new RowData.FieldGetter[primaryKeyNames.length];
+        for (int i = 0; i < primaryKeyNames.length; i++) {
+            LogicalType fieldType = keyType.getTypeAt(i);
+            keyFieldGetters[i] = RowData.createFieldGetter(fieldType, i);
+        }
+    }
+
+    private void initializeEqualisers() {
+        if (hasUpsertKey) {
+            this.upsertKeyEqualiser =
+                    generatedUpsertKeyEqualiser.newInstance(
+                            getRuntimeContext().getUserCodeClassLoader());
+            upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey);
+            upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey);
+        }
+        this.equaliser =
+                generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+    }
+
+    private void detectOrderedStateBackend() {
+        KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+        String backendType =
+                keyedStateBackend != null ? keyedStateBackend.getBackendTypeIdentifier() : "";
+        this.isOrderedStateBackend = ORDERED_STATE_BACKENDS.contains(backendType);
+
+        if (isOrderedStateBackend) {
+            LOG.info("Using ordered state backend optimization for {} 
backend", backendType);
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception 
{
+        RowData row = element.getValue();
+        long assignedTimestamp = element.getTimestamp();
+
+        // If we haven't received any watermark yet (still at MIN_VALUE after restore)
+        // and the timestamp is beyond MIN_VALUE, it's from in-flight data that was
+        // checkpointed before restore. Assign to MIN_VALUE.
+        if (currentWatermark == Long.MIN_VALUE && assignedTimestamp > Long.MIN_VALUE) {
+            assignedTimestamp = Long.MIN_VALUE;
+        }
+
+        bufferRecord(assignedTimestamp, row);
+    }
+
+    private void bufferRecord(long timestamp, RowData row) throws Exception {
+        List<RowData> records = buffer.get(timestamp);
+        if (records == null) {
+            records = new ArrayList<>();
+        }
+        switch (row.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                // Try to cancel out a pending retraction; if none, just append
+                if (!tryCancelRetraction(records, row)) {
+                    records.add(row);
+                }
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                // Try to cancel out an existing addition; if none, keep for cross-bucket
+                if (!tryCancelAddition(records, row)) {
+                    records.add(row);
+                }
+                break;
+        }
+        buffer.put(timestamp, records);
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        final long watermarkTimestamp = mark.getTimestamp();
+        this.currentWatermark = watermarkTimestamp;
+
+        // Iterate over all keys and compact their buffered records
+        this.<RowData>getKeyedStateBackend()
+                .applyToAllKeys(
+                        VoidNamespace.INSTANCE,
+                        VoidNamespaceSerializer.INSTANCE,
+                        bufferDescriptor,
+                        (key, state) -> compactAndEmit(watermarkTimestamp));
+
+        super.processWatermark(mark);
+    }
+
+    private void compactAndEmit(long newWatermark) throws Exception {
+        RowData previousValue = currentValue.value();
+        List<RowData> pendingRecords = collectPendingRecords(previousValue, 
newWatermark);
+
+        if (pendingRecords.size() > 1) {
+            if (conflictStrategy.getBehavior() == ConflictBehavior.ERROR) {
+                RowData key = (RowData) getKeyedStateBackend().getCurrentKey();
+                throw new TableRuntimeException(
+                        "Primary key constraint violation: multiple distinct 
records with "
+                                + "the same primary key detected. Conflicting 
key: ["
+                                + formatKey(key)
+                                + "]. Use ON CONFLICT DO NOTHING to keep the 
first record.");
+            } else if (previousValue == null) {
+                final RowData newValue = pendingRecords.get(0);
+                emit(newValue, INSERT);
+                currentValue.update(newValue);
+            }
+        } else if (pendingRecords.isEmpty()) {
+            if (previousValue != null) {
+                emit(previousValue, DELETE);
+                currentValue.clear();
+            }
+        } else {
+            final RowData newValue = pendingRecords.get(0);
+            if (previousValue == null) {
+                emit(newValue, INSERT);
+                currentValue.update(newValue);
+            } else if (!recordEquals(previousValue, newValue)) {
+                emit(newValue, UPDATE_AFTER);
+                currentValue.update(newValue);
+            }
+        }
+    }
+
+    private List<RowData> collectPendingRecords(RowData previousValue, long 
newWatermark)
+            throws Exception {
+        List<RowData> records = new ArrayList<>();
+        if (previousValue != null) {
+            records.add(previousValue);
+        }
+        Iterator<Map.Entry<Long, List<RowData>>> iterator = 
buffer.entries().iterator();
+
+        while (iterator.hasNext()) {
+            Map.Entry<Long, List<RowData>> entry = iterator.next();
+            if (entry.getKey() <= newWatermark) {
+                for (RowData pendingRecord : entry.getValue()) {
+                    switch (pendingRecord.getRowKind()) {
+                        case INSERT:
+                        case UPDATE_AFTER:
+                            addRow(records, pendingRecord);
+                            break;
+
+                        case UPDATE_BEFORE:
+                        case DELETE:
+                            retractRow(records, pendingRecord);
+                            break;
+                    }
+                }
+                iterator.remove();
+            } else if (isOrderedStateBackend) {
+                break;
+            }
+        }
+        return records;
+    }
+
+    private void addRow(List<RowData> values, RowData add) {
+        if (hasUpsertKey) {
+            int index = findFirst(values, add);
+            if (index == -1) {
+                values.add(add);
+            } else {
+                values.set(index, add);
+            }
+        } else {
+            values.add(add);
+        }
+    }
+
+    private void retractRow(List<RowData> values, RowData retract) {
+        final int index = findFirst(values, retract);
+        if (index == -1) {
+            LOG.info(STATE_CLEARED_WARN_MSG);
+        } else {
+            // Remove first found row
+            values.remove(index);
+        }
+    }
+
+    /**
+     * Attempts to cancel out a retraction by finding a matching retractive record
+     * (DELETE/UPDATE_BEFORE) with identical content.
+     *
+     * @return true if a matching retraction was found and removed, false otherwise
+     */
+    private boolean tryCancelRetraction(List<RowData> values, RowData 
addition) {
+        final Iterator<RowData> iterator = values.iterator();
+        while (iterator.hasNext()) {
+            RowData candidate = iterator.next();
+            RowKind kind = candidate.getRowKind();
+            if ((kind == DELETE || kind == RowKind.UPDATE_BEFORE)
+                    && recordEquals(addition, candidate)) {
+                iterator.remove();
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Attempts to cancel out an addition by finding a matching additive record
+     * (INSERT/UPDATE_AFTER) with identical content.
+     *
+     * @return true if a matching addition was found and removed, false otherwise
+     */
+    private boolean tryCancelAddition(List<RowData> values, RowData 
retraction) {
+        final Iterator<RowData> iterator = values.iterator();
+        while (iterator.hasNext()) {
+            RowData candidate = iterator.next();
+            RowKind kind = candidate.getRowKind();
+            if ((kind == INSERT || kind == UPDATE_AFTER) && recordEquals(retraction, candidate)) {
+                iterator.remove();
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private int findFirst(List<RowData> values, RowData target) {
+        final Iterator<RowData> iterator = values.iterator();
+        int i = 0;
+        while (iterator.hasNext()) {
+            if (equalsIgnoreRowKind(target, iterator.next())) {
+                return i;
+            }
+            i++;
+        }
+        return -1;
+    }
+
+    private boolean equalsIgnoreRowKind(RowData newRow, RowData oldRow) {
+        newRow.setRowKind(oldRow.getRowKind());
+        if (hasUpsertKey) {
+            return this.upsertKeyEqualiser.equals(
+                    upsertKeyProjectedRow1.replaceRow(newRow),
+                    upsertKeyProjectedRow2.replaceRow(oldRow));
+        }
+        return equaliser.equals(newRow, oldRow);
+    }
+
+    private void emit(RowData row, RowKind kind) {
+        RowKind originalKind = row.getRowKind();
+        row.setRowKind(kind);
+        collector.collect(row);
+        row.setRowKind(originalKind);
+    }
+
+    private boolean recordEquals(RowData row1, RowData row2) {
+        RowKind kind1 = row1.getRowKind();
+        RowKind kind2 = row2.getRowKind();
+        row1.setRowKind(RowKind.INSERT);
+        row2.setRowKind(RowKind.INSERT);
+        boolean result = equaliser.equals(row1, row2);
+        row1.setRowKind(kind1);
+        row2.setRowKind(kind2);
+        return result;
+    }
+
+    private String formatKey(RowData key) {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < primaryKeyNames.length; i++) {
+            if (i > 0) {
+                sb.append(", ");
+            }
+            sb.append(primaryKeyNames[i]).append("=");
+            if (key.isNullAt(i)) {
+                sb.append("null");
+            } else {
+                sb.append(keyFieldGetters[i].getFieldOrNull(key));
+            }
+        }
+        return sb.toString();
+    }
+
+    /** Factory method to create a new instance. */
+    public static WatermarkCompactingSinkMaterializer create(
+            InsertConflictStrategy conflictStrategy,
+            RowType physicalRowType,
+            GeneratedRecordEqualiser rowEqualiser,
+            @Nullable GeneratedRecordEqualiser upsertKeyEqualiser,
+            @Nullable int[] inputUpsertKey,
+            RowType keyType,
+            String[] primaryKeyNames) {
+        return new WatermarkCompactingSinkMaterializer(
+                conflictStrategy,
+                InternalSerializers.create(physicalRowType),
+                rowEqualiser,
+                upsertKeyEqualiser,
+                inputUpsertKey,
+                keyType,
+                primaryKeyNames);
+    }
+}
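
The core of compactAndEmit/collectPendingRecords can be summarized in a few lines. A stripped-down sketch using strings instead of RowData (it ignores upsert keys, state handles, and the previously emitted value; the survivor count drives the ERROR/NOTHING decision as in the operator above):

    import java.util.ArrayList;
    import java.util.List;

    public class CompactionSketch {
        enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

        record Change(Kind kind, String value) {}

        // Folds one key's watermark-closed changelog into the rows still alive,
        // mirroring collectPendingRecords(): additions append, retractions
        // remove the first equal addition.
        static List<String> compact(List<Change> changes) {
            List<String> alive = new ArrayList<>();
            for (Change c : changes) {
                switch (c.kind()) {
                    case INSERT, UPDATE_AFTER -> alive.add(c.value());
                    case UPDATE_BEFORE, DELETE -> alive.remove(c.value());
                }
            }
            return alive;
        }

        public static void main(String[] args) {
            // Out-of-order changelog for one primary key: the UPDATE_AFTER of the
            // second change arrives before the UPDATE_BEFORE of the first. After
            // compaction a single row survives, so there is no conflict.
            List<Change> changes = List.of(
                    new Change(Kind.INSERT, "a"),
                    new Change(Kind.UPDATE_AFTER, "b"),
                    new Change(Kind.UPDATE_BEFORE, "a"));
            // Two or more survivors would mean a real PK conflict: DO ERROR throws
            // a TableRuntimeException, DO NOTHING keeps the first survivor.
            System.out.println(compact(changes)); // [b]
        }
    }
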
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkTimestampAssigner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkTimestampAssigner.java
new file mode 100644
index 00000000000..17598df521d
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkTimestampAssigner.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+
+/**
+ * Operator that assigns the current watermark as the timestamp to each incoming StreamRecord.
+ *
+ * <p>This is used in conjunction with {@link WatermarkCompactingSinkMaterializer} which buffers
+ * records by their timestamp. Without meaningful timestamps, all records would be buffered under
+ * the same key, breaking the watermark-based compaction logic.
+ *
+ * <p>If the current watermark is {@code Long.MIN_VALUE} (the initial state before any watermark
+ * arrives), records will be assigned that value and will be compacted when the first watermark
+ * arrives.
+ */
+@Internal
+public class WatermarkTimestampAssigner extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception 
{
+        element.setTimestamp(currentWatermark);
+        output.collect(element);
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializerTest.java
new file mode 100644
index 00000000000..d331eb18c94
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.util.Random;
+
+/** A test for the {@link SortedLongSerializer}. */
+class SortedLongSerializerTest extends SerializerTestBase<Long> {
+
+    @Override
+    protected TypeSerializer<Long> createSerializer() {
+        return SortedLongSerializer.INSTANCE;
+    }
+
+    @Override
+    protected int getLength() {
+        return 8;
+    }
+
+    @Override
+    protected Class<Long> getTypeClass() {
+        return Long.class;
+    }
+
+    @Override
+    protected Long[] getTestData() {
+        Random rnd = new Random(874597969123412341L);
+        long rndLong = rnd.nextLong();
+
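+        // Cover zero, +/-1, both extremes, and a random value with both signs.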
+        return new Long[] {0L, 1L, -1L, Long.MAX_VALUE, Long.MIN_VALUE, rndLong, -rndLong};
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java
new file mode 100644
index 00000000000..adb701a58e9
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java
@@ -0,0 +1,769 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.delete;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insert;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfter;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link WatermarkCompactingSinkMaterializer}. */
+class WatermarkCompactingSinkMaterializerTest {
+
+    private static final int PRIMARY_KEY_INDEX = 1;
+    private static final String PRIMARY_KEY_NAME = "pk";
+
+    private static final LogicalType[] LOGICAL_TYPES =
+            new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()};
+
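+    // Hand-written stand-ins for the planner's code-generated equalisers; the empty
+    // name/code constructor arguments are unused because newInstance() is overridden
+    // to return the test implementations directly.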
+    private static final GeneratedRecordEqualiser RECORD_EQUALISER =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestRecordEqualiser();
+                }
+            };
+
+    private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestUpsertKeyEqualiser();
+                }
+            };
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testBasicInsertWithWatermarkProgression(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert first record (watermark is MIN_VALUE)
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            assertEmitsNothing(harness); // Buffered, waiting for watermark
+
+            // Advance watermark to trigger compaction
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "a1"));
+
+            // Update with same upsert key (this is the expected pattern for single-source updates)
+            harness.processElement(updateAfterRecord(1L, 1, "a2"));
+            assertEmitsNothing(harness);
+
+            // Advance watermark again
+            harness.processWatermark(200L);
+            assertEmits(harness, updateAfter(1L, 1, "a2"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testDeleteAfterInsert(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert and compact
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "a1"));
+
+            // Delete and compact
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+            harness.processWatermark(200L);
+            assertEmits(harness, delete(1L, 1, "a1"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testInsertAndDeleteInSameWindow(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert and delete before watermark advances
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+
+            // Compact - should emit nothing since insert and delete cancel out
+            harness.processWatermark(100L);
+            assertEmitsNothing(harness);
+        }
+    }
+
+    @Test
+    void testDoNothingKeepsFirstRecord() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.NOTHING)) {
+            harness.open();
+
+            // Insert two records with different upsert keys but same primary key
+            harness.processElement(insertRecord(1L, 1, "first"));
+            harness.processElement(insertRecord(2L, 1, "second"));
+
+            // Compact - should keep the first record
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "first"));
+        }
+    }
+
+    @Test
+    void testDoErrorThrowsOnConflict() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.ERROR)) {
+            harness.open();
+
+            // Insert two records with different upsert keys but same primary key
+            harness.processElement(insertRecord(1L, 1, "first"));
+            harness.processElement(insertRecord(2L, 1, "second"));
+
+            // Compact - should throw exception with key info
+            assertThatThrownBy(() -> harness.processWatermark(100L))
+                    .isInstanceOf(TableRuntimeException.class)
+                    .hasMessageContaining("Primary key constraint violation")
+                    .hasMessageContaining("[pk=1]");
+        }
+    }
+
+    @Test
+    void testDoErrorAllowsSameUpsertKey() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.ERROR)) {
+            harness.open();
+
+            // Insert two records with same upsert key (updates to same source)
+            harness.processElement(insertRecord(1L, 1, "v1"));
+            harness.processElement(updateAfterRecord(1L, 1, "v2"));
+
+            // Compact - should not throw, just keep the latest
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "v2"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testChangelogDisorderHandling(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Simulate changelog disorder from FLIP-558 example:
+            // Records from different sources (different upsert keys: 1L and 2L) map to same PK (1)
+            // Ideal order: +I(1,1,a1), -U(1,1,a1), +U(2,1,b1)
+            // Disordered: +U(2,1,b1), +I(1,1,a1), -U(1,1,a1)
+
+            // The +U from source 2 arrives first (upsert key = 2L)
+            harness.processElement(updateAfterRecord(2L, 1, "b1"));
+            // Then +I and -U from source 1 arrive (upsert key = 1L)
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processElement(updateBeforeRecord(1L, 1, "a1"));
+
+            // Net result: only (2L, 1, "b1") remains after cancellation, no conflict
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(2L, 1, "b1"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testChangelogDisorderUpdateBeforeAfterInsert(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Simulate changelog disorder where UPDATE_AFTER arrives before UPDATE_BEFORE:
+            // Ideal order: +I(1,1,a1), -U(1,1,a1), +U(1,1,a2)
+            // Disordered: +I(1,1,a1), +U(1,1,a2), -U(1,1,a1)
+
+            // INSERT the initial value
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            // UPDATE_AFTER arrives before the UPDATE_BEFORE
+            harness.processElement(updateAfterRecord(1L, 1, "a2"));
+            // UPDATE_BEFORE (retraction of the original INSERT) arrives last
+            harness.processElement(updateBeforeRecord(1L, 1, "a1"));
+
+            // Net result: +I and -U cancel out, only +U(1,1,a2) remains
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "a2"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testNoEmissionWhenValueUnchanged(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert and compact
+            harness.processElement(insertRecord(1L, 1, "value"));
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "value"));
+
+            // Insert same value again (same upsert key)
+            harness.processElement(updateAfterRecord(1L, 1, "value"));
+            harness.processWatermark(200L);
+            // Should not emit since value is the same
+            assertEmitsNothing(harness);
+        }
+    }
+
+    /**
+     * Tests that record timestamps are handled correctly when multiple inputs send records that
+     * arrive out of timestamp order. This simulates the case where records from different upstream
+     * tasks have different timestamps and arrive interleaved.
+     *
+     * <p>Input 1 uses upsert key = 1L, Input 2 uses upsert key = 2L. All records have same primary
+     * key (1).
+     *
+     * <p>Sequence:
+     *
+     * <ol>
+     *   <li>INSERT(input=1, t=2)
+     *   <li>watermark=3 -> emits INSERT
+     *   <li>UPDATE_BEFORE(input=1, t=4)
+     *   <li>UPDATE_AFTER(input=1, t=6)
+     *   <li>UPDATE_AFTER(input=2, t=4) - arrives after t=6 record but has smaller timestamp
+     *   <li>watermark=5 -> compacts t<=5 records
+     *   <li>UPDATE_BEFORE(input=2, t=6)
+     *   <li>watermark=10 -> compacts remaining t=6 records
+     * </ol>
+     */
+    @Test
+    void testTwoUpstreamTasksWithDisorderedWatermarks() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.NOTHING)) {
+            harness.open();
+
+            // INSERT from input 1 with timestamp 2
+            harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 2L));
+            assertEmitsNothing(harness);
+
+            // watermark=3: compacts records with t<=3, emits INSERT(t=2)
+            harness.processWatermark(3L);
+            assertEmits(harness, insert(1L, 1, "v1"));
+
+            // UPDATE_BEFORE from input 1 with timestamp 4
+            harness.processElement(recordWithTimestamp(RowKind.UPDATE_BEFORE, 1L, 1, "v1", 4L));
+            assertEmitsNothing(harness);
+
+            // UPDATE_AFTER from input 1 with timestamp 6
+            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v4", 6L));
+            assertEmitsNothing(harness);
+
+            // UPDATE_AFTER from input 2 with timestamp 4
+            // This record arrives after the t=6 record but has a smaller timestamp
+            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 2L, 1, "v3", 4L));
+
+            // watermark=5: compacts records with t<=5
+            // Buffered: UPDATE_BEFORE(1L, t=4) cancels out input 1's value, and
+            // UPDATE_AFTER(2L, t=4) is emitted
+            harness.processWatermark(5L);
+            assertEmits(harness, updateAfter(2L, 1, "v3"));
+
+            // UPDATE_BEFORE from input 2 with timestamp 6
+            harness.processElement(recordWithTimestamp(RowKind.UPDATE_BEFORE, 2L, 1, "v3", 6L));
+            assertEmitsNothing(harness);
+
+            // Final watermark to flush all remaining buffered records (t=6)
+            // Buffered: UPDATE_AFTER(1L, t=6) and UPDATE_BEFORE(2L, t=6)
+            // After compaction: UPDATE_AFTER(1L, "v4") remains as final value
+            harness.processWatermark(10L);
+            assertEmits(harness, updateAfter(1L, 1, "v4"));
+        }
+    }
+
+    // --- Tests without upsert key ---
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testBasicInsertWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarnessWithoutUpsertKey(behavior)) {
+            harness.open();
+
+            // Insert first record
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            assertEmitsNothing(harness);
+
+            // Advance watermark to trigger compaction
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "a1"));
+        }
+    }
+
+    @Test
+    void testUpdateWithoutUpsertKeyNothingStrategy() throws Exception {
+        // Without upsert key, UPDATE_AFTER on existing value causes conflict (two rows accumulate)
+        // NOTHING strategy keeps the first value
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarnessWithoutUpsertKey(ConflictBehavior.NOTHING)) {
+            harness.open();
+
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "a1"));
+
+            // UPDATE_AFTER with different value - creates conflict, NOTHING keeps first
+            harness.processElement(updateAfterRecord(2L, 1, "a2"));
+            harness.processWatermark(200L);
+            // NOTHING keeps the previous value (1L, 1, "a1"), no emission since unchanged
+            assertEmitsNothing(harness);
+        }
+    }
+
+    @Test
+    void testUpdateWithoutUpsertKeyErrorStrategy() throws Exception {
+        // Without upsert key, UPDATE_AFTER on existing value causes conflict (two rows accumulate)
+        // ERROR strategy throws
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarnessWithoutUpsertKey(ConflictBehavior.ERROR)) {
+            harness.open();
+
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "a1"));
+
+            // UPDATE_AFTER with different value - creates conflict, ERROR throws with key info
+            harness.processElement(updateAfterRecord(2L, 1, "a2"));
+            assertThatThrownBy(() -> harness.processWatermark(200L))
+                    .isInstanceOf(TableRuntimeException.class)
+                    .hasMessageContaining("Primary key constraint violation")
+                    .hasMessageContaining("[pk=1]");
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testDeleteAfterInsertWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarnessWithoutUpsertKey(behavior)) {
+            harness.open();
+
+            // Insert and compact
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "a1"));
+
+            // Delete with exact same row and compact
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+            harness.processWatermark(200L);
+            assertEmits(harness, delete(1L, 1, "a1"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testInsertAndDeleteInSameWindowWithoutUpsertKey(ConflictBehavior behavior)
+            throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarnessWithoutUpsertKey(behavior)) {
+            harness.open();
+
+            // Insert and delete with exact same row before watermark advances
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+
+            // Compact - should emit nothing since insert and delete cancel out
+            harness.processWatermark(100L);
+            assertEmitsNothing(harness);
+        }
+    }
+
+    @Test
+    void testIdenticalInsertsWithoutUpsertKeyNothingKeepsFirst() throws Exception {
+        // Without upsert key, even identical inserts are separate entries
+        // NOTHING strategy just keeps the first one
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarnessWithoutUpsertKey(ConflictBehavior.NOTHING)) {
+            harness.open();
+
+            // Insert two identical records (same full row)
+            harness.processElement(insertRecord(1L, 1, "same"));
+            harness.processElement(insertRecord(1L, 1, "same"));
+
+            // Compact - NOTHING keeps first record
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "same"));
+        }
+    }
+
+    @Test
+    void testIdenticalInsertsWithoutUpsertKeyErrorThrows() throws Exception {
+        // Without upsert key, even identical inserts are separate entries
+        // ERROR strategy throws because there are multiple records
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarnessWithoutUpsertKey(ConflictBehavior.ERROR)) {
+            harness.open();
+
+            // Insert two identical records (same full row)
+            harness.processElement(insertRecord(1L, 1, "same"));
+            harness.processElement(insertRecord(1L, 1, "same"));
+
+            // Compact - ERROR throws because there are multiple pending records, includes key info
+            assertThatThrownBy(() -> harness.processWatermark(100L))
+                    .isInstanceOf(TableRuntimeException.class)
+                    .hasMessageContaining("Primary key constraint violation")
+                    .hasMessageContaining("[pk=1]");
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testInsertUpdateDeleteWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarnessWithoutUpsertKey(behavior)) {
+            harness.open();
+
+            // Insert, then update_before + update_after sequence
+            harness.processElement(insertRecord(1L, 1, "v1"));
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "v1"));
+
+            // Update: retract old value, insert new value
+            harness.processElement(updateBeforeRecord(1L, 1, "v1"));
+            harness.processElement(updateAfterRecord(2L, 1, "v2"));
+            harness.processWatermark(200L);
+            assertEmits(harness, updateAfter(2L, 1, "v2"));
+
+            // Delete the current value
+            harness.processElement(deleteRecord(2L, 1, "v2"));
+            harness.processWatermark(300L);
+            assertEmits(harness, delete(2L, 1, "v2"));
+        }
+    }
+
+    // --- Restore Tests ---
+
+    /**
+     * Tests that buffered records at different timestamps before checkpoint are consolidated to
+     * MIN_VALUE on restore and compacted on the first watermark.
+     */
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testBufferedRecordsConsolidatedOnRestore(ConflictBehavior behavior) throws Exception {
+        OperatorSubtaskState snapshot;
+
+        // First harness: buffer records at different timestamps, then take snapshot
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Buffer records at different timestamps (simulating records before checkpoint)
+            harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 1000L));
+            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v2", 2000L));
+            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v3", 3000L));
+
+            // No watermark yet, so nothing emitted
+            assertEmitsNothing(harness);
+
+            // Take snapshot with buffered records
+            snapshot = harness.snapshot(1L, 1L);
+        }
+
+        // Second harness: restore from snapshot
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.initializeState(snapshot);
+            harness.open();
+
+            // After restore, watermarks restart from MIN_VALUE.
+            // The buffered records should have been consolidated to MIN_VALUE.
+            // First watermark (even a small one) should trigger compaction of all consolidated
+            // records.
+            harness.processWatermark(100L);
+
+            // All records were from same upsert key, so only final value is emitted
+            assertEmits(harness, insert(1L, 1, "v3"));
+        }
+    }
+
+    /**
+     * Tests that in-flight records from unaligned checkpoints (records with timestamps > MIN_VALUE
+     * arriving before first watermark after restore) are correctly handled.
+     */
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testInFlightRecordsAfterRestore(ConflictBehavior behavior) throws Exception {
+        OperatorSubtaskState snapshot;
+
+        // First harness: take empty snapshot
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+            snapshot = harness.snapshot(1L, 1L);
+        }
+
+        // Second harness: restore and simulate in-flight records
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.initializeState(snapshot);
+            harness.open();
+
+            // Simulate in-flight records that were checkpointed with their old timestamps.
+            // These arrive after restore but before any watermark is received.
+            // They have timestamps > MIN_VALUE from before the checkpoint.
+            harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 5000L));
+            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v2", 5100L));
+
+            // No watermark yet, nothing emitted
+            assertEmitsNothing(harness);
+
+            // First watermark after restore. Since currentWatermark was MIN_VALUE,
+            // in-flight records should have been assigned to MIN_VALUE and will be compacted.
+            harness.processWatermark(100L);
+
+            // Both records had same upsert key, so only final value is emitted
+            assertEmits(harness, insert(1L, 1, "v2"));
+        }
+    }
+
+    /**
+     * Tests restore with multiple keys having buffered records at different timestamps. Verifies
+     * that each key's records are correctly consolidated and compacted.
+     */
+    @Test
+    void testRestoreWithMultipleKeys() throws Exception {
+        OperatorSubtaskState snapshot;
+
+        // First harness: buffer records for multiple keys
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.NOTHING)) {
+            harness.open();
+
+            // Key 1: multiple updates
+            harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "k1v1", 1000L));
+            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "k1v2", 2000L));
+
+            // Key 2: single insert
+            harness.processElement(recordWithTimestamp(RowKind.INSERT, 2L, 2, "k2v1", 1500L));
+
+            // Key 3: insert then delete (should result in nothing)
+            harness.processElement(recordWithTimestamp(RowKind.INSERT, 3L, 3, "k3v1", 1000L));
+            harness.processElement(recordWithTimestamp(RowKind.DELETE, 3L, 3, "k3v1", 2500L));
+
+            snapshot = harness.snapshot(1L, 1L);
+        }
+
+        // Second harness: restore and verify
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.NOTHING)) {
+            harness.initializeState(snapshot);
+            harness.open();
+
+            // First watermark compacts all consolidated records
+            harness.processWatermark(100L);
+
+            // Extract and verify results (order depends on key processing order)
+            List<RowData> emitted = extractRecords(harness);
+            assertThat(emitted).hasSize(2);
+            // Key 1 should have final value "k1v2", Key 2 should have "k2v1", Key 3 cancelled out
+            assertThat(emitted)
+                    .anySatisfy(
+                            row -> {
+                                assertThat(row.getInt(1)).isEqualTo(1);
+                                assertThat(row.getString(2).toString()).isEqualTo("k1v2");
+                            })
+                    .anySatisfy(
+                            row -> {
+                                assertThat(row.getInt(1)).isEqualTo(2);
+                                assertThat(row.getString(2).toString()).isEqualTo("k2v1");
+                            });
+        }
+    }
+
+    // --- Helper Methods ---
+
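+    /** Wraps a row of the given kind into a {@link StreamRecord} carrying an explicit timestamp. */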
+    private StreamRecord<RowData> recordWithTimestamp(
+            RowKind kind, long upsertKey, int pk, String value, long timestamp) {
+        return new StreamRecord<>(rowOfKind(kind, upsertKey, pk, value), timestamp);
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(
+            ConflictBehavior behavior) throws Exception {
+        RowType keyType = RowType.of(new LogicalType[] {LOGICAL_TYPES[PRIMARY_KEY_INDEX]});
+        WatermarkCompactingSinkMaterializer operator =
+                WatermarkCompactingSinkMaterializer.create(
+                        toStrategy(behavior),
+                        RowType.of(LOGICAL_TYPES),
+                        RECORD_EQUALISER,
+                        UPSERT_KEY_EQUALISER,
+                        new int[] {0}, // upsert key is first column
+                        keyType,
+                        new String[] {PRIMARY_KEY_NAME});
+
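+        // Key the harness by the primary-key column so that all records with the
+        // same PK share keyed state, mirroring the key-partitioning applied in
+        // front of the materializer in a real job.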
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                operator,
+                HandwrittenSelectorUtil.getRowDataSelector(
+                        new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES),
+                HandwrittenSelectorUtil.getRowDataSelector(
+                                new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES)
+                        .getProducedType());
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>
+            createHarnessWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
+        RowType keyType = RowType.of(new LogicalType[] {LOGICAL_TYPES[PRIMARY_KEY_INDEX]});
+        WatermarkCompactingSinkMaterializer operator =
+                WatermarkCompactingSinkMaterializer.create(
+                        toStrategy(behavior),
+                        RowType.of(LOGICAL_TYPES),
+                        RECORD_EQUALISER,
+                        null, // no upsert key equaliser
+                        null, // no upsert key
+                        keyType,
+                        new String[] {PRIMARY_KEY_NAME});
+
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                operator,
+                HandwrittenSelectorUtil.getRowDataSelector(
+                        new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES),
+                HandwrittenSelectorUtil.getRowDataSelector(
+                                new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES)
+                        .getProducedType());
+    }
+
+    private static InsertConflictStrategy toStrategy(ConflictBehavior behavior) {
+        switch (behavior) {
+            case ERROR:
+                return InsertConflictStrategy.error();
+            case NOTHING:
+                return InsertConflictStrategy.nothing();
+            case DEDUPLICATE:
+                return InsertConflictStrategy.deduplicate();
+            default:
+                throw new IllegalArgumentException("Unknown behavior: " + behavior);
+        }
+    }
+
+    private void assertEmitsNothing(
+            KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness) {
+        assertThat(extractRecords(harness)).isEmpty();
+    }
+
+    private void assertEmits(
+            KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness,
+            RowData... expected) {
+        List<RowData> emitted = extractRecords(harness);
+        assertThat(emitted).containsExactly(expected);
+    }
+
+    private List<RowData> extractRecords(
+            KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness) {
+        final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[LOGICAL_TYPES.length];
+        for (int i = 0; i < LOGICAL_TYPES.length; i++) {
+            fieldGetters[i] = RowData.createFieldGetter(LOGICAL_TYPES[i], i);
+        }
+
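+        // Copy each emitted row into a fresh GenericRowData since the operator may
+        // reuse the underlying RowData instances between emissions.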
+        final List<RowData> rows = new ArrayList<>();
+        Object o;
+        while ((o = harness.getOutput().poll()) != null) {
+            // Skip watermarks, only process StreamRecords
+            if (o instanceof StreamRecord) {
+                RowData value = (RowData) ((StreamRecord<?>) o).getValue();
+                Object[] row = new Object[LOGICAL_TYPES.length];
+                for (int i = 0; i < LOGICAL_TYPES.length; i++) {
+                    row[i] = fieldGetters[i].getFieldOrNull(value);
+                }
+                GenericRowData newRow = GenericRowData.of(row);
+                newRow.setRowKind(value.getRowKind());
+                rows.add(newRow);
+            }
+        }
+        return rows;
+    }
+
+    /** Test equaliser that compares all fields. */
+    private static class TestRecordEqualiser implements RecordEqualiser {
+        @Override
+        public boolean equals(RowData row1, RowData row2) {
+            return row1.getRowKind() == row2.getRowKind()
+                    && row1.getLong(0) == row2.getLong(0)
+                    && row1.getInt(1) == row2.getInt(1)
+                    && Objects.equals(row1.getString(2), row2.getString(2));
+        }
+    }
+
+    /** Test equaliser that only compares the upsert key (first column). */
+    private static class TestUpsertKeyEqualiser implements RecordEqualiser {
+        @Override
+        public boolean equals(RowData row1, RowData row2) {
+            return row1.getLong(0) == row2.getLong(0);
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
index 40929ed6ac6..99a5deb41e9 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
@@ -104,6 +104,26 @@ public class StreamRecordUtils {
         return new StreamRecord<>(row);
     }
 
+    /** Creates a new {@link RowData} with INSERT kind. Alias for {@link #row(Object...)}. */
+    public static RowData insert(Object... fields) {
+        return rowOfKind(RowKind.INSERT, fields);
+    }
+
+    /** Creates a new {@link RowData} with UPDATE_BEFORE kind. */
+    public static RowData updateBefore(Object... fields) {
+        return rowOfKind(RowKind.UPDATE_BEFORE, fields);
+    }
+
+    /** Creates a new {@link RowData} with UPDATE_AFTER kind. */
+    public static RowData updateAfter(Object... fields) {
+        return rowOfKind(RowKind.UPDATE_AFTER, fields);
+    }
+
+    /** Creates a new {@link RowData} with DELETE kind. */
+    public static RowData delete(Object... fields) {
+        return rowOfKind(RowKind.DELETE, fields);
+    }
+
     /** Receives an object array and generates a RowData based on the array. */
     public static RowData rowOfKind(RowKind rowKind, Object... fields) {
         Object[] objects = new Object[fields.length];
diff --git a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
index 976cef6349b..9122d74cb79 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
@@ -73,6 +73,7 @@ import org.apache.flink.table.dataview.ListViewSerializer;
 import org.apache.flink.table.dataview.MapViewSerializer;
 import org.apache.flink.table.dataview.NullAwareMapSerializer;
 import org.apache.flink.table.dataview.NullSerializer;
+import org.apache.flink.table.runtime.operators.sink.SortedLongSerializer;
 import org.apache.flink.table.runtime.operators.window.CountWindow;
 import org.apache.flink.table.runtime.sequencedmultisetstate.linked.MetaSqnInfoSerializer;
 import org.apache.flink.table.runtime.sequencedmultisetstate.linked.NodeSerializer;
@@ -263,7 +264,8 @@ public class TypeSerializerTestCoverageTest extends TestLogger {
                         NodeSerializer.class.getName(),
                         RowSqnInfoSerializer.class.getName(),
                         MetaSqnInfoSerializer.class.getName(),
-                        SetSerializer.class.getName());
+                        SetSerializer.class.getName(),
+                        SortedLongSerializer.class.getName());
 
         // check if a test exists for each type serializer
         for (Class<? extends TypeSerializer> typeSerializer : typeSerializers) {

