This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1779d20a28f Revert "[FLINK-38928] Implement an operator for handling DO ERROR/NOTHING (#27502)"
1779d20a28f is described below

commit 1779d20a28f5d43410138314fe3292d0e64f3148
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Feb 12 14:04:40 2026 +0100

    Revert "[FLINK-38928] Implement an operator for handling DO ERROR/NOTHING (#27502)"
    
    This reverts commit 743b9bd2b32149559983eb983c9ec0391a02fb9e.
---
 .../plan/nodes/exec/common/CommonExecSink.java     |   7 +-
 .../plan/nodes/exec/stream/StreamExecSink.java     |  76 +-
 .../nodes/physical/stream/StreamPhysicalSink.scala |  35 -
 .../FlinkChangelogModeInferenceProgram.scala       |  38 +-
 .../plan/nodes/exec/stream/SinkSemanticTests.java  |  31 +-
 .../plan/nodes/exec/stream/SinkTestPrograms.java   |  35 +-
 .../operators/sink/SortedLongSerializer.java       | 115 ---
 .../sink/WatermarkCompactingSinkMaterializer.java  | 504 --------------
 .../operators/sink/WatermarkTimestampAssigner.java |  49 --
 .../operators/sink/SortedLongSerializerTest.java   |  51 --
 .../WatermarkCompactingSinkMaterializerTest.java   | 769 ---------------------
 .../table/runtime/util/StreamRecordUtils.java      |  20 -
 .../TypeSerializerTestCoverageTest.java            |   4 +-
 13 files changed, 66 insertions(+), 1668 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 31c0bc4074f..0fc273ca684 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -187,6 +187,9 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
         Optional<LineageVertex> lineageVertexOpt =
                 TableLineageUtils.extractLineageDataset(outputObject);
 
+        // only add materialization if input has change
+        final boolean needMaterialization = !inputInsertOnly && 
upsertMaterialize;
+
         Transformation<RowData> sinkTransform =
                 applyConstraintValidations(inputTransform, config, 
persistedRowType);
 
@@ -199,10 +202,10 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
                             primaryKeys,
                             sinkParallelism,
                             inputParallelism,
-                            upsertMaterialize);
+                            needMaterialization);
         }
 
-        if (upsertMaterialize) {
+        if (needMaterialization) {
             sinkTransform =
                     applyUpsertMaterialize(
                             sinkTransform,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
index ce95c8b65bf..1726105dee8 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
@@ -26,8 +26,8 @@ import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.table.api.InsertConflictStrategy;
-import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import 
org.apache.flink.table.api.config.ExecutionConfigOptions.RowtimeInserter;
 import 
org.apache.flink.table.api.config.ExecutionConfigOptions.SinkUpsertMaterializeStrategy;
@@ -55,8 +55,6 @@ import 
org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
 import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerV2;
-import 
org.apache.flink.table.runtime.operators.sink.WatermarkCompactingSinkMaterializer;
-import 
org.apache.flink.table.runtime.operators.sink.WatermarkTimestampAssigner;
 import 
org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
@@ -102,7 +100,6 @@ import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
         producedTransformations = {
             CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
             CommonExecSink.PARTITIONER_TRANSFORMATION,
-            StreamExecSink.WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
             CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
             CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
             CommonExecSink.SINK_TRANSFORMATION
@@ -127,7 +124,6 @@ import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
         producedTransformations = {
             CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
             CommonExecSink.PARTITIONER_TRANSFORMATION,
-            StreamExecSink.WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
             CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
             CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
             CommonExecSink.SINK_TRANSFORMATION
@@ -137,9 +133,6 @@ import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
 public class StreamExecSink extends CommonExecSink implements 
StreamExecNode<Object> {
     private static final Logger LOG = 
LoggerFactory.getLogger(StreamExecSink.class);
 
-    public static final String WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION =
-            "watermark-timestamp-assigner";
-
     public static final String FIELD_NAME_INPUT_CHANGELOG_MODE = 
"inputChangelogMode";
     public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = 
"requireUpsertMaterialize";
     public static final String FIELD_NAME_UPSERT_MATERIALIZE_STRATEGY = 
"upsertMaterializeStrategy";
@@ -244,6 +237,17 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
     @Override
     protected Transformation<Object> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
+        // TODO: FLINK-38928 Remove this validation once runtime support for 
ERROR and NOTHING is
+        // implemented
+        if (InsertConflictStrategy.error().equals(conflictStrategy)
+                || InsertConflictStrategy.nothing().equals(conflictStrategy)) {
+            throw new ValidationException(
+                    "ON CONFLICT DO "
+                            + conflictStrategy
+                            + " is not yet supported. "
+                            + "Please use ON CONFLICT DO DEDUPLICATE 
instead.");
+        }
+
         final ExecEdge inputEdge = getInputEdges().get(0);
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
@@ -337,17 +341,10 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
                 StateConfigUtil.createTtlConfig(
                         StateMetadata.getStateTtlForOneInputOperator(config, 
stateMetadataList));
 
-        final String[] pkFieldNames =
-                Arrays.stream(primaryKeys)
-                        .mapToObj(idx -> 
physicalRowType.getFieldNames().get(idx))
-                        .toArray(String[]::new);
-
         final OneInputStreamOperator<RowData, RowData> operator =
                 createSumOperator(
                         config,
                         physicalRowType,
-                        primaryKeys,
-                        pkFieldNames,
                         inputUpsertKey,
                         upsertKeyEqualiser,
                         upsertKeyHashFunction,
@@ -355,29 +352,15 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
                         rowEqualiser,
                         rowHashFunction);
 
-        // For ERROR/NOTHING strategies, apply WatermarkTimestampAssigner first
-        // This assigns the current watermark as the timestamp to each record,
-        // which is required for the WatermarkCompactingSinkMaterializer to 
work correctly
-        Transformation<RowData> transformForMaterializer = inputTransform;
-        if (isErrorOrNothingConflictStrategy()) {
-            // Use input parallelism to preserve watermark semantics
-            transformForMaterializer =
-                    ExecNodeUtil.createOneInputTransformation(
-                            inputTransform,
-                            createTransformationMeta(
-                                    
WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
-                                    "WatermarkTimestampAssigner",
-                                    "WatermarkTimestampAssigner",
-                                    config),
-                            new WatermarkTimestampAssigner(),
-                            inputTransform.getOutputType(),
-                            inputTransform.getParallelism(),
-                            false);
-        }
+        final String[] fieldNames = 
physicalRowType.getFieldNames().toArray(new String[0]);
+        final List<String> pkFieldNames =
+                Arrays.stream(primaryKeys)
+                        .mapToObj(idx -> fieldNames[idx])
+                        .collect(Collectors.toList());
 
         OneInputTransformation<RowData, RowData> materializeTransform =
                 ExecNodeUtil.createOneInputTransformation(
-                        transformForMaterializer,
+                        inputTransform,
                         createTransformationMeta(
                                 UPSERT_MATERIALIZE_TRANSFORMATION,
                                 String.format(
@@ -400,8 +383,6 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
     private OneInputStreamOperator<RowData, RowData> createSumOperator(
             ExecNodeConfig config,
             RowType physicalRowType,
-            int[] primaryKeys,
-            String[] pkFieldNames,
             int[] inputUpsertKey,
             GeneratedRecordEqualiser upsertKeyEqualiser,
             GeneratedHashFunction upsertKeyHashFunction,
@@ -409,21 +390,6 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
             GeneratedRecordEqualiser rowEqualiser,
             GeneratedHashFunction rowHashFunction) {
 
-        // Check if we should use the watermark-compacting materializer for 
ERROR/NOTHING strategies
-        if (isErrorOrNothingConflictStrategy()) {
-            RowType keyType = RowTypeUtils.projectRowType(physicalRowType, 
primaryKeys);
-
-            return WatermarkCompactingSinkMaterializer.create(
-                    conflictStrategy,
-                    physicalRowType,
-                    rowEqualiser,
-                    upsertKeyEqualiser,
-                    inputUpsertKey,
-                    keyType,
-                    pkFieldNames);
-        }
-
-        // Use existing logic for DEDUPLICATE (legacy behavior)
         SinkUpsertMaterializeStrategy sinkUpsertMaterializeStrategy =
                 Optional.ofNullable(upsertMaterializeStrategy)
                         .orElse(SinkUpsertMaterializeStrategy.LEGACY);
@@ -449,12 +415,6 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
                                 config));
     }
 
-    private boolean isErrorOrNothingConflictStrategy() {
-        return conflictStrategy != null
-                && (conflictStrategy.getBehavior() == ConflictBehavior.ERROR
-                        || conflictStrategy.getBehavior() == 
ConflictBehavior.NOTHING);
-    }
-
     private static SequencedMultiSetStateConfig createStateConfig(
             SinkUpsertMaterializeStrategy strategy,
             TimeDomain ttlTimeDomain,
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
index cf2a7d53c9e..c77d8312d52 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
@@ -18,7 +18,6 @@
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
 import org.apache.flink.table.api.InsertConflictStrategy
-import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior
 import 
org.apache.flink.table.api.config.ExecutionConfigOptions.{SinkUpsertMaterializeStrategy,
 TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY}
 import org.apache.flink.table.catalog.ContextResolvedTable
 import org.apache.flink.table.connector.sink.DynamicTableSink
@@ -35,12 +34,9 @@ import 
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rel.hint.RelHint
-import org.apache.calcite.util.ImmutableBitSet
 
 import java.util
 
-import scala.collection.JavaConversions._
-
 /**
  * Stream physical RelNode to write data into an external sink defined by a 
[[DynamicTableSink]].
  */
@@ -138,35 +134,4 @@ class StreamPhysicalSink(
       .itemIf("upsertMaterialize", "true", upsertMaterialize)
       .itemIf("conflictStrategy", conflictStrategy, conflictStrategy != null)
   }
-
-  def isDeduplicateConflictStrategy: Boolean = {
-    conflictStrategy != null && conflictStrategy.getBehavior == 
ConflictBehavior.DEDUPLICATE
-  }
-
-  def primaryKeysContainsUpsertKey: Boolean = {
-    val primaryKeys = 
contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes
-    val pks = ImmutableBitSet.of(primaryKeys: _*)
-    val fmq = FlinkRelMetadataQuery.reuseOrCreate(getCluster.getMetadataQuery)
-    val changeLogUpsertKeys = fmq.getUpsertKeys(getInput)
-    changeLogUpsertKeys != null && changeLogUpsertKeys.exists(pks.contains)
-  }
-
-  def getUpsertKeyNames: String = {
-    val fmq = FlinkRelMetadataQuery.reuseOrCreate(getCluster.getMetadataQuery)
-    val changeLogUpsertKeys = fmq.getUpsertKeys(getInput)
-    if (changeLogUpsertKeys == null) {
-      "none"
-    } else {
-      val fieldNames = contextResolvedTable.getResolvedSchema.getColumnNames
-      changeLogUpsertKeys
-        .map(uk => uk.toArray.map(fieldNames.get).mkString("[", ", ", "]"))
-        .mkString(", ")
-    }
-  }
-
-  def getPrimaryKeyNames: String = {
-    val fieldNames = contextResolvedTable.getResolvedSchema.getColumnNames
-    val primaryKeys = 
contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes
-    primaryKeys.map(fieldNames.get).mkString("[", ", ", "]")
-  }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index 9cc84db2b7b..baa9b25db02 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -1060,30 +1060,38 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         case UpsertMaterialize.FORCE => primaryKeys.nonEmpty && !sinkIsRetract
         case UpsertMaterialize.NONE => false
         case UpsertMaterialize.AUTO =>
-          // if the sink is not an UPSERT sink (has no PK, or is an APPEND or 
RETRACT sink)
-          // we don't need to materialize results
-          if (primaryKeys.isEmpty || sinkIsAppend || sinkIsRetract) {
+          if (
+            (inputIsAppend && InsertConflictStrategy
+              .deduplicate()
+              .equals(sink.conflictStrategy)) || sinkIsAppend || sinkIsRetract
+          ) {
             return false
           }
-
-          // For a DEDUPLICATE strategy and INSERT only input, we simply let 
the inserts be handled
-          // as UPSERT_AFTER and overwrite previous value
-          if (inputIsAppend && sink.isDeduplicateConflictStrategy) {
+          if (primaryKeys.isEmpty) {
             return false
           }
-
-          // if input has updates and primary key != upsert key  we should 
enable upsertMaterialize.
-          //
-          // An optimize is: do not enable upsertMaterialize when sink pk(s) 
contains input
-          // changeLogUpsertKeys
-          val upsertKeyDiffersFromPk = !sink.primaryKeysContainsUpsertKey
+          val pks = ImmutableBitSet.of(primaryKeys: _*)
+          val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+          val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+          // if input has updates and primary key != upsert key (upsert key 
can be null) we should
+          // enable upsertMaterialize. An optimize is: do not enable 
upsertMaterialize when sink
+          // pk(s) contains input changeLogUpsertKeys
+          val upsertKeyDiffersFromPk =
+            changeLogUpsertKeys == null || 
!changeLogUpsertKeys.exists(pks.contains)
 
           // Validate that ON CONFLICT is specified when upsert key differs 
from primary key
           val requireOnConflict =
             
tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT)
           if (requireOnConflict && upsertKeyDiffersFromPk && 
sink.conflictStrategy == null) {
-            val pkNames = sink.getPrimaryKeyNames
-            val upsertKeyNames = sink.getUpsertKeyNames
+            val fieldNames = 
sink.contextResolvedTable.getResolvedSchema.getColumnNames
+            val pkNames = primaryKeys.map(fieldNames.get(_)).mkString("[", ", 
", "]")
+            val upsertKeyNames = if (changeLogUpsertKeys == null) {
+              "none"
+            } else {
+              changeLogUpsertKeys
+                .map(uk => uk.toArray.map(fieldNames.get(_)).mkString("[", ", 
", "]"))
+                .mkString(", ")
+            }
             throw new ValidationException(
               "The query has an upsert key that differs from the primary key 
of the sink table " +
                 
s"'${sink.contextResolvedTable.getIdentifier.asSummaryString}'. " +
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
index 6dba8353f02..7eaa6c06c7f 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
@@ -18,48 +18,21 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
-import org.apache.flink.table.api.TableEnvironment;
 import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase;
-import org.apache.flink.table.test.program.SinkTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
-import org.apache.flink.table.test.program.TestStep;
-import org.apache.flink.table.test.program.TestStep.TestKind;
 
-import java.util.EnumSet;
 import java.util.List;
-import java.util.Map;
 
 /** Semantic tests for {@link StreamExecSink}. */
 public class SinkSemanticTests extends SemanticTestBase {
 
-    @Override
-    public EnumSet<TestKind> supportedSetupSteps() {
-        EnumSet<TestKind> steps = super.supportedSetupSteps();
-        steps.add(TestKind.SINK_WITHOUT_DATA);
-        return steps;
-    }
-
-    @Override
-    protected void runStep(TestStep testStep, TableEnvironment env) throws 
Exception {
-        if (testStep.getKind() == TestKind.SINK_WITHOUT_DATA) {
-            final SinkTestStep sinkTestStep = (SinkTestStep) testStep;
-            sinkTestStep.apply(
-                    env,
-                    Map.ofEntries(
-                            Map.entry("connector", "values"),
-                            Map.entry("sink-insert-only", "false")));
-        } else {
-            super.runStep(testStep, env);
-        }
-    }
-
     @Override
     public List<TableTestProgram> programs() {
         return List.of(
                 SinkTestPrograms.INSERT_RETRACT_WITHOUT_PK,
                 SinkTestPrograms.INSERT_RETRACT_WITH_PK,
-                SinkTestPrograms.ON_CONFLICT_DO_NOTHING_KEEPS_FIRST,
-                SinkTestPrograms.ON_CONFLICT_DO_ERROR_NO_CONFLICT,
+                SinkTestPrograms.ON_CONFLICT_DO_NOTHING_NOT_SUPPORTED,
+                SinkTestPrograms.ON_CONFLICT_DO_ERROR_NOT_SUPPORTED,
                 
SinkTestPrograms.UPSERT_KEY_DIFFERS_FROM_PK_WITHOUT_ON_CONFLICT,
                 SinkTestPrograms.UPSERT_KEY_DIFFERS_FROM_PK_WITH_ON_CONFLICT,
                 SinkTestPrograms.UPSERT_KEY_MATCHES_PK_WITHOUT_ON_CONFLICT,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
index 6ddc55cc18b..c59fb88521d 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
@@ -89,47 +89,46 @@ public class SinkTestPrograms {
                             "INSERT INTO sink_t SELECT UPPER(name), SUM(score) 
FROM source_t GROUP BY name")
                     .build();
 
-    // --- ON CONFLICT tests ---
+    // --- ON CONFLICT validation tests ---
 
-    public static final TableTestProgram ON_CONFLICT_DO_NOTHING_KEEPS_FIRST =
+    public static final TableTestProgram ON_CONFLICT_DO_NOTHING_NOT_SUPPORTED =
             TableTestProgram.of(
-                            "sink-on-conflict-do-nothing-keeps-first",
-                            "ON CONFLICT DO NOTHING keeps the first record 
when multiple records have the same PK.")
+                            "sink-on-conflict-do-nothing-not-supported",
+                            "ON CONFLICT DO NOTHING is not yet supported and 
should throw ValidationException.")
                     .setupTableSource(
                             SourceTestStep.newBuilder("source_t")
                                     .addSchema("a INT", "b BIGINT")
                                     .addOption("changelog-mode", "I")
-                                    .producedValues(
-                                            Row.ofKind(RowKind.INSERT, 1, 10L),
-                                            Row.ofKind(RowKind.INSERT, 1, 20L),
-                                            Row.ofKind(RowKind.INSERT, 2, 30L))
+                                    .producedValues(Row.ofKind(RowKind.INSERT, 
1, 1L))
                                     .build())
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink_t")
                                     .addSchema("a INT PRIMARY KEY NOT 
ENFORCED", "b BIGINT")
-                                    .consumedValues("+I[1, 10]", "+I[2, 30]")
                                     .build())
-                    .runSql("INSERT INTO sink_t SELECT a, b FROM source_t ON 
CONFLICT DO NOTHING")
+                    .runFailingSql(
+                            "INSERT INTO sink_t SELECT a, b FROM source_t ON 
CONFLICT DO NOTHING",
+                            ValidationException.class,
+                            "ON CONFLICT DO NOTHING is not yet supported")
                     .build();
 
-    public static final TableTestProgram ON_CONFLICT_DO_ERROR_NO_CONFLICT =
+    public static final TableTestProgram ON_CONFLICT_DO_ERROR_NOT_SUPPORTED =
             TableTestProgram.of(
-                            "sink-on-conflict-do-error-no-conflict",
-                            "ON CONFLICT DO ERROR with no conflicts passes 
through all records.")
+                            "sink-on-conflict-do-error-not-supported",
+                            "ON CONFLICT DO ERROR is not yet supported and 
should throw ValidationException.")
                     .setupTableSource(
                             SourceTestStep.newBuilder("source_t")
                                     .addSchema("a INT", "b BIGINT")
                                     .addOption("changelog-mode", "I")
-                                    .producedValues(
-                                            Row.ofKind(RowKind.INSERT, 1, 10L),
-                                            Row.ofKind(RowKind.INSERT, 2, 20L))
+                                    .producedValues(Row.ofKind(RowKind.INSERT, 
1, 1L))
                                     .build())
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink_t")
                                     .addSchema("a INT PRIMARY KEY NOT 
ENFORCED", "b BIGINT")
-                                    .consumedValues("+I[1, 10]", "+I[2, 20]")
                                     .build())
-                    .runSql("INSERT INTO sink_t SELECT a, b FROM source_t ON 
CONFLICT DO ERROR")
+                    .runFailingSql(
+                            "INSERT INTO sink_t SELECT a, b FROM source_t ON 
CONFLICT DO ERROR",
+                            ValidationException.class,
+                            "ON CONFLICT DO ERROR is not yet supported")
                     .build();
 
     public static final TableTestProgram 
UPSERT_KEY_DIFFERS_FROM_PK_WITHOUT_ON_CONFLICT =
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializer.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializer.java
deleted file mode 100644
index 4a31a1d2da1..00000000000
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.sink;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.MathUtils;
-
-import java.io.IOException;
-
-/**
- * A serializer for {@code Long} that produces a lexicographically sortable 
byte representation.
- *
- * <p>Standard big-endian long serialization does not maintain numeric 
ordering for negative values
- * in lexicographic byte comparison because negative numbers have their sign 
bit set (1), which
- * makes them appear greater than positive numbers (sign bit 0) in unsigned 
byte comparison.
- *
- * <p>This serializer flips the sign bit during serialization, converting from 
signed to unsigned
- * ordering. This ensures that when iterating over keys in a sorted state 
backend (like RocksDB),
- * the entries are returned in numeric order.
- *
- * @see org.apache.flink.streaming.api.operators.TimerSerializer
- */
-@Internal
-public final class SortedLongSerializer extends TypeSerializerSingleton<Long> {
-
-    private static final long serialVersionUID = 1L;
-
-    /** Sharable instance of the SortedLongSerializer. */
-    public static final SortedLongSerializer INSTANCE = new 
SortedLongSerializer();
-
-    private static final Long ZERO = 0L;
-
-    @Override
-    public boolean isImmutableType() {
-        return true;
-    }
-
-    @Override
-    public Long createInstance() {
-        return ZERO;
-    }
-
-    @Override
-    public Long copy(Long from) {
-        return from;
-    }
-
-    @Override
-    public Long copy(Long from, Long reuse) {
-        return from;
-    }
-
-    @Override
-    public int getLength() {
-        return Long.BYTES;
-    }
-
-    @Override
-    public void serialize(Long record, DataOutputView target) throws 
IOException {
-        target.writeLong(MathUtils.flipSignBit(record));
-    }
-
-    @Override
-    public Long deserialize(DataInputView source) throws IOException {
-        return MathUtils.flipSignBit(source.readLong());
-    }
-
-    @Override
-    public Long deserialize(Long reuse, DataInputView source) throws 
IOException {
-        return deserialize(source);
-    }
-
-    @Override
-    public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-        target.writeLong(source.readLong());
-    }
-
-    @Override
-    public TypeSerializerSnapshot<Long> snapshotConfiguration() {
-        return new SortedLongSerializerSnapshot();
-    }
-
-    // ------------------------------------------------------------------------
-
-    /** Serializer configuration snapshot for compatibility and format 
evolution. */
-    @SuppressWarnings("WeakerAccess")
-    public static final class SortedLongSerializerSnapshot
-            extends SimpleTypeSerializerSnapshot<Long> {
-
-        public SortedLongSerializerSnapshot() {
-            super(() -> INSTANCE);
-        }
-    }
-}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java
deleted file mode 100644
index fc975bda184..00000000000
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.sink;
-
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.api.InsertConflictStrategy;
-import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
-import org.apache.flink.table.api.TableRuntimeException;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.utils.ProjectedRowData;
-import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
-import org.apache.flink.table.runtime.generated.RecordEqualiser;
-import org.apache.flink.table.runtime.operators.TableStreamOperator;
-import org.apache.flink.table.runtime.typeutils.InternalSerializers;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.flink.types.RowKind.DELETE;
-import static org.apache.flink.types.RowKind.INSERT;
-import static org.apache.flink.types.RowKind.UPDATE_AFTER;
-
-/**
- * A sink materializer that buffers records and compacts them on watermark 
progression.
- *
- * <p>This operator implements the watermark-based compaction algorithm from 
FLIP-558 for handling
- * changelog disorder when the upsert key differs from the sink's primary key.
- */
-public class WatermarkCompactingSinkMaterializer extends 
TableStreamOperator<RowData>
-        implements OneInputStreamOperator<RowData, RowData> {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOG =
-            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
-
-    private static final String STATE_CLEARED_WARN_MSG =
-            "The state is cleared because of state TTL. This will lead to 
incorrect results. "
-                    + "You can increase the state TTL to avoid this.";
-    private static final Set<String> ORDERED_STATE_BACKENDS = 
Set.of("rocksdb", "forst");
-
-    private final InsertConflictStrategy conflictStrategy;
-    private final TypeSerializer<RowData> serializer;
-    private final GeneratedRecordEqualiser generatedRecordEqualiser;
-    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
-    private final int[] inputUpsertKey;
-    private final boolean hasUpsertKey;
-    private final RowType keyType;
-    private final String[] primaryKeyNames;
-
-    private transient MapStateDescriptor<Long, List<RowData>> bufferDescriptor;
-    // Buffers incoming changelog records (INSERT, UPDATE_BEFORE, 
UPDATE_AFTER, DELETE) keyed by
-    // their timestamp. Watermarks act as compaction barriers: when a 
watermark arrives, we know
-    // that UPDATE_BEFORE and its corresponding UPDATE_AFTER have both been 
received and can be
-    // compacted together. This solves the out-of-order problem where a later 
UPDATE_AFTER may
-    // arrive before the UPDATE_BEFORE of a previous change.
-    private transient MapState<Long, List<RowData>> buffer;
-
-    // Stores the last emitted value for the current primary key. Used to 
detect duplicates
-    // and determine the correct RowKind (INSERT vs UPDATE_AFTER) on 
subsequent compactions.
-    private transient ValueState<RowData> currentValue;
-    private transient RecordEqualiser equaliser;
-    private transient RecordEqualiser upsertKeyEqualiser;
-    private transient TimestampedCollector<RowData> collector;
-    private transient boolean isOrderedStateBackend;
-
-    // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey.
-    private transient ProjectedRowData upsertKeyProjectedRow1;
-    private transient ProjectedRowData upsertKeyProjectedRow2;
-
-    // Field getters for formatting the primary key in error messages.
-    private transient RowData.FieldGetter[] keyFieldGetters;
-
-    // Tracks the current watermark. Used to detect in-flight records after 
restore.
-    private transient long currentWatermark = Long.MIN_VALUE;
-
-    public WatermarkCompactingSinkMaterializer(
-            InsertConflictStrategy conflictStrategy,
-            TypeSerializer<RowData> serializer,
-            GeneratedRecordEqualiser generatedRecordEqualiser,
-            @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
-            @Nullable int[] inputUpsertKey,
-            RowType keyType,
-            String[] primaryKeyNames) {
-        validateConflictStrategy(conflictStrategy);
-        this.conflictStrategy = conflictStrategy;
-        this.serializer = serializer;
-        this.generatedRecordEqualiser = generatedRecordEqualiser;
-        this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
-        this.inputUpsertKey = inputUpsertKey;
-        this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 
0;
-        this.keyType = keyType;
-        this.primaryKeyNames = primaryKeyNames;
-    }
-
-    private static void validateConflictStrategy(InsertConflictStrategy 
strategy) {
-        Preconditions.checkArgument(
-                strategy.getBehavior() == ConflictBehavior.ERROR
-                        || strategy.getBehavior() == ConflictBehavior.NOTHING,
-                "Only ERROR and NOTHING strategies are supported, got: %s",
-                strategy);
-    }
-
-    @Override
-    public void initializeState(StateInitializationContext context) throws 
Exception {
-        super.initializeState(context);
-
-        // Initialize state descriptors and handles
-        this.bufferDescriptor =
-                new MapStateDescriptor<>(
-                        "watermark-buffer",
-                        SortedLongSerializer.INSTANCE,
-                        new ListSerializer<>(serializer));
-        this.buffer = 
context.getKeyedStateStore().getMapState(bufferDescriptor);
-
-        ValueStateDescriptor<RowData> currentValueDescriptor =
-                new ValueStateDescriptor<>("current-value", serializer);
-        this.currentValue = 
context.getKeyedStateStore().getState(currentValueDescriptor);
-
-        if (context.isRestored()) {
-            // Detect ordered state backend before consolidation
-            detectOrderedStateBackend();
-
-            // Consolidate all buffered records to MIN_VALUE for each key.
-            // This ensures they are compacted on the first watermark after 
restore.
-            getKeyedStateBackend()
-                    .applyToAllKeys(
-                            VoidNamespace.INSTANCE,
-                            VoidNamespaceSerializer.INSTANCE,
-                            bufferDescriptor,
-                            (key, state) -> consolidateBufferToMinValue());
-        }
-    }
-
-    private void consolidateBufferToMinValue() throws Exception {
-        List<RowData> consolidated = new ArrayList<>();
-
-        if (isOrderedStateBackend) {
-            // RocksDB/ForSt: entries are already sorted by timestamp
-            Iterator<Map.Entry<Long, List<RowData>>> iterator = 
buffer.entries().iterator();
-            while (iterator.hasNext()) {
-                consolidated.addAll(iterator.next().getValue());
-                iterator.remove();
-            }
-        } else {
-            // Other backends: collect, sort by timestamp, then consolidate
-            List<Map.Entry<Long, List<RowData>>> entries = new ArrayList<>();
-            Iterator<Map.Entry<Long, List<RowData>>> iterator = 
buffer.entries().iterator();
-            while (iterator.hasNext()) {
-                entries.add(iterator.next());
-                iterator.remove();
-            }
-
-            entries.sort(Map.Entry.comparingByKey());
-
-            for (Map.Entry<Long, List<RowData>> entry : entries) {
-                consolidated.addAll(entry.getValue());
-            }
-        }
-
-        if (!consolidated.isEmpty()) {
-            buffer.put(Long.MIN_VALUE, consolidated);
-        }
-    }
-
-    @Override
-    public void open() throws Exception {
-        super.open();
-        initializeEqualisers();
-        initializeKeyFieldGetters();
-        detectOrderedStateBackend();
-        this.collector = new TimestampedCollector<>(output);
-    }
-
-    private void initializeKeyFieldGetters() {
-        this.keyFieldGetters = new RowData.FieldGetter[primaryKeyNames.length];
-        for (int i = 0; i < primaryKeyNames.length; i++) {
-            LogicalType fieldType = keyType.getTypeAt(i);
-            keyFieldGetters[i] = RowData.createFieldGetter(fieldType, i);
-        }
-    }
-
-    private void initializeEqualisers() {
-        if (hasUpsertKey) {
-            this.upsertKeyEqualiser =
-                    generatedUpsertKeyEqualiser.newInstance(
-                            getRuntimeContext().getUserCodeClassLoader());
-            upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey);
-            upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey);
-        }
-        this.equaliser =
-                
generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
-    }
-
-    private void detectOrderedStateBackend() {
-        KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
-        String backendType =
-                keyedStateBackend != null ? 
keyedStateBackend.getBackendTypeIdentifier() : "";
-        this.isOrderedStateBackend = 
ORDERED_STATE_BACKENDS.contains(backendType);
-
-        if (isOrderedStateBackend) {
-            LOG.info("Using ordered state backend optimization for {} 
backend", backendType);
-        }
-    }
-
-    @Override
-    public void processElement(StreamRecord<RowData> element) throws Exception 
{
-        RowData row = element.getValue();
-        long assignedTimestamp = element.getTimestamp();
-
-        // If we haven't received any watermark yet (still at MIN_VALUE after 
restore)
-        // and the timestamp is beyond MIN_VALUE, it's from in-flight data 
that was
-        // checkpointed before restore. Assign to MIN_VALUE.
-        if (currentWatermark == Long.MIN_VALUE && assignedTimestamp > 
Long.MIN_VALUE) {
-            assignedTimestamp = Long.MIN_VALUE;
-        }
-
-        bufferRecord(assignedTimestamp, row);
-    }
-
-    private void bufferRecord(long timestamp, RowData row) throws Exception {
-        List<RowData> records = buffer.get(timestamp);
-        if (records == null) {
-            records = new ArrayList<>();
-        }
-        switch (row.getRowKind()) {
-            case INSERT:
-            case UPDATE_AFTER:
-                // Try to cancel out a pending retraction; if none, just append
-                if (!tryCancelRetraction(records, row)) {
-                    records.add(row);
-                }
-                break;
-            case UPDATE_BEFORE:
-            case DELETE:
-                // Try to cancel out an existing addition; if none, keep for 
cross-bucket
-                if (!tryCancelAddition(records, row)) {
-                    records.add(row);
-                }
-                break;
-        }
-        buffer.put(timestamp, records);
-    }
-
-    @Override
-    public void processWatermark(Watermark mark) throws Exception {
-        final long watermarkTimestamp = mark.getTimestamp();
-        this.currentWatermark = watermarkTimestamp;
-
-        // Iterate over all keys and compact their buffered records
-        this.<RowData>getKeyedStateBackend()
-                .applyToAllKeys(
-                        VoidNamespace.INSTANCE,
-                        VoidNamespaceSerializer.INSTANCE,
-                        bufferDescriptor,
-                        (key, state) -> compactAndEmit(watermarkTimestamp));
-
-        super.processWatermark(mark);
-    }
-
-    private void compactAndEmit(long newWatermark) throws Exception {
-        RowData previousValue = currentValue.value();
-        List<RowData> pendingRecords = collectPendingRecords(previousValue, 
newWatermark);
-
-        if (pendingRecords.size() > 1) {
-            if (conflictStrategy.getBehavior() == ConflictBehavior.ERROR) {
-                RowData key = (RowData) getKeyedStateBackend().getCurrentKey();
-                throw new TableRuntimeException(
-                        "Primary key constraint violation: multiple distinct 
records with "
-                                + "the same primary key detected. Conflicting 
key: ["
-                                + formatKey(key)
-                                + "]. Use ON CONFLICT DO NOTHING to keep the 
first record.");
-            } else if (previousValue == null) {
-                final RowData newValue = pendingRecords.get(0);
-                emit(newValue, INSERT);
-                currentValue.update(newValue);
-            }
-        } else if (pendingRecords.isEmpty()) {
-            if (previousValue != null) {
-                emit(previousValue, DELETE);
-                currentValue.clear();
-            }
-        } else {
-            final RowData newValue = pendingRecords.get(0);
-            if (previousValue == null) {
-                emit(newValue, INSERT);
-                currentValue.update(newValue);
-            } else if (!recordEquals(previousValue, newValue)) {
-                emit(newValue, UPDATE_AFTER);
-                currentValue.update(newValue);
-            }
-        }
-    }
-
-    private List<RowData> collectPendingRecords(RowData previousValue, long 
newWatermark)
-            throws Exception {
-        List<RowData> records = new ArrayList<>();
-        if (previousValue != null) {
-            records.add(previousValue);
-        }
-        Iterator<Map.Entry<Long, List<RowData>>> iterator = 
buffer.entries().iterator();
-
-        while (iterator.hasNext()) {
-            Map.Entry<Long, List<RowData>> entry = iterator.next();
-            if (entry.getKey() <= newWatermark) {
-                for (RowData pendingRecord : entry.getValue()) {
-                    switch (pendingRecord.getRowKind()) {
-                        case INSERT:
-                        case UPDATE_AFTER:
-                            addRow(records, pendingRecord);
-                            break;
-
-                        case UPDATE_BEFORE:
-                        case DELETE:
-                            retractRow(records, pendingRecord);
-                            break;
-                    }
-                }
-                iterator.remove();
-            } else if (isOrderedStateBackend) {
-                break;
-            }
-        }
-        return records;
-    }
-
-    private void addRow(List<RowData> values, RowData add) {
-        if (hasUpsertKey) {
-            int index = findFirst(values, add);
-            if (index == -1) {
-                values.add(add);
-            } else {
-                values.set(index, add);
-            }
-        } else {
-            values.add(add);
-        }
-    }
-
-    private void retractRow(List<RowData> values, RowData retract) {
-        final int index = findFirst(values, retract);
-        if (index == -1) {
-            LOG.info(STATE_CLEARED_WARN_MSG);
-        } else {
-            // Remove first found row
-            values.remove(index);
-        }
-    }
-
-    /**
-     * Attempts to cancel out a retraction by finding a matching retractive 
record
-     * (DELETE/UPDATE_BEFORE) with identical content.
-     *
-     * @return true if a matching retraction was found and removed, false 
otherwise
-     */
-    private boolean tryCancelRetraction(List<RowData> values, RowData 
addition) {
-        final Iterator<RowData> iterator = values.iterator();
-        while (iterator.hasNext()) {
-            RowData candidate = iterator.next();
-            RowKind kind = candidate.getRowKind();
-            if ((kind == DELETE || kind == RowKind.UPDATE_BEFORE)
-                    && recordEquals(addition, candidate)) {
-                iterator.remove();
-                return true;
-            }
-        }
-        return false;
-    }
-
-    /**
-     * Attempts to cancel out an addition by finding a matching additive record
-     * (INSERT/UPDATE_AFTER) with identical content.
-     *
-     * @return true if a matching addition was found and removed, false 
otherwise
-     */
-    private boolean tryCancelAddition(List<RowData> values, RowData 
retraction) {
-        final Iterator<RowData> iterator = values.iterator();
-        while (iterator.hasNext()) {
-            RowData candidate = iterator.next();
-            RowKind kind = candidate.getRowKind();
-            if ((kind == INSERT || kind == UPDATE_AFTER) && 
recordEquals(retraction, candidate)) {
-                iterator.remove();
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private int findFirst(List<RowData> values, RowData target) {
-        final Iterator<RowData> iterator = values.iterator();
-        int i = 0;
-        while (iterator.hasNext()) {
-            if (equalsIgnoreRowKind(target, iterator.next())) {
-                return i;
-            }
-            i++;
-        }
-        return -1;
-    }
-
-    private boolean equalsIgnoreRowKind(RowData newRow, RowData oldRow) {
-        newRow.setRowKind(oldRow.getRowKind());
-        if (hasUpsertKey) {
-            return this.upsertKeyEqualiser.equals(
-                    upsertKeyProjectedRow1.replaceRow(newRow),
-                    upsertKeyProjectedRow2.replaceRow(oldRow));
-        }
-        return equaliser.equals(newRow, oldRow);
-    }
-
-    private void emit(RowData row, RowKind kind) {
-        RowKind originalKind = row.getRowKind();
-        row.setRowKind(kind);
-        collector.collect(row);
-        row.setRowKind(originalKind);
-    }
-
-    private boolean recordEquals(RowData row1, RowData row2) {
-        RowKind kind1 = row1.getRowKind();
-        RowKind kind2 = row2.getRowKind();
-        row1.setRowKind(RowKind.INSERT);
-        row2.setRowKind(RowKind.INSERT);
-        boolean result = equaliser.equals(row1, row2);
-        row1.setRowKind(kind1);
-        row2.setRowKind(kind2);
-        return result;
-    }
-
-    private String formatKey(RowData key) {
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < primaryKeyNames.length; i++) {
-            if (i > 0) {
-                sb.append(", ");
-            }
-            sb.append(primaryKeyNames[i]).append("=");
-            if (key.isNullAt(i)) {
-                sb.append("null");
-            } else {
-                sb.append(keyFieldGetters[i].getFieldOrNull(key));
-            }
-        }
-        return sb.toString();
-    }
-
-    /** Factory method to create a new instance. */
-    public static WatermarkCompactingSinkMaterializer create(
-            InsertConflictStrategy conflictStrategy,
-            RowType physicalRowType,
-            GeneratedRecordEqualiser rowEqualiser,
-            @Nullable GeneratedRecordEqualiser upsertKeyEqualiser,
-            @Nullable int[] inputUpsertKey,
-            RowType keyType,
-            String[] primaryKeyNames) {
-        return new WatermarkCompactingSinkMaterializer(
-                conflictStrategy,
-                InternalSerializers.create(physicalRowType),
-                rowEqualiser,
-                upsertKeyEqualiser,
-                inputUpsertKey,
-                keyType,
-                primaryKeyNames);
-    }
-}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkTimestampAssigner.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkTimestampAssigner.java
deleted file mode 100644
index 17598df521d..00000000000
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkTimestampAssigner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.sink;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.operators.TableStreamOperator;
-
-/**
- * Operator that assigns the current watermark as the timestamp to each 
incoming StreamRecord.
- *
- * <p>This is used in conjunction with {@link 
WatermarkCompactingSinkMaterializer} which buffers
- * records by their timestamp. Without meaningful timestamps, all records 
would be buffered under
- * the same key, breaking the watermark-based compaction logic.
- *
- * <p>If the current watermark is {@code Long.MIN_VALUE} (the initial state 
before any watermark
- * arrives), records will be assigned that value and will be compacted when 
the first watermark
- * arrives.
- */
-@Internal
-public class WatermarkTimestampAssigner extends TableStreamOperator<RowData>
-        implements OneInputStreamOperator<RowData, RowData> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public void processElement(StreamRecord<RowData> element) throws Exception 
{
-        element.setTimestamp(currentWatermark);
-        output.collect(element);
-    }
-}
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializerTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializerTest.java
deleted file mode 100644
index d331eb18c94..00000000000
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializerTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.sink;
-
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.Random;
-
-/** A test for the {@link SortedLongSerializer}. */
-class SortedLongSerializerTest extends SerializerTestBase<Long> {
-
-    @Override
-    protected TypeSerializer<Long> createSerializer() {
-        return SortedLongSerializer.INSTANCE;
-    }
-
-    @Override
-    protected int getLength() {
-        return 8;
-    }
-
-    @Override
-    protected Class<Long> getTypeClass() {
-        return Long.class;
-    }
-
-    @Override
-    protected Long[] getTestData() {
-        Random rnd = new Random(874597969123412341L);
-        long rndLong = rnd.nextLong();
-
-        return new Long[] {0L, 1L, -1L, Long.MAX_VALUE, Long.MIN_VALUE, 
rndLong, -rndLong};
-    }
-}
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java
deleted file mode 100644
index adb701a58e9..00000000000
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java
+++ /dev/null
@@ -1,769 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.sink;
-
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.table.api.InsertConflictStrategy;
-import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
-import org.apache.flink.table.api.TableRuntimeException;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
-import org.apache.flink.table.runtime.generated.RecordEqualiser;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.table.utils.HandwrittenSelectorUtil;
-import org.apache.flink.types.RowKind;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.delete;
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.insert;
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfter;
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Tests for {@link WatermarkCompactingSinkMaterializer}. */
-class WatermarkCompactingSinkMaterializerTest {
-
-    private static final int PRIMARY_KEY_INDEX = 1;
-    private static final String PRIMARY_KEY_NAME = "pk";
-
-    private static final LogicalType[] LOGICAL_TYPES =
-            new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()};
-
-    private static final GeneratedRecordEqualiser RECORD_EQUALISER =
-            new GeneratedRecordEqualiser("", "", new Object[0]) {
-                @Override
-                public RecordEqualiser newInstance(ClassLoader classLoader) {
-                    return new TestRecordEqualiser();
-                }
-            };
-
-    private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
-            new GeneratedRecordEqualiser("", "", new Object[0]) {
-                @Override
-                public RecordEqualiser newInstance(ClassLoader classLoader) {
-                    return new TestUpsertKeyEqualiser();
-                }
-            };
-
-    @ParameterizedTest
-    @EnumSource(
-            value = ConflictBehavior.class,
-            names = {"ERROR", "NOTHING"})
-    void testBasicInsertWithWatermarkProgression(ConflictBehavior behavior) throws Exception {
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarness(behavior)) {
-            harness.open();
-
-            // Insert first record (watermark is MIN_VALUE)
-            harness.processElement(insertRecord(1L, 1, "a1"));
-            assertEmitsNothing(harness); // Buffered, waiting for watermark
-
-            // Advance watermark to trigger compaction
-            harness.processWatermark(100L);
-            assertEmits(harness, insert(1L, 1, "a1"));
-
-            // Update with same upsert key (this is the expected pattern for single-source updates)
-            harness.processElement(updateAfterRecord(1L, 1, "a2"));
-            assertEmitsNothing(harness);
-
-            // Advance watermark again
-            harness.processWatermark(200L);
-            assertEmits(harness, updateAfter(1L, 1, "a2"));
-        }
-    }
-
-    @ParameterizedTest
-    @EnumSource(
-            value = ConflictBehavior.class,
-            names = {"ERROR", "NOTHING"})
-    void testDeleteAfterInsert(ConflictBehavior behavior) throws Exception {
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarness(behavior)) {
-            harness.open();
-
-            // Insert and compact
-            harness.processElement(insertRecord(1L, 1, "a1"));
-            harness.processWatermark(100L);
-            assertEmits(harness, insert(1L, 1, "a1"));
-
-            // Delete and compact
-            harness.processElement(deleteRecord(1L, 1, "a1"));
-            harness.processWatermark(200L);
-            assertEmits(harness, delete(1L, 1, "a1"));
-        }
-    }
-
-    @ParameterizedTest
-    @EnumSource(
-            value = ConflictBehavior.class,
-            names = {"ERROR", "NOTHING"})
-    void testInsertAndDeleteInSameWindow(ConflictBehavior behavior) throws Exception {
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarness(behavior)) {
-            harness.open();
-
-            // Insert and delete before watermark advances
-            harness.processElement(insertRecord(1L, 1, "a1"));
-            harness.processElement(deleteRecord(1L, 1, "a1"));
-
-            // Compact - should emit nothing since insert and delete cancel out
-            harness.processWatermark(100L);
-            assertEmitsNothing(harness);
-        }
-    }
-
-    @Test
-    void testDoNothingKeepsFirstRecord() throws Exception {
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarness(ConflictBehavior.NOTHING)) {
-            harness.open();
-
-            // Insert two records with different upsert keys but same primary key
-            harness.processElement(insertRecord(1L, 1, "first"));
-            harness.processElement(insertRecord(2L, 1, "second"));
-
-            // Compact - should keep the first record
-            harness.processWatermark(100L);
-            assertEmits(harness, insert(1L, 1, "first"));
-        }
-    }
-
-    @Test
-    void testDoErrorThrowsOnConflict() throws Exception {
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarness(ConflictBehavior.ERROR)) {
-            harness.open();
-
-            // Insert two records with different upsert keys but same primary key
-            harness.processElement(insertRecord(1L, 1, "first"));
-            harness.processElement(insertRecord(2L, 1, "second"));
-
-            // Compact - should throw exception with key info
-            assertThatThrownBy(() -> harness.processWatermark(100L))
-                    .isInstanceOf(TableRuntimeException.class)
-                    .hasMessageContaining("Primary key constraint violation")
-                    .hasMessageContaining("[pk=1]");
-        }
-    }
-
-    @Test
-    void testDoErrorAllowsSameUpsertKey() throws Exception {
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarness(ConflictBehavior.ERROR)) {
-            harness.open();
-
-            // Insert two records with same upsert key (updates to same source)
-            harness.processElement(insertRecord(1L, 1, "v1"));
-            harness.processElement(updateAfterRecord(1L, 1, "v2"));
-
-            // Compact - should not throw, just keep the latest
-            harness.processWatermark(100L);
-            assertEmits(harness, insert(1L, 1, "v2"));
-        }
-    }
-
-    @ParameterizedTest
-    @EnumSource(
-            value = ConflictBehavior.class,
-            names = {"ERROR", "NOTHING"})
-    void testChangelogDisorderHandling(ConflictBehavior behavior) throws Exception {
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarness(behavior)) {
-            harness.open();
-
-            // Simulate changelog disorder from FLIP-558 example:
-            // Records from different sources (different upsert keys: 1L and 2L) map to same PK (1)
-            // Ideal order: +I(1,1,a1), -U(1,1,a1), +U(2,1,b1)
-            // Disordered: +U(2,1,b1), +I(1,1,a1), -U(1,1,a1)
-
-            // The +U from source 2 arrives first (upsert key = 2L)
-            harness.processElement(updateAfterRecord(2L, 1, "b1"));
-            // Then +I and -U from source 1 arrive (upsert key = 1L)
-            harness.processElement(insertRecord(1L, 1, "a1"));
-            harness.processElement(updateBeforeRecord(1L, 1, "a1"));
-
-            // Net result: only (2L, 1, "b1") remains after cancellation, no conflict
-            harness.processWatermark(100L);
-            assertEmits(harness, insert(2L, 1, "b1"));
-        }
-    }
-
-    @ParameterizedTest
-    @EnumSource(
-            value = ConflictBehavior.class,
-            names = {"ERROR", "NOTHING"})
-    void testChangelogDisorderUpdateBeforeAfterInsert(ConflictBehavior behavior) throws Exception {
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarness(behavior)) {
-            harness.open();
-
-            // Simulate changelog disorder where UPDATE_AFTER arrives before UPDATE_BEFORE:
-            // Ideal order: +I(1,1,a1), -U(1,1,a1), +U(1,1,a2)
-            // Disordered: +I(1,1,a1), +U(1,1,a2), -U(1,1,a1)
-
-            // INSERT the initial value
-            harness.processElement(insertRecord(1L, 1, "a1"));
-            // UPDATE_AFTER arrives before the UPDATE_BEFORE
-            harness.processElement(updateAfterRecord(1L, 1, "a2"));
-            // UPDATE_BEFORE (retraction of the original INSERT) arrives last
-            harness.processElement(updateBeforeRecord(1L, 1, "a1"));
-
-            // Net result: +I and -U cancel out, only +U(1,1,a2) remains
-            harness.processWatermark(100L);
-            assertEmits(harness, insert(1L, 1, "a2"));
-        }
-    }
-
-    @ParameterizedTest
-    @EnumSource(
-            value = ConflictBehavior.class,
-            names = {"ERROR", "NOTHING"})
-    void testNoEmissionWhenValueUnchanged(ConflictBehavior behavior) throws Exception {
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarness(behavior)) {
-            harness.open();
-
-            // Insert and compact
-            harness.processElement(insertRecord(1L, 1, "value"));
-            harness.processWatermark(100L);
-            assertEmits(harness, insert(1L, 1, "value"));
-
-            // Insert same value again (same upsert key)
-            harness.processElement(updateAfterRecord(1L, 1, "value"));
-            harness.processWatermark(200L);
-            // Should not emit since value is the same
-            assertEmitsNothing(harness);
-        }
-    }
-
-    /**
-     * Tests that record timestamps are handled correctly when multiple inputs send records that
-     * arrive out of timestamp order. This simulates the case where records from different upstream
-     * tasks have different timestamps and arrive interleaved.
-     *
-     * <p>Input 1 uses upsert key = 1L, Input 2 uses upsert key = 2L. All records have same primary
-     * key (1).
-     *
-     * <p>Sequence:
-     *
-     * <ol>
-     *   <li>INSERT(input=1, t=2)
-     *   <li>watermark=3 -> emits INSERT
-     *   <li>UPDATE_BEFORE(input=1, t=4)
-     *   <li>UPDATE_AFTER(input=1, t=6)
-     *   <li>UPDATE_AFTER(input=2, t=4) - arrives after t=6 record but has smaller timestamp
-     *   <li>watermark=5 -> compacts t<=5 records
-     *   <li>UPDATE_BEFORE(input=2, t=6)
-     *   <li>watermark=10 -> compacts remaining t=6 records
-     * </ol>
-     */
-    @Test
-    void testTwoUpstreamTasksWithDisorderedWatermarks() throws Exception {
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarness(ConflictBehavior.NOTHING)) {
-            harness.open();
-
-            // INSERT from input 1 with timestamp 2
-            harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 2L));
-            assertEmitsNothing(harness);
-
-            // watermark=3: compacts records with t<=3, emits INSERT(t=2)
-            harness.processWatermark(3L);
-            assertEmits(harness, insert(1L, 1, "v1"));
-
-            // UPDATE_BEFORE from input 1 with timestamp 4
-            harness.processElement(recordWithTimestamp(RowKind.UPDATE_BEFORE, 1L, 1, "v1", 4L));
-            assertEmitsNothing(harness);
-
-            // UPDATE_AFTER from input 1 with timestamp 6
-            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v4", 6L));
-            assertEmitsNothing(harness);
-
-            // UPDATE_AFTER from input 2 with timestamp 4
-            // This record arrives after the t=6 record but has a smaller timestamp
-            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 2L, 1, "v3", 4L));
-
-            // watermark=5: compacts records with t<=5
-            // Buffered: UPDATE_BEFORE(1L, t=4) and UPDATE_AFTER(2L, t=4) cancel out for input 1,
-            // UPDATE_AFTER(2L, t=4) is emitted
-            harness.processWatermark(5L);
-            assertEmits(harness, updateAfter(2L, 1, "v3"));
-
-            // UPDATE_BEFORE from input 2 with timestamp 6
-            harness.processElement(recordWithTimestamp(RowKind.UPDATE_BEFORE, 2L, 1, "v3", 6L));
-            assertEmitsNothing(harness);
-
-            // Final watermark to flush all remaining buffered records (t=6)
-            // Buffered: UPDATE_AFTER(1L, t=6) and UPDATE_BEFORE(2L, t=6)
-            // After compaction: UPDATE_AFTER(1L, "v4") remains as final value
-            harness.processWatermark(10L);
-            assertEmits(harness, updateAfter(1L, 1, "v4"));
-        }
-    }
-
-    // --- Tests without upsert key ---
-
-    @ParameterizedTest
-    @EnumSource(
-            value = ConflictBehavior.class,
-            names = {"ERROR", "NOTHING"})
-    void testBasicInsertWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarnessWithoutUpsertKey(behavior)) {
-            harness.open();
-
-            // Insert first record
-            harness.processElement(insertRecord(1L, 1, "a1"));
-            assertEmitsNothing(harness);
-
-            // Advance watermark to trigger compaction
-            harness.processWatermark(100L);
-            assertEmits(harness, insert(1L, 1, "a1"));
-        }
-    }
-
-    @Test
-    void testUpdateWithoutUpsertKeyNothingStrategy() throws Exception {
-        // Without upsert key, UPDATE_AFTER on existing value causes conflict (two rows accumulate)
-        // NOTHING strategy keeps the first value
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarnessWithoutUpsertKey(ConflictBehavior.NOTHING)) {
-            harness.open();
-
-            harness.processElement(insertRecord(1L, 1, "a1"));
-            harness.processWatermark(100L);
-            assertEmits(harness, insert(1L, 1, "a1"));
-
-            // UPDATE_AFTER with different value - creates conflict, NOTHING keeps first
-            harness.processElement(updateAfterRecord(2L, 1, "a2"));
-            harness.processWatermark(200L);
-            // NOTHING keeps the previous value (1L, 1, "a1"), no emission since unchanged
-            assertEmitsNothing(harness);
-        }
-    }
-
-    @Test
-    void testUpdateWithoutUpsertKeyErrorStrategy() throws Exception {
-        // Without upsert key, UPDATE_AFTER on existing value causes conflict (two rows accumulate)
-        // ERROR strategy throws
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarnessWithoutUpsertKey(ConflictBehavior.ERROR)) {
-            harness.open();
-
-            harness.processElement(insertRecord(1L, 1, "a1"));
-            harness.processWatermark(100L);
-            assertEmits(harness, insert(1L, 1, "a1"));
-
-            // UPDATE_AFTER with different value - creates conflict, ERROR throws with key info
-            harness.processElement(updateAfterRecord(2L, 1, "a2"));
-            assertThatThrownBy(() -> harness.processWatermark(200L))
-                    .isInstanceOf(TableRuntimeException.class)
-                    .hasMessageContaining("Primary key constraint violation")
-                    .hasMessageContaining("[pk=1]");
-        }
-    }
-
-    @ParameterizedTest
-    @EnumSource(
-            value = ConflictBehavior.class,
-            names = {"ERROR", "NOTHING"})
-    void testDeleteAfterInsertWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarnessWithoutUpsertKey(behavior)) {
-            harness.open();
-
-            // Insert and compact
-            harness.processElement(insertRecord(1L, 1, "a1"));
-            harness.processWatermark(100L);
-            assertEmits(harness, insert(1L, 1, "a1"));
-
-            // Delete with exact same row and compact
-            harness.processElement(deleteRecord(1L, 1, "a1"));
-            harness.processWatermark(200L);
-            assertEmits(harness, delete(1L, 1, "a1"));
-        }
-    }
-
-    @ParameterizedTest
-    @EnumSource(
-            value = ConflictBehavior.class,
-            names = {"ERROR", "NOTHING"})
-    void testInsertAndDeleteInSameWindowWithoutUpsertKey(ConflictBehavior behavior)
-            throws Exception {
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarnessWithoutUpsertKey(behavior)) {
-            harness.open();
-
-            // Insert and delete with exact same row before watermark advances
-            harness.processElement(insertRecord(1L, 1, "a1"));
-            harness.processElement(deleteRecord(1L, 1, "a1"));
-
-            // Compact - should emit nothing since insert and delete cancel out
-            harness.processWatermark(100L);
-            assertEmitsNothing(harness);
-        }
-    }
-
-    @Test
-    void testIdenticalInsertsWithoutUpsertKeyNothingKeepsFirst() throws Exception {
-        // Without upsert key, even identical inserts are separate entries
-        // NOTHING strategy just keeps the first one
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarnessWithoutUpsertKey(ConflictBehavior.NOTHING)) {
-            harness.open();
-
-            // Insert two identical records (same full row)
-            harness.processElement(insertRecord(1L, 1, "same"));
-            harness.processElement(insertRecord(1L, 1, "same"));
-
-            // Compact - NOTHING keeps first record
-            harness.processWatermark(100L);
-            assertEmits(harness, insert(1L, 1, "same"));
-        }
-    }
-
-    @Test
-    void testIdenticalInsertsWithoutUpsertKeyErrorThrows() throws Exception {
-        // Without upsert key, even identical inserts are separate entries
-        // ERROR strategy throws because there are multiple records
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarnessWithoutUpsertKey(ConflictBehavior.ERROR)) {
-            harness.open();
-
-            // Insert two identical records (same full row)
-            harness.processElement(insertRecord(1L, 1, "same"));
-            harness.processElement(insertRecord(1L, 1, "same"));
-
-            // Compact - ERROR throws because there are multiple pending records, includes key info
-            assertThatThrownBy(() -> harness.processWatermark(100L))
-                    .isInstanceOf(TableRuntimeException.class)
-                    .hasMessageContaining("Primary key constraint violation")
-                    .hasMessageContaining("[pk=1]");
-        }
-    }
-
-    @ParameterizedTest
-    @EnumSource(
-            value = ConflictBehavior.class,
-            names = {"ERROR", "NOTHING"})
-    void testInsertUpdateDeleteWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarnessWithoutUpsertKey(behavior)) {
-            harness.open();
-
-            // Insert, then update_before + update_after sequence
-            harness.processElement(insertRecord(1L, 1, "v1"));
-            harness.processWatermark(100L);
-            assertEmits(harness, insert(1L, 1, "v1"));
-
-            // Update: retract old value, insert new value
-            harness.processElement(updateBeforeRecord(1L, 1, "v1"));
-            harness.processElement(updateAfterRecord(2L, 1, "v2"));
-            harness.processWatermark(200L);
-            assertEmits(harness, updateAfter(2L, 1, "v2"));
-
-            // Delete the current value
-            harness.processElement(deleteRecord(2L, 1, "v2"));
-            harness.processWatermark(300L);
-            assertEmits(harness, delete(2L, 1, "v2"));
-        }
-    }
-
-    // --- Restore Tests ---
-
-    /**
-     * Tests that buffered records at different timestamps before checkpoint are consolidated to
-     * MIN_VALUE on restore and compacted on the first watermark.
-     */
-    @ParameterizedTest
-    @EnumSource(
-            value = ConflictBehavior.class,
-            names = {"ERROR", "NOTHING"})
-    void testBufferedRecordsConsolidatedOnRestore(ConflictBehavior behavior) throws Exception {
-        OperatorSubtaskState snapshot;
-
-        // First harness: buffer records at different timestamps, then take snapshot
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarness(behavior)) {
-            harness.open();
-
-            // Buffer records at different timestamps (simulating records before checkpoint)
-            harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 1000L));
-            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v2", 2000L));
-            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v3", 3000L));
-
-            // No watermark yet, so nothing emitted
-            assertEmitsNothing(harness);
-
-            // Take snapshot with buffered records
-            snapshot = harness.snapshot(1L, 1L);
-        }
-
-        // Second harness: restore from snapshot
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarness(behavior)) {
-            harness.initializeState(snapshot);
-            harness.open();
-
-            // After restore, watermarks restart from MIN_VALUE.
-            // The buffered records should have been consolidated to MIN_VALUE.
-            // First watermark (even a small one) should trigger compaction of all consolidated
-            // records.
-            harness.processWatermark(100L);
-
-            // All records were from same upsert key, so only final value is emitted
-            assertEmits(harness, insert(1L, 1, "v3"));
-        }
-    }
-
-    /**
-     * Tests that in-flight records from unaligned checkpoints (records with timestamps > MIN_VALUE
-     * arriving before first watermark after restore) are correctly handled.
-     */
-    @ParameterizedTest
-    @EnumSource(
-            value = ConflictBehavior.class,
-            names = {"ERROR", "NOTHING"})
-    void testInFlightRecordsAfterRestore(ConflictBehavior behavior) throws Exception {
-        OperatorSubtaskState snapshot;
-
-        // First harness: take empty snapshot
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarness(behavior)) {
-            harness.open();
-            snapshot = harness.snapshot(1L, 1L);
-        }
-
-        // Second harness: restore and simulate in-flight records
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarness(behavior)) {
-            harness.initializeState(snapshot);
-            harness.open();
-
-            // Simulate in-flight records that were checkpointed with their old timestamps.
-            // These arrive after restore but before any watermark is received.
-            // They have timestamps > MIN_VALUE from before the checkpoint.
-            harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 5000L));
-            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v2", 5100L));
-
-            // No watermark yet, nothing emitted
-            assertEmitsNothing(harness);
-
-            // First watermark after restore. Since currentWatermark was MIN_VALUE,
-            // in-flight records should have been assigned to MIN_VALUE and will be compacted.
-            harness.processWatermark(100L);
-
-            // Both records had same upsert key, so only final value is emitted
-            assertEmits(harness, insert(1L, 1, "v2"));
-        }
-    }
-
-    /**
-     * Tests restore with multiple keys having buffered records at different timestamps. Verifies
-     * that each key's records are correctly consolidated and compacted.
-     */
-    @Test
-    void testRestoreWithMultipleKeys() throws Exception {
-        OperatorSubtaskState snapshot;
-
-        // First harness: buffer records for multiple keys
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarness(ConflictBehavior.NOTHING)) {
-            harness.open();
-
-            // Key 1: multiple updates
-            harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "k1v1", 1000L));
-            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "k1v2", 2000L));
-
-            // Key 2: single insert
-            harness.processElement(recordWithTimestamp(RowKind.INSERT, 2L, 2, "k2v1", 1500L));
-
-            // Key 3: insert then delete (should result in nothing)
-            harness.processElement(recordWithTimestamp(RowKind.INSERT, 3L, 3, "k3v1", 1000L));
-            harness.processElement(recordWithTimestamp(RowKind.DELETE, 3L, 3, "k3v1", 2500L));
-
-            snapshot = harness.snapshot(1L, 1L);
-        }
-
-        // Second harness: restore and verify
-        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
-                createHarness(ConflictBehavior.NOTHING)) {
-            harness.initializeState(snapshot);
-            harness.open();
-
-            // First watermark compacts all consolidated records
-            harness.processWatermark(100L);
-
-            // Extract and verify results (order depends on key processing order)
-            List<RowData> emitted = extractRecords(harness);
-            assertThat(emitted).hasSize(2);
-            // Key 1 should have final value "k1v2", Key 2 should have "k2v1", Key 3 cancelled out
-            assertThat(emitted)
-                    .anySatisfy(
-                            row -> {
-                                assertThat(row.getInt(1)).isEqualTo(1);
-                                assertThat(row.getString(2).toString()).isEqualTo("k1v2");
-                            })
-                    .anySatisfy(
-                            row -> {
-                                assertThat(row.getInt(1)).isEqualTo(2);
-                                assertThat(row.getString(2).toString()).isEqualTo("k2v1");
-                            });
-        }
-    }
-
-    // --- Helper Methods ---
-
-    private StreamRecord<RowData> recordWithTimestamp(
-            RowKind kind, long upsertKey, int pk, String value, long timestamp) {
-        return new StreamRecord<>(rowOfKind(kind, upsertKey, pk, value), timestamp);
-    }
-
-    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(
-            ConflictBehavior behavior) throws Exception {
-        RowType keyType = RowType.of(new LogicalType[] {LOGICAL_TYPES[PRIMARY_KEY_INDEX]});
-        WatermarkCompactingSinkMaterializer operator =
-                WatermarkCompactingSinkMaterializer.create(
-                        toStrategy(behavior),
-                        RowType.of(LOGICAL_TYPES),
-                        RECORD_EQUALISER,
-                        UPSERT_KEY_EQUALISER,
-                        new int[] {0}, // upsert key is first column
-                        keyType,
-                        new String[] {PRIMARY_KEY_NAME});
-
-        return new KeyedOneInputStreamOperatorTestHarness<>(
-                operator,
-                HandwrittenSelectorUtil.getRowDataSelector(
-                        new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES),
-                HandwrittenSelectorUtil.getRowDataSelector(
-                                new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES)
-                        .getProducedType());
-    }
-
-    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>
-            createHarnessWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
-        RowType keyType = RowType.of(new LogicalType[] {LOGICAL_TYPES[PRIMARY_KEY_INDEX]});
-        WatermarkCompactingSinkMaterializer operator =
-                WatermarkCompactingSinkMaterializer.create(
-                        toStrategy(behavior),
-                        RowType.of(LOGICAL_TYPES),
-                        RECORD_EQUALISER,
-                        null, // no upsert key equaliser
-                        null, // no upsert key
-                        keyType,
-                        new String[] {PRIMARY_KEY_NAME});
-
-        return new KeyedOneInputStreamOperatorTestHarness<>(
-                operator,
-                HandwrittenSelectorUtil.getRowDataSelector(
-                        new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES),
-                HandwrittenSelectorUtil.getRowDataSelector(
-                                new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES)
-                        .getProducedType());
-    }
-
-    private static InsertConflictStrategy toStrategy(ConflictBehavior behavior) {
-        switch (behavior) {
-            case ERROR:
-                return InsertConflictStrategy.error();
-            case NOTHING:
-                return InsertConflictStrategy.nothing();
-            case DEDUPLICATE:
-                return InsertConflictStrategy.deduplicate();
-            default:
-                throw new IllegalArgumentException("Unknown behavior: " + behavior);
-        }
-    }
-
-    private void assertEmitsNothing(
-            KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness) {
-        assertThat(extractRecords(harness)).isEmpty();
-    }
-
-    private void assertEmits(
-            KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness,
-            RowData... expected) {
-        List<RowData> emitted = extractRecords(harness);
-        assertThat(emitted).containsExactly(expected);
-    }
-
-    private List<RowData> extractRecords(
-            KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness) {
-        final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[LOGICAL_TYPES.length];
-        for (int i = 0; i < LOGICAL_TYPES.length; i++) {
-            fieldGetters[i] = RowData.createFieldGetter(LOGICAL_TYPES[i], i);
-        }
-
-        final List<RowData> rows = new ArrayList<>();
-        Object o;
-        while ((o = harness.getOutput().poll()) != null) {
-            // Skip watermarks, only process StreamRecords
-            if (o instanceof StreamRecord) {
-                RowData value = (RowData) ((StreamRecord<?>) o).getValue();
-                Object[] row = new Object[LOGICAL_TYPES.length];
-                for (int i = 0; i < LOGICAL_TYPES.length; i++) {
-                    row[i] = fieldGetters[i].getFieldOrNull(value);
-                }
-                GenericRowData newRow = GenericRowData.of(row);
-                newRow.setRowKind(value.getRowKind());
-                rows.add(newRow);
-            }
-        }
-        return rows;
-    }
-
-    /** Test equaliser that compares all fields. */
-    private static class TestRecordEqualiser implements RecordEqualiser {
-        @Override
-        public boolean equals(RowData row1, RowData row2) {
-            return row1.getRowKind() == row2.getRowKind()
-                    && row1.getLong(0) == row2.getLong(0)
-                    && row1.getInt(1) == row2.getInt(1)
-                    && Objects.equals(row1.getString(2), row2.getString(2));
-        }
-    }
-
-    /** Test equaliser that only compares the upsert key (first column). */
-    private static class TestUpsertKeyEqualiser implements RecordEqualiser {
-        @Override
-        public boolean equals(RowData row1, RowData row2) {
-            return row1.getLong(0) == row2.getLong(0);
-        }
-    }
-}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
index 99a5deb41e9..40929ed6ac6 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
@@ -104,26 +104,6 @@ public class StreamRecordUtils {
         return new StreamRecord<>(row);
     }
 
-    /** Creates a new {@link RowData} with INSERT kind. Alias for {@link 
#row(Object...)}. */
-    public static RowData insert(Object... fields) {
-        return rowOfKind(RowKind.INSERT, fields);
-    }
-
-    /** Creates a new {@link RowData} with UPDATE_BEFORE kind. */
-    public static RowData updateBefore(Object... fields) {
-        return rowOfKind(RowKind.UPDATE_BEFORE, fields);
-    }
-
-    /** Creates a new {@link RowData} with UPDATE_AFTER kind. */
-    public static RowData updateAfter(Object... fields) {
-        return rowOfKind(RowKind.UPDATE_AFTER, fields);
-    }
-
-    /** Creates a new {@link RowData} with DELETE kind. */
-    public static RowData delete(Object... fields) {
-        return rowOfKind(RowKind.DELETE, fields);
-    }
-
     /** Receives a object array, generates a RowData based on the array. */
     public static RowData rowOfKind(RowKind rowKind, Object... fields) {
         Object[] objects = new Object[fields.length];
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
index 9122d74cb79..976cef6349b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
@@ -73,7 +73,6 @@ import org.apache.flink.table.dataview.ListViewSerializer;
 import org.apache.flink.table.dataview.MapViewSerializer;
 import org.apache.flink.table.dataview.NullAwareMapSerializer;
 import org.apache.flink.table.dataview.NullSerializer;
-import org.apache.flink.table.runtime.operators.sink.SortedLongSerializer;
 import org.apache.flink.table.runtime.operators.window.CountWindow;
 import 
org.apache.flink.table.runtime.sequencedmultisetstate.linked.MetaSqnInfoSerializer;
 import 
org.apache.flink.table.runtime.sequencedmultisetstate.linked.NodeSerializer;
@@ -264,8 +263,7 @@ public class TypeSerializerTestCoverageTest extends 
TestLogger {
                         NodeSerializer.class.getName(),
                         RowSqnInfoSerializer.class.getName(),
                         MetaSqnInfoSerializer.class.getName(),
-                        SetSerializer.class.getName(),
-                        SortedLongSerializer.class.getName());
+                        SetSerializer.class.getName());
 
         // check if a test exists for each type serializer
         for (Class<? extends TypeSerializer> typeSerializer : typeSerializers) 
{


Reply via email to