This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 657f4181968 [FLINK-39155] Validate watermarks are available when using
ON CONFLICT DO ERROR/NOTHING (#27687)
657f4181968 is described below
commit 657f4181968a1c2017dd3c91842ccf7c80241ff7
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Feb 26 16:43:54 2026 +0100
[FLINK-39155] Validate watermarks are available when using ON CONFLICT DO
ERROR/NOTHING (#27687)
---
.../FlinkChangelogModeInferenceProgram.scala | 40 +++++++++++
.../plan/nodes/exec/stream/SinkSemanticTests.java | 4 +-
.../plan/nodes/exec/stream/SinkTestPrograms.java | 84 ++++++++++++++++++++--
.../planner/plan/stream/sql/TableSinkTest.xml | 32 ---------
4 files changed, 120 insertions(+), 40 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index 9d1bb23cb98..c660f00ad5a 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.optimize.program
import org.apache.flink.legacy.table.sinks.{AppendStreamTableSink,
RetractStreamTableSink, StreamTableSink, UpsertStreamTableSink}
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.api.InsertConflictStrategy
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior
import org.apache.flink.table.api.config.ExecutionConfigOptions
import
org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
import org.apache.flink.table.connector.ChangelogMode
@@ -32,6 +33,7 @@ import
org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterO
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.physical.stream._
import
org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver
+import org.apache.flink.table.planner.plan.schema.TableSourceTable
import org.apache.flink.table.planner.plan.utils._
import
org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFastStrategy,
RetractStrategy, UpdateFastStrategy}
import org.apache.flink.table.planner.sinks.DataStreamTableSink
@@ -1072,6 +1074,15 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
}
}
+ // Validate that sources have watermarks when using ERROR or NOTHING strategy
+ if (
+ sink.conflictStrategy != null &&
+ (sink.conflictStrategy.getBehavior == ConflictBehavior.ERROR ||
+ sink.conflictStrategy.getBehavior == ConflictBehavior.NOTHING)
+ ) {
+ validateSourcesHaveWatermarks(sink)
+ }
+
tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE)
match {
case UpsertMaterialize.FORCE => primaryKeys.nonEmpty && !sinkIsRetract
case UpsertMaterialize.NONE => false
@@ -1116,6 +1127,35 @@ class FlinkChangelogModeInferenceProgram extends
FlinkOptimizeProgram[StreamOpti
upsertKeyDiffersFromPk
}
}
+
+ private def validateSourcesHaveWatermarks(sink: StreamPhysicalSink): Unit
= {
+ val sourcesWithoutWatermarks = new java.util.ArrayList[String]()
+ collectSourcesWithoutWatermarks(sink.getInput, sourcesWithoutWatermarks)
+ if (!sourcesWithoutWatermarks.isEmpty) {
+ throw new ValidationException(
+ s"ON CONFLICT DO ${sink.conflictStrategy.getBehavior} requires all
source " +
+ s"tables to define watermarks, but the following source(s) do not:
" +
+ s"${sourcesWithoutWatermarks.mkString(", ")}. " +
+ s"Please add a WATERMARK declaration to these tables.")
+ }
+ }
+
+ private def collectSourcesWithoutWatermarks(
+ rel: RelNode,
+ result: java.util.List[String]): Unit = {
+ rel match {
+ case ts: StreamPhysicalTableSourceScan =>
+ val table = ts.getTable.unwrap(classOf[TableSourceTable])
+ if (
+ table != null &&
+
table.contextResolvedTable.getResolvedSchema.getWatermarkSpecs.isEmpty
+ ) {
+
result.add(table.contextResolvedTable.getIdentifier.asSummaryString())
+ }
+ case _ =>
+ rel.getInputs.forEach(input =>
collectSourcesWithoutWatermarks(input, result))
+ }
+ }
}
/**
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
index 18f8d9e4b4a..9ee3f57a329 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
@@ -67,6 +67,8 @@ public class SinkSemanticTests extends SemanticTestBase {
SinkTestPrograms.APPEND_ONLY_WITH_PK_WITH_ON_CONFLICT,
SinkTestPrograms.UPSERT_KEY_DIFFERS_FROM_PK_WITHOUT_ON_CONFLICT_DISABLED,
SinkTestPrograms.ON_CONFLICT_NOT_ALLOWED_FOR_APPEND_ONLY_SINK,
- SinkTestPrograms.ON_CONFLICT_NOT_ALLOWED_FOR_RETRACT_SINK);
+ SinkTestPrograms.ON_CONFLICT_NOT_ALLOWED_FOR_RETRACT_SINK,
+ SinkTestPrograms.ON_CONFLICT_DO_NOTHING_REQUIRES_WATERMARKS,
+ SinkTestPrograms.ON_CONFLICT_DO_ERROR_REQUIRES_WATERMARKS);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
index 83c2e44e333..42222a5dfe0 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
+import java.time.LocalDateTime;
+
/** Tests for verifying sink semantics. */
public class SinkTestPrograms {
@@ -97,12 +99,28 @@ public class SinkTestPrograms {
"ON CONFLICT DO NOTHING keeps the first record
when multiple records have the same PK.")
.setupTableSource(
SourceTestStep.newBuilder("source_t")
- .addSchema("a INT", "b BIGINT")
+ .addSchema(
+ "a INT",
+ "b BIGINT",
+ "ts TIMESTAMP(3)",
+ "WATERMARK FOR ts AS ts")
.addOption("changelog-mode", "I")
.producedValues(
- Row.ofKind(RowKind.INSERT, 1, 10L),
- Row.ofKind(RowKind.INSERT, 1, 20L),
- Row.ofKind(RowKind.INSERT, 2, 30L))
+ Row.ofKind(
+ RowKind.INSERT,
+ 1,
+ 10L,
+ LocalDateTime.of(2024, 1,
1, 0, 0, 1)),
+ Row.ofKind(
+ RowKind.INSERT,
+ 1,
+ 20L,
+ LocalDateTime.of(2024, 1,
1, 0, 0, 2)),
+ Row.ofKind(
+ RowKind.INSERT,
+ 2,
+ 30L,
+ LocalDateTime.of(2024, 1,
1, 0, 0, 3)))
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink_t")
@@ -118,11 +136,23 @@ public class SinkTestPrograms {
"ON CONFLICT DO ERROR with no conflicts passes
through all records.")
.setupTableSource(
SourceTestStep.newBuilder("source_t")
- .addSchema("a INT", "b BIGINT")
+ .addSchema(
+ "a INT",
+ "b BIGINT",
+ "ts TIMESTAMP(3)",
+ "WATERMARK FOR ts AS ts")
.addOption("changelog-mode", "I")
.producedValues(
- Row.ofKind(RowKind.INSERT, 1, 10L),
- Row.ofKind(RowKind.INSERT, 2, 20L))
+ Row.ofKind(
+ RowKind.INSERT,
+ 1,
+ 10L,
+ LocalDateTime.of(2024, 1,
1, 0, 0, 1)),
+ Row.ofKind(
+ RowKind.INSERT,
+ 2,
+ 20L,
+ LocalDateTime.of(2024, 1,
1, 0, 0, 2)))
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink_t")
@@ -328,4 +358,44 @@ public class SinkTestPrograms {
+ "The sink
'default_catalog.default_database.sink_t' is not an upsert sink "
+ "because it requires UPDATE_BEFORE
(retract mode).")
.build();
+
+ public static final TableTestProgram
ON_CONFLICT_DO_NOTHING_REQUIRES_WATERMARKS =
+ TableTestProgram.of(
+ "sink-on-conflict-do-nothing-requires-watermarks",
+ "ON CONFLICT DO NOTHING requires sources to have
watermarks.")
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema("a INT", "b BIGINT")
+ .addOption("changelog-mode", "I")
+ .producedValues(Row.ofKind(RowKind.INSERT,
1, 10L))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("a INT PRIMARY KEY NOT
ENFORCED", "b BIGINT")
+ .build())
+ .runFailingSql(
+ "INSERT INTO sink_t SELECT a, b FROM source_t ON
CONFLICT DO NOTHING",
+ ValidationException.class,
+ "requires all source tables to define watermarks")
+ .build();
+
+ public static final TableTestProgram
ON_CONFLICT_DO_ERROR_REQUIRES_WATERMARKS =
+ TableTestProgram.of(
+ "sink-on-conflict-do-error-requires-watermarks",
+ "ON CONFLICT DO ERROR requires sources to have
watermarks.")
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema("a INT", "b BIGINT")
+ .addOption("changelog-mode", "I")
+ .producedValues(Row.ofKind(RowKind.INSERT,
1, 10L))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("a INT PRIMARY KEY NOT
ENFORCED", "b BIGINT")
+ .build())
+ .runFailingSql(
+ "INSERT INTO sink_t SELECT a, b FROM source_t ON
CONFLICT DO ERROR",
+ ValidationException.class,
+ "requires all source tables to define watermarks")
+ .build();
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
index 179e09c8ad3..1ae3035de31 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
@@ -520,38 +520,6 @@
LogicalSink(table=[default_catalog.default_database.conflictSink3], fields=[a, b
<Resource name="optimized rel plan">
<![CDATA[
Sink(table=[default_catalog.default_database.conflictSink3], fields=[a, b],
conflictStrategy=[DEDUPLICATE], changelogMode=[NONE])
-+- Calc(select=[a, b], changelogMode=[I])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c], changelogMode=[I])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testInsertOnConflictDoError">
- <Resource name="ast">
- <![CDATA[
-LogicalSink(table=[default_catalog.default_database.conflictSink2], fields=[a,
b])
-+- LogicalProject(a=[$0], b=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Sink(table=[default_catalog.default_database.conflictSink2], fields=[a, b],
conflictStrategy=[ERROR], changelogMode=[NONE])
-+- Calc(select=[a, b], changelogMode=[I])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c], changelogMode=[I])
-]]>
- </Resource>
- </TestCase>
- <TestCase name="testInsertOnConflictDoNothing">
- <Resource name="ast">
- <![CDATA[
-LogicalSink(table=[default_catalog.default_database.conflictSink], fields=[a,
b])
-+- LogicalProject(a=[$0], b=[$1])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Sink(table=[default_catalog.default_database.conflictSink], fields=[a, b],
conflictStrategy=[NOTHING], changelogMode=[NONE])
+- Calc(select=[a, b], changelogMode=[I])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c], changelogMode=[I])
]]>