This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1779d20a28f Revert "[FLINK-38928] Implement an operator for handling DO ERROR/NOTHING (#27502)"
1779d20a28f is described below
commit 1779d20a28f5d43410138314fe3292d0e64f3148
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Feb 12 14:04:40 2026 +0100
Revert "[FLINK-38928] Implement an operator for handling DO ERROR/NOTHING (#27502)"
This reverts commit 743b9bd2b32149559983eb983c9ec0391a02fb9e.
---
.../plan/nodes/exec/common/CommonExecSink.java | 7 +-
.../plan/nodes/exec/stream/StreamExecSink.java | 76 +-
.../nodes/physical/stream/StreamPhysicalSink.scala | 35 -
.../FlinkChangelogModeInferenceProgram.scala | 38 +-
.../plan/nodes/exec/stream/SinkSemanticTests.java | 31 +-
.../plan/nodes/exec/stream/SinkTestPrograms.java | 35 +-
.../operators/sink/SortedLongSerializer.java | 115 ---
.../sink/WatermarkCompactingSinkMaterializer.java | 504 --------------
.../operators/sink/WatermarkTimestampAssigner.java | 49 --
.../operators/sink/SortedLongSerializerTest.java | 51 --
.../WatermarkCompactingSinkMaterializerTest.java | 769 ---------------------
.../table/runtime/util/StreamRecordUtils.java | 20 -
.../TypeSerializerTestCoverageTest.java | 4 +-
13 files changed, 66 insertions(+), 1668 deletions(-)
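
(For context, the reverted feature concerns the ON CONFLICT clause of INSERT
statements. A minimal sketch of the statement shapes involved, against the
illustrative source_t/sink_t tables used by the test programs further down;
this assumes the tables are already registered, so it is not runnable on its
own:)

    TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // Still supported: deduplicate conflicting rows on the sink's primary key.
    env.executeSql("INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO DEDUPLICATE");
    // After this revert, these fail at planning time with a ValidationException:
    env.executeSql("INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO NOTHING");
    env.executeSql("INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO ERROR");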
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 31c0bc4074f..0fc273ca684 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -187,6 +187,9 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
Optional<LineageVertex> lineageVertexOpt =
TableLineageUtils.extractLineageDataset(outputObject);
+ // only add materialization if input has changes
+ final boolean needMaterialization = !inputInsertOnly && upsertMaterialize;
+
Transformation<RowData> sinkTransform =
applyConstraintValidations(inputTransform, config, persistedRowType);
@@ -199,10 +202,10 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
primaryKeys,
sinkParallelism,
inputParallelism,
- upsertMaterialize);
+ needMaterialization);
}
- if (upsertMaterialize) {
+ if (needMaterialization) {
sinkTransform =
applyUpsertMaterialize(
sinkTransform,
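
(The restored guard above reads as a double negative; spelled out, with the
names from the hunk, as an illustrative snippet rather than the actual
planner code:)

    // Materialize only when the input can actually produce updates/deletes
    // AND the planner asked for upsert materialization in the first place.
    boolean inputInsertOnly = true;   // e.g. an append-only input
    boolean upsertMaterialize = true; // planner-requested materialization
    boolean needMaterialization = !inputInsertOnly && upsertMaterialize; // false here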
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
index ce95c8b65bf..1726105dee8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
@@ -26,8 +26,8 @@ import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.InsertConflictStrategy;
-import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.ExecutionConfigOptions.RowtimeInserter;
import org.apache.flink.table.api.config.ExecutionConfigOptions.SinkUpsertMaterializeStrategy;
@@ -55,8 +55,6 @@ import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerV2;
-import org.apache.flink.table.runtime.operators.sink.WatermarkCompactingSinkMaterializer;
-import org.apache.flink.table.runtime.operators.sink.WatermarkTimestampAssigner;
import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
@@ -102,7 +100,6 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
producedTransformations = {
CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
CommonExecSink.PARTITIONER_TRANSFORMATION,
- StreamExecSink.WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
CommonExecSink.SINK_TRANSFORMATION
@@ -127,7 +124,6 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
producedTransformations = {
CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
CommonExecSink.PARTITIONER_TRANSFORMATION,
- StreamExecSink.WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
CommonExecSink.SINK_TRANSFORMATION
@@ -137,9 +133,6 @@ import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
public class StreamExecSink extends CommonExecSink implements StreamExecNode<Object> {
private static final Logger LOG = LoggerFactory.getLogger(StreamExecSink.class);
- public static final String WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION =
- "watermark-timestamp-assigner";
-
public static final String FIELD_NAME_INPUT_CHANGELOG_MODE = "inputChangelogMode";
public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = "requireUpsertMaterialize";
public static final String FIELD_NAME_UPSERT_MATERIALIZE_STRATEGY = "upsertMaterializeStrategy";
@@ -244,6 +237,17 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
@Override
protected Transformation<Object> translateToPlanInternal(
PlannerBase planner, ExecNodeConfig config) {
+ // TODO: FLINK-38928 Remove this validation once runtime support for ERROR and NOTHING is
+ // implemented
+ if (InsertConflictStrategy.error().equals(conflictStrategy)
+ || InsertConflictStrategy.nothing().equals(conflictStrategy)) {
+ throw new ValidationException(
+ "ON CONFLICT DO "
+ + conflictStrategy
+ + " is not yet supported. "
+ + "Please use ON CONFLICT DO DEDUPLICATE
instead.");
+ }
+
final ExecEdge inputEdge = getInputEdges().get(0);
final Transformation<RowData> inputTransform =
(Transformation<RowData>) inputEdge.translateToPlan(planner);
@@ -337,17 +341,10 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
StateConfigUtil.createTtlConfig(
StateMetadata.getStateTtlForOneInputOperator(config, stateMetadataList));
- final String[] pkFieldNames =
- Arrays.stream(primaryKeys)
- .mapToObj(idx ->
physicalRowType.getFieldNames().get(idx))
- .toArray(String[]::new);
-
final OneInputStreamOperator<RowData, RowData> operator =
createSumOperator(
config,
physicalRowType,
- primaryKeys,
- pkFieldNames,
inputUpsertKey,
upsertKeyEqualiser,
upsertKeyHashFunction,
@@ -355,29 +352,15 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
rowEqualiser,
rowHashFunction);
- // For ERROR/NOTHING strategies, apply WatermarkTimestampAssigner first
- // This assigns the current watermark as the timestamp to each record,
- // which is required for the WatermarkCompactingSinkMaterializer to work correctly
- Transformation<RowData> transformForMaterializer = inputTransform;
- if (isErrorOrNothingConflictStrategy()) {
- // Use input parallelism to preserve watermark semantics
- transformForMaterializer =
- ExecNodeUtil.createOneInputTransformation(
- inputTransform,
- createTransformationMeta(
- WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
- "WatermarkTimestampAssigner",
- "WatermarkTimestampAssigner",
- config),
- new WatermarkTimestampAssigner(),
- inputTransform.getOutputType(),
- inputTransform.getParallelism(),
- false);
- }
+ final String[] fieldNames = physicalRowType.getFieldNames().toArray(new String[0]);
+ final List<String> pkFieldNames =
+ Arrays.stream(primaryKeys)
+ .mapToObj(idx -> fieldNames[idx])
+ .collect(Collectors.toList());
OneInputTransformation<RowData, RowData> materializeTransform =
ExecNodeUtil.createOneInputTransformation(
- transformForMaterializer,
+ inputTransform,
createTransformationMeta(
UPSERT_MATERIALIZE_TRANSFORMATION,
String.format(
@@ -400,8 +383,6 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
private OneInputStreamOperator<RowData, RowData> createSumOperator(
ExecNodeConfig config,
RowType physicalRowType,
- int[] primaryKeys,
- String[] pkFieldNames,
int[] inputUpsertKey,
GeneratedRecordEqualiser upsertKeyEqualiser,
GeneratedHashFunction upsertKeyHashFunction,
@@ -409,21 +390,6 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
GeneratedRecordEqualiser rowEqualiser,
GeneratedHashFunction rowHashFunction) {
- // Check if we should use the watermark-compacting materializer for ERROR/NOTHING strategies
- if (isErrorOrNothingConflictStrategy()) {
- RowType keyType = RowTypeUtils.projectRowType(physicalRowType, primaryKeys);
-
- return WatermarkCompactingSinkMaterializer.create(
- conflictStrategy,
- physicalRowType,
- rowEqualiser,
- upsertKeyEqualiser,
- inputUpsertKey,
- keyType,
- pkFieldNames);
- }
-
- // Use existing logic for DEDUPLICATE (legacy behavior)
SinkUpsertMaterializeStrategy sinkUpsertMaterializeStrategy =
Optional.ofNullable(upsertMaterializeStrategy)
.orElse(SinkUpsertMaterializeStrategy.LEGACY);
@@ -449,12 +415,6 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
config));
}
- private boolean isErrorOrNothingConflictStrategy() {
- return conflictStrategy != null
- && (conflictStrategy.getBehavior() == ConflictBehavior.ERROR
- || conflictStrategy.getBehavior() == ConflictBehavior.NOTHING);
- }
-
private static SequencedMultiSetStateConfig createStateConfig(
SinkUpsertMaterializeStrategy strategy,
TimeDomain ttlTimeDomain,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
index cf2a7d53c9e..c77d8312d52 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
@@ -18,7 +18,6 @@
package org.apache.flink.table.planner.plan.nodes.physical.stream
import org.apache.flink.table.api.InsertConflictStrategy
-import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior
import org.apache.flink.table.api.config.ExecutionConfigOptions.{SinkUpsertMaterializeStrategy, TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY}
import org.apache.flink.table.catalog.ContextResolvedTable
import org.apache.flink.table.connector.sink.DynamicTableSink
@@ -35,12 +34,9 @@ import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rel.hint.RelHint
-import org.apache.calcite.util.ImmutableBitSet
import java.util
-import scala.collection.JavaConversions._
-
/**
 * Stream physical RelNode to write data into an external sink defined by a [[DynamicTableSink]].
*/
@@ -138,35 +134,4 @@ class StreamPhysicalSink(
.itemIf("upsertMaterialize", "true", upsertMaterialize)
.itemIf("conflictStrategy", conflictStrategy, conflictStrategy != null)
}
-
- def isDeduplicateConflictStrategy: Boolean = {
- conflictStrategy != null && conflictStrategy.getBehavior == ConflictBehavior.DEDUPLICATE
- }
-
- def primaryKeysContainsUpsertKey: Boolean = {
- val primaryKeys = contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes
- val pks = ImmutableBitSet.of(primaryKeys: _*)
- val fmq = FlinkRelMetadataQuery.reuseOrCreate(getCluster.getMetadataQuery)
- val changeLogUpsertKeys = fmq.getUpsertKeys(getInput)
- changeLogUpsertKeys != null && changeLogUpsertKeys.exists(pks.contains)
- }
-
- def getUpsertKeyNames: String = {
- val fmq = FlinkRelMetadataQuery.reuseOrCreate(getCluster.getMetadataQuery)
- val changeLogUpsertKeys = fmq.getUpsertKeys(getInput)
- if (changeLogUpsertKeys == null) {
- "none"
- } else {
- val fieldNames = contextResolvedTable.getResolvedSchema.getColumnNames
- changeLogUpsertKeys
- .map(uk => uk.toArray.map(fieldNames.get).mkString("[", ", ", "]"))
- .mkString(", ")
- }
- }
-
- def getPrimaryKeyNames: String = {
- val fieldNames = contextResolvedTable.getResolvedSchema.getColumnNames
- val primaryKeys = contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes
- primaryKeys.map(fieldNames.get).mkString("[", ", ", "]")
- }
}
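
(The removed primaryKeysContainsUpsertKey helper is re-inlined into the
changelog inference program in the next file. The direction of the containment
check is easy to misread, so here is a minimal standalone sketch of the
semantics it relies on; Calcite's ImmutableBitSet.contains(other) is true iff
this set is a superset of other:)

    import org.apache.calcite.util.ImmutableBitSet;

    public class UpsertKeyContainmentDemo {
        public static void main(String[] args) {
            ImmutableBitSet pks = ImmutableBitSet.of(0); // PRIMARY KEY on column 0
            // PK covers the upsert key -> no sink materialization needed
            System.out.println(pks.contains(ImmutableBitSet.of(0)));    // true
            // upsert key has a column outside the PK -> materialize
            System.out.println(pks.contains(ImmutableBitSet.of(0, 1))); // false
        }
    }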
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index 9cc84db2b7b..baa9b25db02 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -1060,30 +1060,38 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
case UpsertMaterialize.FORCE => primaryKeys.nonEmpty && !sinkIsRetract
case UpsertMaterialize.NONE => false
case UpsertMaterialize.AUTO =>
- // if the sink is not an UPSERT sink (has no PK, or is an APPEND or RETRACT sink)
- // we don't need to materialize results
- if (primaryKeys.isEmpty || sinkIsAppend || sinkIsRetract) {
+ if (
+ (inputIsAppend && InsertConflictStrategy
+ .deduplicate()
+ .equals(sink.conflictStrategy)) || sinkIsAppend || sinkIsRetract
+ ) {
return false
}
-
- // For a DEDUPLICATE strategy and INSERT only input, we simply let the inserts be handled
- // as UPSERT_AFTER and overwrite previous value
- if (inputIsAppend && sink.isDeduplicateConflictStrategy) {
+ if (primaryKeys.isEmpty) {
return false
}
-
- // if input has updates and primary key != upsert key we should enable upsertMaterialize.
- //
- // An optimization is: do not enable upsertMaterialize when sink pk(s) contains input
- // changeLogUpsertKeys
- val upsertKeyDiffersFromPk = !sink.primaryKeysContainsUpsertKey
+ val pks = ImmutableBitSet.of(primaryKeys: _*)
+ val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+ val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+ // if input has updates and primary key != upsert key (upsert key can be null) we should
+ // enable upsertMaterialize. An optimization is: do not enable upsertMaterialize when sink
+ // pk(s) contains input changeLogUpsertKeys
+ val upsertKeyDiffersFromPk =
+ changeLogUpsertKeys == null || !changeLogUpsertKeys.exists(pks.contains)
// Validate that ON CONFLICT is specified when upsert key differs from primary key
val requireOnConflict =
tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT)
if (requireOnConflict && upsertKeyDiffersFromPk && sink.conflictStrategy == null) {
- val pkNames = sink.getPrimaryKeyNames
- val upsertKeyNames = sink.getUpsertKeyNames
+ val fieldNames = sink.contextResolvedTable.getResolvedSchema.getColumnNames
+ val pkNames = primaryKeys.map(fieldNames.get(_)).mkString("[", ", ", "]")
+ val upsertKeyNames = if (changeLogUpsertKeys == null) {
+ "none"
+ } else {
+ changeLogUpsertKeys
+ .map(uk => uk.toArray.map(fieldNames.get(_)).mkString("[", ",
", "]"))
+ .mkString(", ")
+ }
throw new ValidationException(
"The query has an upsert key that differs from the primary key
of the sink table " +
s"'${sink.contextResolvedTable.getIdentifier.asSummaryString}'. " +
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
index 6dba8353f02..7eaa6c06c7f 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
@@ -18,48 +18,21 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
-import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase;
-import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
-import org.apache.flink.table.test.program.TestStep;
-import org.apache.flink.table.test.program.TestStep.TestKind;
-import java.util.EnumSet;
import java.util.List;
-import java.util.Map;
/** Semantic tests for {@link StreamExecSink}. */
public class SinkSemanticTests extends SemanticTestBase {
- @Override
- public EnumSet<TestKind> supportedSetupSteps() {
- EnumSet<TestKind> steps = super.supportedSetupSteps();
- steps.add(TestKind.SINK_WITHOUT_DATA);
- return steps;
- }
-
- @Override
- protected void runStep(TestStep testStep, TableEnvironment env) throws Exception {
- if (testStep.getKind() == TestKind.SINK_WITHOUT_DATA) {
- final SinkTestStep sinkTestStep = (SinkTestStep) testStep;
- sinkTestStep.apply(
- env,
- Map.ofEntries(
- Map.entry("connector", "values"),
- Map.entry("sink-insert-only", "false")));
- } else {
- super.runStep(testStep, env);
- }
- }
-
@Override
public List<TableTestProgram> programs() {
return List.of(
SinkTestPrograms.INSERT_RETRACT_WITHOUT_PK,
SinkTestPrograms.INSERT_RETRACT_WITH_PK,
- SinkTestPrograms.ON_CONFLICT_DO_NOTHING_KEEPS_FIRST,
- SinkTestPrograms.ON_CONFLICT_DO_ERROR_NO_CONFLICT,
+ SinkTestPrograms.ON_CONFLICT_DO_NOTHING_NOT_SUPPORTED,
+ SinkTestPrograms.ON_CONFLICT_DO_ERROR_NOT_SUPPORTED,
SinkTestPrograms.UPSERT_KEY_DIFFERS_FROM_PK_WITHOUT_ON_CONFLICT,
SinkTestPrograms.UPSERT_KEY_DIFFERS_FROM_PK_WITH_ON_CONFLICT,
SinkTestPrograms.UPSERT_KEY_MATCHES_PK_WITHOUT_ON_CONFLICT,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
index 6ddc55cc18b..c59fb88521d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
@@ -89,47 +89,46 @@ public class SinkTestPrograms {
"INSERT INTO sink_t SELECT UPPER(name), SUM(score)
FROM source_t GROUP BY name")
.build();
- // --- ON CONFLICT tests ---
+ // --- ON CONFLICT validation tests ---
- public static final TableTestProgram ON_CONFLICT_DO_NOTHING_KEEPS_FIRST =
+ public static final TableTestProgram ON_CONFLICT_DO_NOTHING_NOT_SUPPORTED =
TableTestProgram.of(
- "sink-on-conflict-do-nothing-keeps-first",
- "ON CONFLICT DO NOTHING keeps the first record
when multiple records have the same PK.")
+ "sink-on-conflict-do-nothing-not-supported",
+ "ON CONFLICT DO NOTHING is not yet supported and
should throw ValidationException.")
.setupTableSource(
SourceTestStep.newBuilder("source_t")
.addSchema("a INT", "b BIGINT")
.addOption("changelog-mode", "I")
- .producedValues(
- Row.ofKind(RowKind.INSERT, 1, 10L),
- Row.ofKind(RowKind.INSERT, 1, 20L),
- Row.ofKind(RowKind.INSERT, 2, 30L))
+ .producedValues(Row.ofKind(RowKind.INSERT, 1, 1L))
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink_t")
.addSchema("a INT PRIMARY KEY NOT
ENFORCED", "b BIGINT")
- .consumedValues("+I[1, 10]", "+I[2, 30]")
.build())
- .runSql("INSERT INTO sink_t SELECT a, b FROM source_t ON
CONFLICT DO NOTHING")
+ .runFailingSql(
+ "INSERT INTO sink_t SELECT a, b FROM source_t ON
CONFLICT DO NOTHING",
+ ValidationException.class,
+ "ON CONFLICT DO NOTHING is not yet supported")
.build();
- public static final TableTestProgram ON_CONFLICT_DO_ERROR_NO_CONFLICT =
+ public static final TableTestProgram ON_CONFLICT_DO_ERROR_NOT_SUPPORTED =
TableTestProgram.of(
- "sink-on-conflict-do-error-no-conflict",
- "ON CONFLICT DO ERROR with no conflicts passes
through all records.")
+ "sink-on-conflict-do-error-not-supported",
+ "ON CONFLICT DO ERROR is not yet supported and
should throw ValidationException.")
.setupTableSource(
SourceTestStep.newBuilder("source_t")
.addSchema("a INT", "b BIGINT")
.addOption("changelog-mode", "I")
- .producedValues(
- Row.ofKind(RowKind.INSERT, 1, 10L),
- Row.ofKind(RowKind.INSERT, 2, 20L))
+ .producedValues(Row.ofKind(RowKind.INSERT, 1, 1L))
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink_t")
.addSchema("a INT PRIMARY KEY NOT
ENFORCED", "b BIGINT")
- .consumedValues("+I[1, 10]", "+I[2, 20]")
.build())
- .runSql("INSERT INTO sink_t SELECT a, b FROM source_t ON
CONFLICT DO ERROR")
+ .runFailingSql(
+ "INSERT INTO sink_t SELECT a, b FROM source_t ON
CONFLICT DO ERROR",
+ ValidationException.class,
+ "ON CONFLICT DO ERROR is not yet supported")
.build();
public static final TableTestProgram UPSERT_KEY_DIFFERS_FROM_PK_WITHOUT_ON_CONFLICT =
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializer.java
deleted file mode 100644
index 4a31a1d2da1..00000000000
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializer.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.sink;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.MathUtils;
-
-import java.io.IOException;
-
-/**
- * A serializer for {@code Long} that produces a lexicographically sortable byte representation.
- *
- * <p>Standard big-endian long serialization does not maintain numeric
ordering for negative values
- * in lexicographic byte comparison because negative numbers have their sign
bit set (1), which
- * makes them appear greater than positive numbers (sign bit 0) in unsigned
byte comparison.
- *
- * <p>This serializer flips the sign bit during serialization, converting from
signed to unsigned
- * ordering. This ensures that when iterating over keys in a sorted state
backend (like RocksDB),
- * the entries are returned in numeric order.
- *
- * @see org.apache.flink.streaming.api.operators.TimerSerializer
- */
-@Internal
-public final class SortedLongSerializer extends TypeSerializerSingleton<Long> {
-
- private static final long serialVersionUID = 1L;
-
- /** Sharable instance of the SortedLongSerializer. */
- public static final SortedLongSerializer INSTANCE = new SortedLongSerializer();
-
- private static final Long ZERO = 0L;
-
- @Override
- public boolean isImmutableType() {
- return true;
- }
-
- @Override
- public Long createInstance() {
- return ZERO;
- }
-
- @Override
- public Long copy(Long from) {
- return from;
- }
-
- @Override
- public Long copy(Long from, Long reuse) {
- return from;
- }
-
- @Override
- public int getLength() {
- return Long.BYTES;
- }
-
- @Override
- public void serialize(Long record, DataOutputView target) throws IOException {
- target.writeLong(MathUtils.flipSignBit(record));
- }
-
- @Override
- public Long deserialize(DataInputView source) throws IOException {
- return MathUtils.flipSignBit(source.readLong());
- }
-
- @Override
- public Long deserialize(Long reuse, DataInputView source) throws IOException {
- return deserialize(source);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- target.writeLong(source.readLong());
- }
-
- @Override
- public TypeSerializerSnapshot<Long> snapshotConfiguration() {
- return new SortedLongSerializerSnapshot();
- }
-
- // ------------------------------------------------------------------------
-
- /** Serializer configuration snapshot for compatibility and format evolution. */
- @SuppressWarnings("WeakerAccess")
- public static final class SortedLongSerializerSnapshot
- extends SimpleTypeSerializerSnapshot<Long> {
-
- public SortedLongSerializerSnapshot() {
- super(() -> INSTANCE);
- }
- }
-}
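
(The deleted serializer's javadoc describes the sign-bit trick; a standalone
sketch of why it works, using a local equivalent of MathUtils.flipSignBit
rather than the Flink utility:)

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public class SignBitFlipDemo {
        // XOR with Long.MIN_VALUE flips only the sign bit.
        static long flipSignBit(long v) {
            return v ^ Long.MIN_VALUE;
        }

        static byte[] bigEndianBytes(long v) {
            return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
        }

        public static void main(String[] args) {
            // Plain big-endian: -1L (0xFF..FF) compares above 1L in unsigned byte order.
            System.out.println(
                    Arrays.compareUnsigned(bigEndianBytes(-1L), bigEndianBytes(1L)) > 0); // true
            // With the sign bit flipped, byte order matches numeric order again.
            System.out.println(
                    Arrays.compareUnsigned(
                            bigEndianBytes(flipSignBit(-1L)),
                            bigEndianBytes(flipSignBit(1L))) < 0); // true
        }
    }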
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java
deleted file mode 100644
index fc975bda184..00000000000
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.sink;
-
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.runtime.state.KeyedStateBackend;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.api.InsertConflictStrategy;
-import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
-import org.apache.flink.table.api.TableRuntimeException;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.utils.ProjectedRowData;
-import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
-import org.apache.flink.table.runtime.generated.RecordEqualiser;
-import org.apache.flink.table.runtime.operators.TableStreamOperator;
-import org.apache.flink.table.runtime.typeutils.InternalSerializers;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.flink.types.RowKind.DELETE;
-import static org.apache.flink.types.RowKind.INSERT;
-import static org.apache.flink.types.RowKind.UPDATE_AFTER;
-
-/**
- * A sink materializer that buffers records and compacts them on watermark progression.
- *
- * <p>This operator implements the watermark-based compaction algorithm from
FLIP-558 for handling
- * changelog disorder when the upsert key differs from the sink's primary key.
- */
-public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData>
- implements OneInputStreamOperator<RowData, RowData> {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOG =
- LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
-
- private static final String STATE_CLEARED_WARN_MSG =
- "The state is cleared because of state TTL. This will lead to
incorrect results. "
- + "You can increase the state TTL to avoid this.";
- private static final Set<String> ORDERED_STATE_BACKENDS =
Set.of("rocksdb", "forst");
-
- private final InsertConflictStrategy conflictStrategy;
- private final TypeSerializer<RowData> serializer;
- private final GeneratedRecordEqualiser generatedRecordEqualiser;
- private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
- private final int[] inputUpsertKey;
- private final boolean hasUpsertKey;
- private final RowType keyType;
- private final String[] primaryKeyNames;
-
- private transient MapStateDescriptor<Long, List<RowData>> bufferDescriptor;
- // Buffers incoming changelog records (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) keyed by
- // their timestamp. Watermarks act as compaction barriers: when a watermark arrives, we know
- // that UPDATE_BEFORE and its corresponding UPDATE_AFTER have both been received and can be
- // compacted together. This solves the out-of-order problem where a later UPDATE_AFTER may
- // arrive before the UPDATE_BEFORE of a previous change.
- private transient MapState<Long, List<RowData>> buffer;
-
- // Stores the last emitted value for the current primary key. Used to detect duplicates
- // and determine the correct RowKind (INSERT vs UPDATE_AFTER) on subsequent compactions.
- private transient ValueState<RowData> currentValue;
- private transient RecordEqualiser equaliser;
- private transient RecordEqualiser upsertKeyEqualiser;
- private transient TimestampedCollector<RowData> collector;
- private transient boolean isOrderedStateBackend;
-
- // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey.
- private transient ProjectedRowData upsertKeyProjectedRow1;
- private transient ProjectedRowData upsertKeyProjectedRow2;
-
- // Field getters for formatting the primary key in error messages.
- private transient RowData.FieldGetter[] keyFieldGetters;
-
- // Tracks the current watermark. Used to detect in-flight records after restore.
- private transient long currentWatermark = Long.MIN_VALUE;
-
- public WatermarkCompactingSinkMaterializer(
- InsertConflictStrategy conflictStrategy,
- TypeSerializer<RowData> serializer,
- GeneratedRecordEqualiser generatedRecordEqualiser,
- @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
- @Nullable int[] inputUpsertKey,
- RowType keyType,
- String[] primaryKeyNames) {
- validateConflictStrategy(conflictStrategy);
- this.conflictStrategy = conflictStrategy;
- this.serializer = serializer;
- this.generatedRecordEqualiser = generatedRecordEqualiser;
- this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
- this.inputUpsertKey = inputUpsertKey;
- this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 0;
- this.keyType = keyType;
- this.primaryKeyNames = primaryKeyNames;
- }
-
- private static void validateConflictStrategy(InsertConflictStrategy strategy) {
- Preconditions.checkArgument(
- strategy.getBehavior() == ConflictBehavior.ERROR
- || strategy.getBehavior() == ConflictBehavior.NOTHING,
- "Only ERROR and NOTHING strategies are supported, got: %s",
- strategy);
- }
-
- @Override
- public void initializeState(StateInitializationContext context) throws Exception {
- super.initializeState(context);
-
- // Initialize state descriptors and handles
- this.bufferDescriptor =
- new MapStateDescriptor<>(
- "watermark-buffer",
- SortedLongSerializer.INSTANCE,
- new ListSerializer<>(serializer));
- this.buffer = context.getKeyedStateStore().getMapState(bufferDescriptor);
-
- ValueStateDescriptor<RowData> currentValueDescriptor =
- new ValueStateDescriptor<>("current-value", serializer);
- this.currentValue = context.getKeyedStateStore().getState(currentValueDescriptor);
-
- if (context.isRestored()) {
- // Detect ordered state backend before consolidation
- detectOrderedStateBackend();
-
- // Consolidate all buffered records to MIN_VALUE for each key.
- // This ensures they are compacted on the first watermark after restore.
- getKeyedStateBackend()
- .applyToAllKeys(
- VoidNamespace.INSTANCE,
- VoidNamespaceSerializer.INSTANCE,
- bufferDescriptor,
- (key, state) -> consolidateBufferToMinValue());
- }
- }
-
- private void consolidateBufferToMinValue() throws Exception {
- List<RowData> consolidated = new ArrayList<>();
-
- if (isOrderedStateBackend) {
- // RocksDB/ForSt: entries are already sorted by timestamp
- Iterator<Map.Entry<Long, List<RowData>>> iterator =
buffer.entries().iterator();
- while (iterator.hasNext()) {
- consolidated.addAll(iterator.next().getValue());
- iterator.remove();
- }
- } else {
- // Other backends: collect, sort by timestamp, then consolidate
- List<Map.Entry<Long, List<RowData>>> entries = new ArrayList<>();
- Iterator<Map.Entry<Long, List<RowData>>> iterator =
buffer.entries().iterator();
- while (iterator.hasNext()) {
- entries.add(iterator.next());
- iterator.remove();
- }
-
- entries.sort(Map.Entry.comparingByKey());
-
- for (Map.Entry<Long, List<RowData>> entry : entries) {
- consolidated.addAll(entry.getValue());
- }
- }
-
- if (!consolidated.isEmpty()) {
- buffer.put(Long.MIN_VALUE, consolidated);
- }
- }
-
- @Override
- public void open() throws Exception {
- super.open();
- initializeEqualisers();
- initializeKeyFieldGetters();
- detectOrderedStateBackend();
- this.collector = new TimestampedCollector<>(output);
- }
-
- private void initializeKeyFieldGetters() {
- this.keyFieldGetters = new RowData.FieldGetter[primaryKeyNames.length];
- for (int i = 0; i < primaryKeyNames.length; i++) {
- LogicalType fieldType = keyType.getTypeAt(i);
- keyFieldGetters[i] = RowData.createFieldGetter(fieldType, i);
- }
- }
-
- private void initializeEqualisers() {
- if (hasUpsertKey) {
- this.upsertKeyEqualiser =
- generatedUpsertKeyEqualiser.newInstance(
- getRuntimeContext().getUserCodeClassLoader());
- upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey);
- upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey);
- }
- this.equaliser =
- generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
- }
-
- private void detectOrderedStateBackend() {
- KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
- String backendType =
- keyedStateBackend != null ? keyedStateBackend.getBackendTypeIdentifier() : "";
- this.isOrderedStateBackend = ORDERED_STATE_BACKENDS.contains(backendType);
-
- if (isOrderedStateBackend) {
- LOG.info("Using ordered state backend optimization for {}
backend", backendType);
- }
- }
-
- @Override
- public void processElement(StreamRecord<RowData> element) throws Exception
{
- RowData row = element.getValue();
- long assignedTimestamp = element.getTimestamp();
-
- // If we haven't received any watermark yet (still at MIN_VALUE after restore)
- // and the timestamp is beyond MIN_VALUE, it's from in-flight data that was
- // checkpointed before restore. Assign to MIN_VALUE.
- if (currentWatermark == Long.MIN_VALUE && assignedTimestamp > Long.MIN_VALUE) {
- assignedTimestamp = Long.MIN_VALUE;
- }
-
- bufferRecord(assignedTimestamp, row);
- }
-
- private void bufferRecord(long timestamp, RowData row) throws Exception {
- List<RowData> records = buffer.get(timestamp);
- if (records == null) {
- records = new ArrayList<>();
- }
- switch (row.getRowKind()) {
- case INSERT:
- case UPDATE_AFTER:
- // Try to cancel out a pending retraction; if none, just append
- if (!tryCancelRetraction(records, row)) {
- records.add(row);
- }
- break;
- case UPDATE_BEFORE:
- case DELETE:
- // Try to cancel out an existing addition; if none, keep for cross-bucket
- if (!tryCancelAddition(records, row)) {
- records.add(row);
- }
- break;
- }
- buffer.put(timestamp, records);
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- final long watermarkTimestamp = mark.getTimestamp();
- this.currentWatermark = watermarkTimestamp;
-
- // Iterate over all keys and compact their buffered records
- this.<RowData>getKeyedStateBackend()
- .applyToAllKeys(
- VoidNamespace.INSTANCE,
- VoidNamespaceSerializer.INSTANCE,
- bufferDescriptor,
- (key, state) -> compactAndEmit(watermarkTimestamp));
-
- super.processWatermark(mark);
- }
-
- private void compactAndEmit(long newWatermark) throws Exception {
- RowData previousValue = currentValue.value();
- List<RowData> pendingRecords = collectPendingRecords(previousValue,
newWatermark);
-
- if (pendingRecords.size() > 1) {
- if (conflictStrategy.getBehavior() == ConflictBehavior.ERROR) {
- RowData key = (RowData) getKeyedStateBackend().getCurrentKey();
- throw new TableRuntimeException(
- "Primary key constraint violation: multiple distinct
records with "
- + "the same primary key detected. Conflicting
key: ["
- + formatKey(key)
- + "]. Use ON CONFLICT DO NOTHING to keep the
first record.");
- } else if (previousValue == null) {
- final RowData newValue = pendingRecords.get(0);
- emit(newValue, INSERT);
- currentValue.update(newValue);
- }
- } else if (pendingRecords.isEmpty()) {
- if (previousValue != null) {
- emit(previousValue, DELETE);
- currentValue.clear();
- }
- } else {
- final RowData newValue = pendingRecords.get(0);
- if (previousValue == null) {
- emit(newValue, INSERT);
- currentValue.update(newValue);
- } else if (!recordEquals(previousValue, newValue)) {
- emit(newValue, UPDATE_AFTER);
- currentValue.update(newValue);
- }
- }
- }
-
- private List<RowData> collectPendingRecords(RowData previousValue, long
newWatermark)
- throws Exception {
- List<RowData> records = new ArrayList<>();
- if (previousValue != null) {
- records.add(previousValue);
- }
- Iterator<Map.Entry<Long, List<RowData>>> iterator =
buffer.entries().iterator();
-
- while (iterator.hasNext()) {
- Map.Entry<Long, List<RowData>> entry = iterator.next();
- if (entry.getKey() <= newWatermark) {
- for (RowData pendingRecord : entry.getValue()) {
- switch (pendingRecord.getRowKind()) {
- case INSERT:
- case UPDATE_AFTER:
- addRow(records, pendingRecord);
- break;
-
- case UPDATE_BEFORE:
- case DELETE:
- retractRow(records, pendingRecord);
- break;
- }
- }
- iterator.remove();
- } else if (isOrderedStateBackend) {
- break;
- }
- }
- return records;
- }
-
- private void addRow(List<RowData> values, RowData add) {
- if (hasUpsertKey) {
- int index = findFirst(values, add);
- if (index == -1) {
- values.add(add);
- } else {
- values.set(index, add);
- }
- } else {
- values.add(add);
- }
- }
-
- private void retractRow(List<RowData> values, RowData retract) {
- final int index = findFirst(values, retract);
- if (index == -1) {
- LOG.info(STATE_CLEARED_WARN_MSG);
- } else {
- // Remove first found row
- values.remove(index);
- }
- }
-
- /**
- * Attempts to cancel out a retraction by finding a matching retractive record
- * (DELETE/UPDATE_BEFORE) with identical content.
- *
- * @return true if a matching retraction was found and removed, false otherwise
- */
- private boolean tryCancelRetraction(List<RowData> values, RowData
addition) {
- final Iterator<RowData> iterator = values.iterator();
- while (iterator.hasNext()) {
- RowData candidate = iterator.next();
- RowKind kind = candidate.getRowKind();
- if ((kind == DELETE || kind == RowKind.UPDATE_BEFORE)
- && recordEquals(addition, candidate)) {
- iterator.remove();
- return true;
- }
- }
- return false;
- }
-
- /**
- * Attempts to cancel out an addition by finding a matching additive record
- * (INSERT/UPDATE_AFTER) with identical content.
- *
- * @return true if a matching addition was found and removed, false otherwise
- */
- private boolean tryCancelAddition(List<RowData> values, RowData
retraction) {
- final Iterator<RowData> iterator = values.iterator();
- while (iterator.hasNext()) {
- RowData candidate = iterator.next();
- RowKind kind = candidate.getRowKind();
- if ((kind == INSERT || kind == UPDATE_AFTER) && recordEquals(retraction, candidate)) {
- iterator.remove();
- return true;
- }
- }
- return false;
- }
-
- private int findFirst(List<RowData> values, RowData target) {
- final Iterator<RowData> iterator = values.iterator();
- int i = 0;
- while (iterator.hasNext()) {
- if (equalsIgnoreRowKind(target, iterator.next())) {
- return i;
- }
- i++;
- }
- return -1;
- }
-
- private boolean equalsIgnoreRowKind(RowData newRow, RowData oldRow) {
- newRow.setRowKind(oldRow.getRowKind());
- if (hasUpsertKey) {
- return this.upsertKeyEqualiser.equals(
- upsertKeyProjectedRow1.replaceRow(newRow),
- upsertKeyProjectedRow2.replaceRow(oldRow));
- }
- return equaliser.equals(newRow, oldRow);
- }
-
- private void emit(RowData row, RowKind kind) {
- RowKind originalKind = row.getRowKind();
- row.setRowKind(kind);
- collector.collect(row);
- row.setRowKind(originalKind);
- }
-
- private boolean recordEquals(RowData row1, RowData row2) {
- RowKind kind1 = row1.getRowKind();
- RowKind kind2 = row2.getRowKind();
- row1.setRowKind(RowKind.INSERT);
- row2.setRowKind(RowKind.INSERT);
- boolean result = equaliser.equals(row1, row2);
- row1.setRowKind(kind1);
- row2.setRowKind(kind2);
- return result;
- }
-
- private String formatKey(RowData key) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < primaryKeyNames.length; i++) {
- if (i > 0) {
- sb.append(", ");
- }
- sb.append(primaryKeyNames[i]).append("=");
- if (key.isNullAt(i)) {
- sb.append("null");
- } else {
- sb.append(keyFieldGetters[i].getFieldOrNull(key));
- }
- }
- return sb.toString();
- }
-
- /** Factory method to create a new instance. */
- public static WatermarkCompactingSinkMaterializer create(
- InsertConflictStrategy conflictStrategy,
- RowType physicalRowType,
- GeneratedRecordEqualiser rowEqualiser,
- @Nullable GeneratedRecordEqualiser upsertKeyEqualiser,
- @Nullable int[] inputUpsertKey,
- RowType keyType,
- String[] primaryKeyNames) {
- return new WatermarkCompactingSinkMaterializer(
- conflictStrategy,
- InternalSerializers.create(physicalRowType),
- rowEqualiser,
- upsertKeyEqualiser,
- inputUpsertKey,
- keyType,
- primaryKeyNames);
- }
-}
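
(Together with the WatermarkTimestampAssigner deleted below, this operator
implements "tag each record with the watermark current at arrival; a group is
final once a watermark at or beyond its tag passes". A toy model of just that
buffering scheme, independent of Flink APIs and of the conflict handling:)

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    class WatermarkBufferModel {
        // timestamp (watermark at arrival) -> records buffered under it
        final TreeMap<Long, List<String>> buffer = new TreeMap<>();
        long currentWatermark = Long.MIN_VALUE;

        void processElement(String record) {
            buffer.computeIfAbsent(currentWatermark, k -> new ArrayList<>()).add(record);
        }

        List<String> processWatermark(long watermark) {
            currentWatermark = watermark;
            List<String> ready = new ArrayList<>();
            // All groups tagged at or below the new watermark are final.
            Iterator<Map.Entry<Long, List<String>>> it =
                    buffer.headMap(watermark, true).entrySet().iterator();
            while (it.hasNext()) {
                ready.addAll(it.next().getValue());
                it.remove();
            }
            // The real operator compacts/cancels these per primary key instead
            // of emitting them verbatim.
            return ready;
        }
    }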
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkTimestampAssigner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkTimestampAssigner.java
deleted file mode 100644
index 17598df521d..00000000000
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkTimestampAssigner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.sink;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.operators.TableStreamOperator;
-
-/**
- * Operator that assigns the current watermark as the timestamp to each incoming StreamRecord.
- *
- * <p>This is used in conjunction with {@link
WatermarkCompactingSinkMaterializer} which buffers
- * records by their timestamp. Without meaningful timestamps, all records
would be buffered under
- * the same key, breaking the watermark-based compaction logic.
- *
- * <p>If the current watermark is {@code Long.MIN_VALUE} (the initial state
before any watermark
- * arrives), records will be assigned that value and will be compacted when
the first watermark
- * arrives.
- */
-@Internal
-public class WatermarkTimestampAssigner extends TableStreamOperator<RowData>
- implements OneInputStreamOperator<RowData, RowData> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void processElement(StreamRecord<RowData> element) throws Exception
{
- element.setTimestamp(currentWatermark);
- output.collect(element);
- }
-}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializerTest.java
deleted file mode 100644
index d331eb18c94..00000000000
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializerTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.sink;
-
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.Random;
-
-/** A test for the {@link SortedLongSerializer}. */
-class SortedLongSerializerTest extends SerializerTestBase<Long> {
-
- @Override
- protected TypeSerializer<Long> createSerializer() {
- return SortedLongSerializer.INSTANCE;
- }
-
- @Override
- protected int getLength() {
- return 8;
- }
-
- @Override
- protected Class<Long> getTypeClass() {
- return Long.class;
- }
-
- @Override
- protected Long[] getTestData() {
- Random rnd = new Random(874597969123412341L);
- long rndLong = rnd.nextLong();
-
- return new Long[] {0L, 1L, -1L, Long.MAX_VALUE, Long.MIN_VALUE, rndLong, -rndLong};
- }
-}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java
deleted file mode 100644
index adb701a58e9..00000000000
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java
+++ /dev/null
@@ -1,769 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.sink;
-
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.table.api.InsertConflictStrategy;
-import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
-import org.apache.flink.table.api.TableRuntimeException;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
-import org.apache.flink.table.runtime.generated.RecordEqualiser;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.table.utils.HandwrittenSelectorUtil;
-import org.apache.flink.types.RowKind;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.delete;
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.insert;
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfter;
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-/** Tests for {@link WatermarkCompactingSinkMaterializer}. */
-class WatermarkCompactingSinkMaterializerTest {
-
- private static final int PRIMARY_KEY_INDEX = 1;
- private static final String PRIMARY_KEY_NAME = "pk";
-
- private static final LogicalType[] LOGICAL_TYPES =
- new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()};
-
- private static final GeneratedRecordEqualiser RECORD_EQUALISER =
- new GeneratedRecordEqualiser("", "", new Object[0]) {
- @Override
- public RecordEqualiser newInstance(ClassLoader classLoader) {
- return new TestRecordEqualiser();
- }
- };
-
- private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
- new GeneratedRecordEqualiser("", "", new Object[0]) {
- @Override
- public RecordEqualiser newInstance(ClassLoader classLoader) {
- return new TestUpsertKeyEqualiser();
- }
- };
-
- @ParameterizedTest
- @EnumSource(
- value = ConflictBehavior.class,
- names = {"ERROR", "NOTHING"})
- void testBasicInsertWithWatermarkProgression(ConflictBehavior behavior) throws Exception {
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarness(behavior)) {
- harness.open();
-
- // Insert first record (watermark is MIN_VALUE)
- harness.processElement(insertRecord(1L, 1, "a1"));
- assertEmitsNothing(harness); // Buffered, waiting for watermark
-
- // Advance watermark to trigger compaction
- harness.processWatermark(100L);
- assertEmits(harness, insert(1L, 1, "a1"));
-
- // Update with same upsert key (this is the expected pattern for single-source updates)
- harness.processElement(updateAfterRecord(1L, 1, "a2"));
- assertEmitsNothing(harness);
-
- // Advance watermark again
- harness.processWatermark(200L);
- assertEmits(harness, updateAfter(1L, 1, "a2"));
- }
- }
-
- @ParameterizedTest
- @EnumSource(
- value = ConflictBehavior.class,
- names = {"ERROR", "NOTHING"})
- void testDeleteAfterInsert(ConflictBehavior behavior) throws Exception {
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarness(behavior)) {
- harness.open();
-
- // Insert and compact
- harness.processElement(insertRecord(1L, 1, "a1"));
- harness.processWatermark(100L);
- assertEmits(harness, insert(1L, 1, "a1"));
-
- // Delete and compact
- harness.processElement(deleteRecord(1L, 1, "a1"));
- harness.processWatermark(200L);
- assertEmits(harness, delete(1L, 1, "a1"));
- }
- }
-
- @ParameterizedTest
- @EnumSource(
- value = ConflictBehavior.class,
- names = {"ERROR", "NOTHING"})
- void testInsertAndDeleteInSameWindow(ConflictBehavior behavior) throws Exception {
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarness(behavior)) {
- harness.open();
-
- // Insert and delete before watermark advances
- harness.processElement(insertRecord(1L, 1, "a1"));
- harness.processElement(deleteRecord(1L, 1, "a1"));
-
- // Compact - should emit nothing since insert and delete cancel out
- harness.processWatermark(100L);
- assertEmitsNothing(harness);
- }
- }
-
- @Test
- void testDoNothingKeepsFirstRecord() throws Exception {
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarness(ConflictBehavior.NOTHING)) {
- harness.open();
-
- // Insert two records with different upsert keys but same primary key
- harness.processElement(insertRecord(1L, 1, "first"));
- harness.processElement(insertRecord(2L, 1, "second"));
-
- // Compact - should keep the first record
- harness.processWatermark(100L);
- assertEmits(harness, insert(1L, 1, "first"));
- }
- }
-
- @Test
- void testDoErrorThrowsOnConflict() throws Exception {
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarness(ConflictBehavior.ERROR)) {
- harness.open();
-
- // Insert two records with different upsert keys but same primary key
- harness.processElement(insertRecord(1L, 1, "first"));
- harness.processElement(insertRecord(2L, 1, "second"));
-
- // Compact - should throw exception with key info
- assertThatThrownBy(() -> harness.processWatermark(100L))
- .isInstanceOf(TableRuntimeException.class)
- .hasMessageContaining("Primary key constraint violation")
- .hasMessageContaining("[pk=1]");
- }
- }
-
- @Test
- void testDoErrorAllowsSameUpsertKey() throws Exception {
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarness(ConflictBehavior.ERROR)) {
- harness.open();
-
- // Insert two records with same upsert key (updates to same source)
- harness.processElement(insertRecord(1L, 1, "v1"));
- harness.processElement(updateAfterRecord(1L, 1, "v2"));
-
- // Compact - should not throw, just keep the latest
- harness.processWatermark(100L);
- assertEmits(harness, insert(1L, 1, "v2"));
- }
- }
-
- @ParameterizedTest
- @EnumSource(
- value = ConflictBehavior.class,
- names = {"ERROR", "NOTHING"})
- void testChangelogDisorderHandling(ConflictBehavior behavior) throws Exception {
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarness(behavior)) {
- harness.open();
-
- // Simulate changelog disorder from FLIP-558 example:
- // Records from different sources (different upsert keys: 1L and 2L) map to same PK (1)
- // Ideal order: +I(1,1,a1), -U(1,1,a1), +U(2,1,b1)
- // Disordered: +U(2,1,b1), +I(1,1,a1), -U(1,1,a1)
-
- // The +U from source 2 arrives first (upsert key = 2L)
- harness.processElement(updateAfterRecord(2L, 1, "b1"));
- // Then +I and -U from source 1 arrive (upsert key = 1L)
- harness.processElement(insertRecord(1L, 1, "a1"));
- harness.processElement(updateBeforeRecord(1L, 1, "a1"));
-
- // Net result: only (2L, 1, "b1") remains after cancellation, no conflict
- harness.processWatermark(100L);
- assertEmits(harness, insert(2L, 1, "b1"));
- }
- }
-
- @ParameterizedTest
- @EnumSource(
- value = ConflictBehavior.class,
- names = {"ERROR", "NOTHING"})
- void testChangelogDisorderUpdateBeforeAfterInsert(ConflictBehavior behavior) throws Exception {
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarness(behavior)) {
- harness.open();
-
- // Simulate changelog disorder where UPDATE_AFTER arrives before UPDATE_BEFORE:
- // Ideal order: +I(1,1,a1), -U(1,1,a1), +U(1,1,a2)
- // Disordered: +I(1,1,a1), +U(1,1,a2), -U(1,1,a1)
-
- // INSERT the initial value
- harness.processElement(insertRecord(1L, 1, "a1"));
- // UPDATE_AFTER arrives before the UPDATE_BEFORE
- harness.processElement(updateAfterRecord(1L, 1, "a2"));
- // UPDATE_BEFORE (retraction of the original INSERT) arrives last
- harness.processElement(updateBeforeRecord(1L, 1, "a1"));
-
- // Net result: +I and -U cancel out, only +U(1,1,a2) remains
- harness.processWatermark(100L);
- assertEmits(harness, insert(1L, 1, "a2"));
- }
- }
-
- @ParameterizedTest
- @EnumSource(
- value = ConflictBehavior.class,
- names = {"ERROR", "NOTHING"})
- void testNoEmissionWhenValueUnchanged(ConflictBehavior behavior) throws Exception {
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarness(behavior)) {
- harness.open();
-
- // Insert and compact
- harness.processElement(insertRecord(1L, 1, "value"));
- harness.processWatermark(100L);
- assertEmits(harness, insert(1L, 1, "value"));
-
- // Insert same value again (same upsert key)
- harness.processElement(updateAfterRecord(1L, 1, "value"));
- harness.processWatermark(200L);
- // Should not emit since value is the same
- assertEmitsNothing(harness);
- }
- }
-
- /**
- * Tests that record timestamps are handled correctly when multiple inputs send records that
- * arrive out of timestamp order. This simulates the case where records from different upstream
- * tasks have different timestamps and arrive interleaved.
- *
- * <p>Input 1 uses upsert key = 1L, Input 2 uses upsert key = 2L. All records have same primary
- * key (1).
- *
- * <p>Sequence:
- *
- * <ol>
- * <li>INSERT(input=1, t=2)
- * <li>watermark=3 -> emits INSERT
- * <li>UPDATE_BEFORE(input=1, t=4)
- * <li>UPDATE_AFTER(input=1, t=6)
- * <li>UPDATE_AFTER(input=2, t=4) - arrives after t=6 record but has smaller timestamp
- * <li>watermark=5 -> compacts t<=5 records
- * <li>UPDATE_BEFORE(input=2, t=6)
- * <li>watermark=10 -> compacts remaining t=6 records
- * </ol>
- */
- @Test
- void testTwoUpstreamTasksWithDisorderedWatermarks() throws Exception {
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarness(ConflictBehavior.NOTHING)) {
- harness.open();
-
- // INSERT from input 1 with timestamp 2
- harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 2L));
- assertEmitsNothing(harness);
-
- // watermark=3: compacts records with t<=3, emits INSERT(t=2)
- harness.processWatermark(3L);
- assertEmits(harness, insert(1L, 1, "v1"));
-
- // UPDATE_BEFORE from input 1 with timestamp 4
- harness.processElement(recordWithTimestamp(RowKind.UPDATE_BEFORE, 1L, 1, "v1", 4L));
- assertEmitsNothing(harness);
-
- // UPDATE_AFTER from input 1 with timestamp 6
- harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v4", 6L));
- assertEmitsNothing(harness);
-
- // UPDATE_AFTER from input 2 with timestamp 4
- // This record arrives after the t=6 record but has a smaller timestamp
- harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 2L, 1, "v3", 4L));
-
- // watermark=5: compacts records with t<=5
- // Buffered: UPDATE_BEFORE(1L, t=4) and UPDATE_AFTER(2L, t=4) cancel out for input 1,
- // UPDATE_AFTER(2L, t=4) is emitted
- harness.processWatermark(5L);
- assertEmits(harness, updateAfter(2L, 1, "v3"));
-
- // UPDATE_BEFORE from input 2 with timestamp 6
- harness.processElement(recordWithTimestamp(RowKind.UPDATE_BEFORE, 2L, 1, "v3", 6L));
- assertEmitsNothing(harness);
-
- // Final watermark to flush all remaining buffered records (t=6)
- // Buffered: UPDATE_AFTER(1L, t=6) and UPDATE_BEFORE(2L, t=6)
- // After compaction: UPDATE_AFTER(1L, "v4") remains as final value
- harness.processWatermark(10L);
- assertEmits(harness, updateAfter(1L, 1, "v4"));
- }
- }
-
- // --- Tests without upsert key ---
-
- @ParameterizedTest
- @EnumSource(
- value = ConflictBehavior.class,
- names = {"ERROR", "NOTHING"})
- void testBasicInsertWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarnessWithoutUpsertKey(behavior)) {
- harness.open();
-
- // Insert first record
- harness.processElement(insertRecord(1L, 1, "a1"));
- assertEmitsNothing(harness);
-
- // Advance watermark to trigger compaction
- harness.processWatermark(100L);
- assertEmits(harness, insert(1L, 1, "a1"));
- }
- }
-
- @Test
- void testUpdateWithoutUpsertKeyNothingStrategy() throws Exception {
- // Without upsert key, UPDATE_AFTER on existing value causes conflict (two rows accumulate)
- // NOTHING strategy keeps the first value
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarnessWithoutUpsertKey(ConflictBehavior.NOTHING)) {
- harness.open();
-
- harness.processElement(insertRecord(1L, 1, "a1"));
- harness.processWatermark(100L);
- assertEmits(harness, insert(1L, 1, "a1"));
-
- // UPDATE_AFTER with different value - creates conflict, NOTHING keeps first
- harness.processElement(updateAfterRecord(2L, 1, "a2"));
- harness.processWatermark(200L);
- // NOTHING keeps the previous value (1L, 1, "a1"), no emission since unchanged
- assertEmitsNothing(harness);
- }
- }
-
- @Test
- void testUpdateWithoutUpsertKeyErrorStrategy() throws Exception {
- // Without upsert key, UPDATE_AFTER on existing value causes conflict (two rows accumulate)
- // ERROR strategy throws
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarnessWithoutUpsertKey(ConflictBehavior.ERROR)) {
- harness.open();
-
- harness.processElement(insertRecord(1L, 1, "a1"));
- harness.processWatermark(100L);
- assertEmits(harness, insert(1L, 1, "a1"));
-
- // UPDATE_AFTER with different value - creates conflict, ERROR throws with key info
- harness.processElement(updateAfterRecord(2L, 1, "a2"));
- assertThatThrownBy(() -> harness.processWatermark(200L))
- .isInstanceOf(TableRuntimeException.class)
- .hasMessageContaining("Primary key constraint violation")
- .hasMessageContaining("[pk=1]");
- }
- }
-
- @ParameterizedTest
- @EnumSource(
- value = ConflictBehavior.class,
- names = {"ERROR", "NOTHING"})
- void testDeleteAfterInsertWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarnessWithoutUpsertKey(behavior)) {
- harness.open();
-
- // Insert and compact
- harness.processElement(insertRecord(1L, 1, "a1"));
- harness.processWatermark(100L);
- assertEmits(harness, insert(1L, 1, "a1"));
-
- // Delete with exact same row and compact
- harness.processElement(deleteRecord(1L, 1, "a1"));
- harness.processWatermark(200L);
- assertEmits(harness, delete(1L, 1, "a1"));
- }
- }
-
- @ParameterizedTest
- @EnumSource(
- value = ConflictBehavior.class,
- names = {"ERROR", "NOTHING"})
- void testInsertAndDeleteInSameWindowWithoutUpsertKey(ConflictBehavior behavior)
- throws Exception {
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarnessWithoutUpsertKey(behavior)) {
- harness.open();
-
- // Insert and delete with exact same row before watermark advances
- harness.processElement(insertRecord(1L, 1, "a1"));
- harness.processElement(deleteRecord(1L, 1, "a1"));
-
- // Compact - should emit nothing since insert and delete cancel out
- harness.processWatermark(100L);
- assertEmitsNothing(harness);
- }
- }
-
- @Test
- void testIdenticalInsertsWithoutUpsertKeyNothingKeepsFirst() throws Exception {
- // Without upsert key, even identical inserts are separate entries
- // NOTHING strategy just keeps the first one
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarnessWithoutUpsertKey(ConflictBehavior.NOTHING)) {
- harness.open();
-
- // Insert two identical records (same full row)
- harness.processElement(insertRecord(1L, 1, "same"));
- harness.processElement(insertRecord(1L, 1, "same"));
-
- // Compact - NOTHING keeps first record
- harness.processWatermark(100L);
- assertEmits(harness, insert(1L, 1, "same"));
- }
- }
-
- @Test
- void testIdenticalInsertsWithoutUpsertKeyErrorThrows() throws Exception {
- // Without upsert key, even identical inserts are separate entries
- // ERROR strategy throws because there are multiple records
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarnessWithoutUpsertKey(ConflictBehavior.ERROR)) {
- harness.open();
-
- // Insert two identical records (same full row)
- harness.processElement(insertRecord(1L, 1, "same"));
- harness.processElement(insertRecord(1L, 1, "same"));
-
- // Compact - ERROR throws because there are multiple pending records, includes key info
- assertThatThrownBy(() -> harness.processWatermark(100L))
- .isInstanceOf(TableRuntimeException.class)
- .hasMessageContaining("Primary key constraint violation")
- .hasMessageContaining("[pk=1]");
- }
- }
-
- @ParameterizedTest
- @EnumSource(
- value = ConflictBehavior.class,
- names = {"ERROR", "NOTHING"})
- void testInsertUpdateDeleteWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarnessWithoutUpsertKey(behavior)) {
- harness.open();
-
- // Insert, then update_before + update_after sequence
- harness.processElement(insertRecord(1L, 1, "v1"));
- harness.processWatermark(100L);
- assertEmits(harness, insert(1L, 1, "v1"));
-
- // Update: retract old value, insert new value
- harness.processElement(updateBeforeRecord(1L, 1, "v1"));
- harness.processElement(updateAfterRecord(2L, 1, "v2"));
- harness.processWatermark(200L);
- assertEmits(harness, updateAfter(2L, 1, "v2"));
-
- // Delete the current value
- harness.processElement(deleteRecord(2L, 1, "v2"));
- harness.processWatermark(300L);
- assertEmits(harness, delete(2L, 1, "v2"));
- }
- }
-
- // --- Restore Tests ---
-
- /**
- * Tests that buffered records at different timestamps before checkpoint are consolidated to
- * MIN_VALUE on restore and compacted on the first watermark.
- */
- @ParameterizedTest
- @EnumSource(
- value = ConflictBehavior.class,
- names = {"ERROR", "NOTHING"})
- void testBufferedRecordsConsolidatedOnRestore(ConflictBehavior behavior) throws Exception {
- OperatorSubtaskState snapshot;
-
- // First harness: buffer records at different timestamps, then take snapshot
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarness(behavior)) {
- harness.open();
-
- // Buffer records at different timestamps (simulating records before checkpoint)
- harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 1000L));
- harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v2", 2000L));
- harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v3", 3000L));
-
- // No watermark yet, so nothing emitted
- assertEmitsNothing(harness);
-
- // Take snapshot with buffered records
- snapshot = harness.snapshot(1L, 1L);
- }
-
- // Second harness: restore from snapshot
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarness(behavior)) {
- harness.initializeState(snapshot);
- harness.open();
-
- // After restore, watermarks restart from MIN_VALUE.
- // The buffered records should have been consolidated to MIN_VALUE.
- // First watermark (even a small one) should trigger compaction of all consolidated
- // records.
- harness.processWatermark(100L);
-
- // All records were from same upsert key, so only final value is emitted
- assertEmits(harness, insert(1L, 1, "v3"));
- }
- }
-
- /**
- * Tests that in-flight records from unaligned checkpoints (records with timestamps > MIN_VALUE
- * arriving before first watermark after restore) are correctly handled.
- */
- @ParameterizedTest
- @EnumSource(
- value = ConflictBehavior.class,
- names = {"ERROR", "NOTHING"})
- void testInFlightRecordsAfterRestore(ConflictBehavior behavior) throws Exception {
- OperatorSubtaskState snapshot;
-
- // First harness: take empty snapshot
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarness(behavior)) {
- harness.open();
- snapshot = harness.snapshot(1L, 1L);
- }
-
- // Second harness: restore and simulate in-flight records
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarness(behavior)) {
- harness.initializeState(snapshot);
- harness.open();
-
- // Simulate in-flight records that were checkpointed with their old timestamps.
- // These arrive after restore but before any watermark is received.
- // They have timestamps > MIN_VALUE from before the checkpoint.
- harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 5000L));
- harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v2", 5100L));
-
- // No watermark yet, nothing emitted
- assertEmitsNothing(harness);
-
- // First watermark after restore. Since currentWatermark was MIN_VALUE,
- // in-flight records should have been assigned to MIN_VALUE and will be compacted.
- harness.processWatermark(100L);
-
- // Both records had same upsert key, so only final value is emitted
- assertEmits(harness, insert(1L, 1, "v2"));
- }
- }
-
- /**
- * Tests restore with multiple keys having buffered records at different timestamps. Verifies
- * that each key's records are correctly consolidated and compacted.
- */
- @Test
- void testRestoreWithMultipleKeys() throws Exception {
- OperatorSubtaskState snapshot;
-
- // First harness: buffer records for multiple keys
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarness(ConflictBehavior.NOTHING)) {
- harness.open();
-
- // Key 1: multiple updates
- harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "k1v1", 1000L));
- harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "k1v2", 2000L));
-
- // Key 2: single insert
- harness.processElement(recordWithTimestamp(RowKind.INSERT, 2L, 2, "k2v1", 1500L));
-
- // Key 3: insert then delete (should result in nothing)
- harness.processElement(recordWithTimestamp(RowKind.INSERT, 3L, 3, "k3v1", 1000L));
- harness.processElement(recordWithTimestamp(RowKind.DELETE, 3L, 3, "k3v1", 2500L));
-
- snapshot = harness.snapshot(1L, 1L);
- }
-
- // Second harness: restore and verify
- try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
- createHarness(ConflictBehavior.NOTHING)) {
- harness.initializeState(snapshot);
- harness.open();
-
- // First watermark compacts all consolidated records
- harness.processWatermark(100L);
-
- // Extract and verify results (order depends on key processing order)
- List<RowData> emitted = extractRecords(harness);
- assertThat(emitted).hasSize(2);
- // Key 1 should have final value "k1v2", Key 2 should have "k2v1", Key 3 cancelled out
- assertThat(emitted)
- .anySatisfy(
- row -> {
- assertThat(row.getInt(1)).isEqualTo(1);
- assertThat(row.getString(2).toString()).isEqualTo("k1v2");
- })
- .anySatisfy(
- row -> {
- assertThat(row.getInt(1)).isEqualTo(2);
- assertThat(row.getString(2).toString()).isEqualTo("k2v1");
- });
- }
- }
-
- // --- Helper Methods ---
-
- private StreamRecord<RowData> recordWithTimestamp(
- RowKind kind, long upsertKey, int pk, String value, long timestamp) {
- return new StreamRecord<>(rowOfKind(kind, upsertKey, pk, value), timestamp);
- }
-
- private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(
- ConflictBehavior behavior) throws Exception {
- RowType keyType = RowType.of(new LogicalType[] {LOGICAL_TYPES[PRIMARY_KEY_INDEX]});
- WatermarkCompactingSinkMaterializer operator =
- WatermarkCompactingSinkMaterializer.create(
- toStrategy(behavior),
- RowType.of(LOGICAL_TYPES),
- RECORD_EQUALISER,
- UPSERT_KEY_EQUALISER,
- new int[] {0}, // upsert key is first column
- keyType,
- new String[] {PRIMARY_KEY_NAME});
-
- return new KeyedOneInputStreamOperatorTestHarness<>(
- operator,
- HandwrittenSelectorUtil.getRowDataSelector(
- new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES),
- HandwrittenSelectorUtil.getRowDataSelector(
- new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES)
- .getProducedType());
- }
-
- private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>
- createHarnessWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
- RowType keyType = RowType.of(new LogicalType[] {LOGICAL_TYPES[PRIMARY_KEY_INDEX]});
- WatermarkCompactingSinkMaterializer operator =
- WatermarkCompactingSinkMaterializer.create(
- toStrategy(behavior),
- RowType.of(LOGICAL_TYPES),
- RECORD_EQUALISER,
- null, // no upsert key equaliser
- null, // no upsert key
- keyType,
- new String[] {PRIMARY_KEY_NAME});
-
- return new KeyedOneInputStreamOperatorTestHarness<>(
- operator,
- HandwrittenSelectorUtil.getRowDataSelector(
- new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES),
- HandwrittenSelectorUtil.getRowDataSelector(
- new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES)
- .getProducedType());
- }
-
- private static InsertConflictStrategy toStrategy(ConflictBehavior behavior) {
- switch (behavior) {
- case ERROR:
- return InsertConflictStrategy.error();
- case NOTHING:
- return InsertConflictStrategy.nothing();
- case DEDUPLICATE:
- return InsertConflictStrategy.deduplicate();
- default:
- throw new IllegalArgumentException("Unknown behavior: " + behavior);
- }
- }
-
- private void assertEmitsNothing(
- KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness) {
- assertThat(extractRecords(harness)).isEmpty();
- }
-
- private void assertEmits(
- KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness,
- RowData... expected) {
- List<RowData> emitted = extractRecords(harness);
- assertThat(emitted).containsExactly(expected);
- }
-
- private List<RowData> extractRecords(
- KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness) {
- final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[LOGICAL_TYPES.length];
- for (int i = 0; i < LOGICAL_TYPES.length; i++) {
- fieldGetters[i] = RowData.createFieldGetter(LOGICAL_TYPES[i], i);
- }
-
- final List<RowData> rows = new ArrayList<>();
- Object o;
- while ((o = harness.getOutput().poll()) != null) {
- // Skip watermarks, only process StreamRecords
- if (o instanceof StreamRecord) {
- RowData value = (RowData) ((StreamRecord<?>) o).getValue();
- Object[] row = new Object[LOGICAL_TYPES.length];
- for (int i = 0; i < LOGICAL_TYPES.length; i++) {
- row[i] = fieldGetters[i].getFieldOrNull(value);
- }
- GenericRowData newRow = GenericRowData.of(row);
- newRow.setRowKind(value.getRowKind());
- rows.add(newRow);
- }
- }
- return rows;
- }
-
- /** Test equaliser that compares all fields. */
- private static class TestRecordEqualiser implements RecordEqualiser {
- @Override
- public boolean equals(RowData row1, RowData row2) {
- return row1.getRowKind() == row2.getRowKind()
- && row1.getLong(0) == row2.getLong(0)
- && row1.getInt(1) == row2.getInt(1)
- && Objects.equals(row1.getString(2), row2.getString(2));
- }
- }
-
- /** Test equaliser that only compares the upsert key (first column). */
- private static class TestUpsertKeyEqualiser implements RecordEqualiser {
- @Override
- public boolean equals(RowData row1, RowData row2) {
- return row1.getLong(0) == row2.getLong(0);
- }
- }
-}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
index 99a5deb41e9..40929ed6ac6 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
@@ -104,26 +104,6 @@ public class StreamRecordUtils {
return new StreamRecord<>(row);
}
- /** Creates a new {@link RowData} with INSERT kind. Alias for {@link #row(Object...)}. */
- public static RowData insert(Object... fields) {
- return rowOfKind(RowKind.INSERT, fields);
- }
-
- /** Creates a new {@link RowData} with UPDATE_BEFORE kind. */
- public static RowData updateBefore(Object... fields) {
- return rowOfKind(RowKind.UPDATE_BEFORE, fields);
- }
-
- /** Creates a new {@link RowData} with UPDATE_AFTER kind. */
- public static RowData updateAfter(Object... fields) {
- return rowOfKind(RowKind.UPDATE_AFTER, fields);
- }
-
- /** Creates a new {@link RowData} with DELETE kind. */
- public static RowData delete(Object... fields) {
- return rowOfKind(RowKind.DELETE, fields);
- }
-
/** Receives a object array, generates a RowData based on the array. */
public static RowData rowOfKind(RowKind rowKind, Object... fields) {
Object[] objects = new Object[fields.length];
diff --git a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
index 9122d74cb79..976cef6349b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java
@@ -73,7 +73,6 @@ import org.apache.flink.table.dataview.ListViewSerializer;
import org.apache.flink.table.dataview.MapViewSerializer;
import org.apache.flink.table.dataview.NullAwareMapSerializer;
import org.apache.flink.table.dataview.NullSerializer;
-import org.apache.flink.table.runtime.operators.sink.SortedLongSerializer;
import org.apache.flink.table.runtime.operators.window.CountWindow;
import org.apache.flink.table.runtime.sequencedmultisetstate.linked.MetaSqnInfoSerializer;
import org.apache.flink.table.runtime.sequencedmultisetstate.linked.NodeSerializer;
@@ -264,8 +263,7 @@ public class TypeSerializerTestCoverageTest extends TestLogger {
NodeSerializer.class.getName(),
RowSqnInfoSerializer.class.getName(),
MetaSqnInfoSerializer.class.getName(),
- SetSerializer.class.getName(),
- SortedLongSerializer.class.getName());
+ SetSerializer.class.getName());
// check if a test exists for each type serializer
for (Class<? extends TypeSerializer> typeSerializer : typeSerializers) {