rkhachatryan commented on code in PR #27602:
URL: https://github.com/apache/flink/pull/27602#discussion_r2847916535


##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java:
##########
@@ -0,0 +1,960 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.delete;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insert;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfter;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link WatermarkCompactingSinkMaterializer}. */
+class WatermarkCompactingSinkMaterializerTest {

Review Comment:
   It would be great to parameterize this test with the type of state backend (heap /
rocksdb).
   Or at least make it use rocksdb by default, since it is the most widely used one.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java:
##########
@@ -341,26 +337,47 @@ protected Transformation<RowData> applyUpsertMaterialize(
                 StateConfigUtil.createTtlConfig(
                         StateMetadata.getStateTtlForOneInputOperator(config, 
stateMetadataList));
 
+        final String[] pkFieldNames =
+                Arrays.stream(primaryKeys)
+                        .mapToObj(idx -> 
physicalRowType.getFieldNames().get(idx))
+                        .toArray(String[]::new);
+
         final OneInputStreamOperator<RowData, RowData> operator =
                 createSumOperator(
                         config,
                         physicalRowType,
+                        primaryKeys,
+                        pkFieldNames,
                         inputUpsertKey,
                         upsertKeyEqualiser,
                         upsertKeyHashFunction,
                         ttlConfig,
                         rowEqualiser,
                         rowHashFunction);
 
-        final String[] fieldNames = 
physicalRowType.getFieldNames().toArray(new String[0]);
-        final List<String> pkFieldNames =
-                Arrays.stream(primaryKeys)
-                        .mapToObj(idx -> fieldNames[idx])
-                        .collect(Collectors.toList());
+        // For ERROR/NOTHING strategies, apply WatermarkTimestampAssigner first
+        // This assigns the current watermark as the timestamp to each record,
+        // which is required for the WatermarkCompactingSinkMaterializer to 
work correctly
+        Transformation<RowData> transformForMaterializer = inputTransform;
+        if (isErrorOrNothingConflictStrategy()) {
+            // Use input parallelism to preserve watermark semantics
+            transformForMaterializer =
+                    ExecNodeUtil.createOneInputTransformation(
+                            inputTransform,
+                            createTransformationMeta(
+                                    
WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
+                                    "WatermarkTimestampAssigner",
+                                    "WatermarkTimestampAssigner",
+                                    config),
+                            new WatermarkTimestampAssigner(),
+                            inputTransform.getOutputType(),
+                            inputTransform.getParallelism(),
+                            false);

Review Comment:
   Is adding `WatermarkTimestampAssigner` covered by any tests?
   
   I think it would also be good to have an ITCase with more than one input
channel.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark 
progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from 
FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends 
TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, 
Triggerable<RowData, VoidNamespace> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state TTL. This will lead to 
incorrect results. "
+                    + "You can increase the state TTL to avoid this.";
+    private static final Set<String> ORDERED_STATE_BACKENDS = 
Set.of("rocksdb", "forst");
+
+    private final StateTtlConfig ttlConfig;
+    private final InsertConflictStrategy conflictStrategy;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
+    private final RowType keyType;
+    private final String[] primaryKeyNames;

Review Comment:
   I think these two fields (`keyType` and `primaryKeyNames`) should be in sync.
   At least because of this code:
   ```
       private void initializeKeyFieldGetters() {
           this.keyFieldGetters = new 
RowData.FieldGetter[primaryKeyNames.length];
           for (int i = 0; i < primaryKeyNames.length; i++) {
               LogicalType fieldType = keyType.getTypeAt(i);
               keyFieldGetters[i] = RowData.createFieldGetter(fieldType, i);
           }
       }
   
   ```
   
   But they seem to be initialized from different types of key: projected and 
un-projected:
   ```
           RowType keyType = RowTypeUtils.projectRowType(physicalRowType, 
primaryKeys);
   ```
   and
   ```
           final String[] pkFieldNames =
                   Arrays.stream(primaryKeys)
                           .mapToObj(idx -> 
physicalRowType.getFieldNames().get(idx))
                           .toArray(String[]::new);
   ```
   Am I missing something?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark 
progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from 
FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends 
TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, 
Triggerable<RowData, VoidNamespace> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state TTL. This will lead to 
incorrect results. "
+                    + "You can increase the state TTL to avoid this.";
+    private static final Set<String> ORDERED_STATE_BACKENDS = 
Set.of("rocksdb", "forst");
+
+    private final StateTtlConfig ttlConfig;
+    private final InsertConflictStrategy conflictStrategy;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
+    private final RowType keyType;
+    private final String[] primaryKeyNames;
+
+    // Buffers incoming changelog records (INSERT, UPDATE_BEFORE, 
UPDATE_AFTER, DELETE) keyed by
+    // their timestamp. Watermarks act as compaction barriers: when a 
watermark arrives, we know
+    // that UPDATE_BEFORE and its corresponding UPDATE_AFTER have both been 
received and can be
+    // compacted together. This solves the out-of-order problem where a later 
UPDATE_AFTER may
+    // arrive before the UPDATE_BEFORE of a previous change.
+    private transient MapState<Long, List<RowData>> buffer;
+
+    // Stores the last emitted value for the current primary key. Used to 
detect duplicates
+    // and determine the correct RowKind (INSERT vs UPDATE_AFTER) on 
subsequent compactions.
+    private transient ValueState<RowData> currentValue;
+    private transient RecordEqualiser equaliser;
+    private transient RecordEqualiser upsertKeyEqualiser;
+    private transient TimestampedCollector<RowData> collector;
+    private transient boolean isOrderedStateBackend;
+
+    // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey.
+    private transient ProjectedRowData upsertKeyProjectedRow1;
+    private transient ProjectedRowData upsertKeyProjectedRow2;
+
+    // Field getters for formatting the primary key in error messages.
+    private transient RowData.FieldGetter[] keyFieldGetters;
+
+    private transient InternalTimerService<VoidNamespace> timerService;
+
+    // Tracks the checkpoint ID this operator was restored from (empty if 
fresh start)
+    @Nullable private transient Long restoredCheckpointId;
+
+    // Per-key state tracking the checkpoint ID for which consolidation was 
done
+    private transient ValueState<Long> consolidatedCheckpointId;
+
+    public WatermarkCompactingSinkMaterializer(
+            StateTtlConfig ttlConfig,
+            InsertConflictStrategy conflictStrategy,
+            TypeSerializer<RowData> serializer,
+            GeneratedRecordEqualiser generatedRecordEqualiser,
+            @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
+            @Nullable int[] inputUpsertKey,
+            RowType keyType,
+            String[] primaryKeyNames) {
+        validateConflictStrategy(conflictStrategy);
+        this.ttlConfig = ttlConfig;
+        this.conflictStrategy = conflictStrategy;
+        this.serializer = serializer;
+        this.generatedRecordEqualiser = generatedRecordEqualiser;
+        this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
+        this.inputUpsertKey = inputUpsertKey;
+        this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 
0;
+        this.keyType = keyType;
+        this.primaryKeyNames = primaryKeyNames;
+    }
+
+    private static void validateConflictStrategy(InsertConflictStrategy 
strategy) {
+        Preconditions.checkArgument(
+                strategy.getBehavior() == ConflictBehavior.ERROR
+                        || strategy.getBehavior() == ConflictBehavior.NOTHING,
+                "Only ERROR and NOTHING strategies are supported, got: %s",
+                strategy);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+
+        // Initialize state descriptors and handles
+        MapStateDescriptor<Long, List<RowData>> bufferDescriptor =
+                new MapStateDescriptor<>(
+                        "watermark-buffer",
+                        SortedLongSerializer.INSTANCE,
+                        new ListSerializer<>(serializer));
+        ValueStateDescriptor<RowData> currentValueDescriptor =
+                new ValueStateDescriptor<>("current-value", serializer);
+
+        if (ttlConfig.isEnabled()) {
+            bufferDescriptor.enableTimeToLive(
+                    StateTtlConfig.newBuilder(ttlConfig)
+                            
.setTimeToLive(ttlConfig.getTimeToLive().plus(Duration.ofSeconds(1)))
+                            .build());
+            currentValueDescriptor.enableTimeToLive(ttlConfig);
+        }

Review Comment:
   It would also be good to have test coverage for TTL; but I think that's a NIT



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java:
##########
@@ -0,0 +1,960 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.delete;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insert;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfter;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link WatermarkCompactingSinkMaterializer}. */
+class WatermarkCompactingSinkMaterializerTest {
+
+    private static final int PRIMARY_KEY_INDEX = 1;
+    private static final String PRIMARY_KEY_NAME = "pk";
+
+    private static final LogicalType[] LOGICAL_TYPES =
+            new LogicalType[] {new BigIntType(), new IntType(), new 
VarCharType()};
+
+    private static final GeneratedRecordEqualiser RECORD_EQUALISER =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestRecordEqualiser();
+                }
+            };
+
+    private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestUpsertKeyEqualiser();
+                }
+            };
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testBasicInsertWithWatermarkProgression(ConflictBehavior behavior) 
throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert first record (watermark is MIN_VALUE)
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            assertEmitsNothing(harness); // Buffered, waiting for watermark
+
+            // Advance watermark to trigger compaction
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "a1"));
+
+            // Update with same upsert key (this is the expected pattern for 
single-source updates)
+            harness.processElement(updateAfterRecord(1L, 1, "a2"));
+            assertEmitsNothing(harness);
+
+            // Advance watermark again
+            harness.processWatermark(200L);
+            assertEmits(harness, updateAfter(1L, 1, "a2"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testDeleteAfterInsert(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert and compact
+            harness.processElement(insertRecord(1L, 1, "a1"));

Review Comment:
   Many tests in this class cover the case before the 1st watermark.
   For example, here both `currentWatermark` and the record timestamp are
null.
   There are tests that cover after-watermark processing as well, but the 
coverage seems to be lower.
   
   Maybe we can parameterize the class with something like `nullable 
initialWatermark`.
   If it's set, then send it to the harness before the test; and set it on created
records unless a specific timestamp is provided.
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to