twalthr commented on code in PR #27502:
URL: https://github.com/apache/flink/pull/27502#discussion_r2788511874


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark 
progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from 
FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends 
TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state TTL. This will result in 
incorrect result. "
+                    + "You can increase the state TTL to avoid this.";
+    private static final Set<String> ORDERED_STATE_BACKENDS = 
Set.of("rocksdb", "forst");
+
+    private final InsertConflictStrategy conflictStrategy;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
+    private final RowType keyType;
+    private final String[] primaryKeyNames;
+
+    private transient MapStateDescriptor<Long, List<RowData>> bufferDescriptor;
+    private transient MapState<Long, List<RowData>> buffer;

Review Comment:
   please document what is stored in this `buffer`



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java:
##########
@@ -390,6 +406,36 @@ private OneInputStreamOperator<RowData, RowData> 
createSumOperator(
             GeneratedRecordEqualiser rowEqualiser,
             GeneratedHashFunction rowHashFunction) {
 
+        // Check if we should use the watermark-compacting materializer for 
ERROR/NOTHING strategies
+        if (isErrorOrNothingConflictStrategy()) {
+            // Extract primary key column names and key type
+            int[] primaryKeys =

Review Comment:
   nit: we have `primaryKeys` and `pkFieldNames` available on the caller side; we could pass them in instead of recomputing them



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark 
progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from 
FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends 
TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state TTL. This will result in 
incorrect result. "
+                    + "You can increase the state TTL to avoid this.";
+    private static final Set<String> ORDERED_STATE_BACKENDS = 
Set.of("rocksdb", "forst");
+
+    private final InsertConflictStrategy conflictStrategy;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
+    private final RowType keyType;
+    private final String[] primaryKeyNames;
+
+    private transient MapStateDescriptor<Long, List<RowData>> bufferDescriptor;
+    private transient MapState<Long, List<RowData>> buffer;
+    private transient ValueState<RowData> currentValue;

Review Comment:
   please document what is stored in this `currentValue`



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java:
##########
@@ -0,0 +1,743 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.delete;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insert;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfter;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateBefore;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link WatermarkCompactingSinkMaterializer}. */
+class WatermarkCompactingSinkMaterializerTest {
+
+    private static final int PRIMARY_KEY_INDEX = 1;
+    private static final String PRIMARY_KEY_NAME = "pk";
+
+    private static final LogicalType[] LOGICAL_TYPES =
+            new LogicalType[] {new BigIntType(), new IntType(), new 
VarCharType()};
+
+    private static final GeneratedRecordEqualiser RECORD_EQUALISER =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestRecordEqualiser();
+                }
+            };
+
+    private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestUpsertKeyEqualiser();
+                }
+            };
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testBasicInsertWithWatermarkProgression(ConflictBehavior behavior) 
throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert first record (watermark is MIN_VALUE)
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            assertEmitsNothing(harness); // Buffered, waiting for watermark
+
+            // Advance watermark to trigger compaction
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "a1"));
+
+            // Update with same upsert key (this is the expected pattern for 
single-source updates)
+            harness.processElement(updateAfterRecord(1L, 1, "a2"));
+            assertEmitsNothing(harness);
+
+            // Advance watermark again
+            harness.processWatermark(200L);
+            assertEmits(harness, updateAfter(1L, 1, "a2"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testDeleteAfterInsert(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert and compact
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "a1"));
+
+            // Delete and compact
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+            harness.processWatermark(200L);
+            assertEmits(harness, delete(1L, 1, "a1"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testInsertAndDeleteInSameWindow(ConflictBehavior behavior) throws 
Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert and delete before watermark advances
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+
+            // Compact - should emit nothing since insert and delete cancel out
+            harness.processWatermark(100L);
+            assertEmitsNothing(harness);
+        }
+    }
+
+    @Test
+    void testDoNothingKeepsFirstRecord() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
harness =
+                createHarness(ConflictBehavior.NOTHING)) {
+            harness.open();
+
+            // Insert two records with different upsert keys but same primary 
key
+            harness.processElement(insertRecord(1L, 1, "first"));
+            harness.processElement(insertRecord(2L, 1, "second"));
+
+            // Compact - should keep the first record
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "first"));
+        }
+    }
+
+    @Test
+    void testDoErrorThrowsOnConflict() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
harness =
+                createHarness(ConflictBehavior.ERROR)) {
+            harness.open();
+
+            // Insert two records with different upsert keys but same primary 
key
+            harness.processElement(insertRecord(1L, 1, "first"));
+            harness.processElement(insertRecord(2L, 1, "second"));
+
+            // Compact - should throw exception with key info
+            assertThatThrownBy(() -> harness.processWatermark(100L))
+                    .isInstanceOf(TableRuntimeException.class)
+                    .hasMessageContaining("Primary key constraint violation")
+                    .hasMessageContaining("[pk=1]");
+        }
+    }
+
+    @Test
+    void testDoErrorAllowsSameUpsertKey() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
harness =
+                createHarness(ConflictBehavior.ERROR)) {
+            harness.open();
+
+            // Insert two records with same upsert key (updates to same source)
+            harness.processElement(insertRecord(1L, 1, "v1"));
+            harness.processElement(updateAfterRecord(1L, 1, "v2"));
+
+            // Compact - should not throw, just keep the latest
+            harness.processWatermark(100L);
+            assertEmits(harness, insert(1L, 1, "v2"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testChangelogDisorderHandling(ConflictBehavior behavior) throws 
Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Simulate changelog disorder from FLIP-558 example:
+            // Records from different sources (different upsert keys: 1L and 
2L) map to same PK (1)
+            // Ideal order: +I(1,1,a1), -U(1,1,a1), +U(2,1,b1)
+            // Disordered: +U(2,1,b1), +I(1,1,a1), -U(1,1,a1)

Review Comment:
   Do we want to test the second disorder case as well? Or is this already covered by an existing test?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark 
progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from 
FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends 
TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state TTL. This will result in 
incorrect result. "

Review Comment:
   ```suggestion
               "The state is cleared because of state TTL. This will lead to 
incorrect results. "
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to