This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 657f4181968 [FLINK-39155] Validate watermarks are available when using ON CONFLICT DO ERROR/NOTHING (#27687)
657f4181968 is described below

commit 657f4181968a1c2017dd3c91842ccf7c80241ff7
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Feb 26 16:43:54 2026 +0100

    [FLINK-39155] Validate watermarks are available when using ON CONFLICT DO ERROR/NOTHING (#27687)
---
 .../FlinkChangelogModeInferenceProgram.scala       | 40 +++++++++++
 .../plan/nodes/exec/stream/SinkSemanticTests.java  |  4 +-
 .../plan/nodes/exec/stream/SinkTestPrograms.java   | 84 ++++++++++++++++++++--
 .../planner/plan/stream/sql/TableSinkTest.xml      | 32 ---------
 4 files changed, 120 insertions(+), 40 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index 9d1bb23cb98..c660f00ad5a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.optimize.program
 import org.apache.flink.legacy.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, StreamTableSink, UpsertStreamTableSink}
 import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.api.InsertConflictStrategy
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
 import org.apache.flink.table.connector.ChangelogMode
@@ -32,6 +33,7 @@ import org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterO
 import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.physical.stream._
 import org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver
+import org.apache.flink.table.planner.plan.schema.TableSourceTable
 import org.apache.flink.table.planner.plan.utils._
 import org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFastStrategy, RetractStrategy, UpdateFastStrategy}
 import org.apache.flink.table.planner.sinks.DataStreamTableSink
@@ -1072,6 +1074,15 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
         }
       }
 
+      // Validate that sources have watermarks when using ERROR or NOTHING strategy
+      if (
+        sink.conflictStrategy != null &&
+        (sink.conflictStrategy.getBehavior == ConflictBehavior.ERROR ||
+          sink.conflictStrategy.getBehavior == ConflictBehavior.NOTHING)
+      ) {
+        validateSourcesHaveWatermarks(sink)
+      }
+
       tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE) match {
         case UpsertMaterialize.FORCE => primaryKeys.nonEmpty && !sinkIsRetract
         case UpsertMaterialize.NONE => false
@@ -1116,6 +1127,35 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
           upsertKeyDiffersFromPk
         }
       }
+
+    private def validateSourcesHaveWatermarks(sink: StreamPhysicalSink): Unit = {
+      val sourcesWithoutWatermarks = new java.util.ArrayList[String]()
+      collectSourcesWithoutWatermarks(sink.getInput, sourcesWithoutWatermarks)
+      if (!sourcesWithoutWatermarks.isEmpty) {
+        throw new ValidationException(
+          s"ON CONFLICT DO ${sink.conflictStrategy.getBehavior} requires all source " +
+            s"tables to define watermarks, but the following source(s) do not: " +
+            s"${sourcesWithoutWatermarks.mkString(", ")}. " +
+            s"Please add a WATERMARK declaration to these tables.")
+      }
+    }
+
+    private def collectSourcesWithoutWatermarks(
+        rel: RelNode,
+        result: java.util.List[String]): Unit = {
+      rel match {
+        case ts: StreamPhysicalTableSourceScan =>
+          val table = ts.getTable.unwrap(classOf[TableSourceTable])
+          if (
+            table != null &&
+            table.contextResolvedTable.getResolvedSchema.getWatermarkSpecs.isEmpty
+          ) {
+            result.add(table.contextResolvedTable.getIdentifier.asSummaryString())
+          }
+        case _ =>
+          rel.getInputs.forEach(input => collectSourcesWithoutWatermarks(input, result))
+      }
+    }
   }
 
   /**
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
index 18f8d9e4b4a..9ee3f57a329 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java
@@ -67,6 +67,8 @@ public class SinkSemanticTests extends SemanticTestBase {
                 SinkTestPrograms.APPEND_ONLY_WITH_PK_WITH_ON_CONFLICT,
                 SinkTestPrograms.UPSERT_KEY_DIFFERS_FROM_PK_WITHOUT_ON_CONFLICT_DISABLED,
                 SinkTestPrograms.ON_CONFLICT_NOT_ALLOWED_FOR_APPEND_ONLY_SINK,
-                SinkTestPrograms.ON_CONFLICT_NOT_ALLOWED_FOR_RETRACT_SINK);
+                SinkTestPrograms.ON_CONFLICT_NOT_ALLOWED_FOR_RETRACT_SINK,
+                SinkTestPrograms.ON_CONFLICT_DO_NOTHING_REQUIRES_WATERMARKS,
+                SinkTestPrograms.ON_CONFLICT_DO_ERROR_REQUIRES_WATERMARKS);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
index 83c2e44e333..42222a5dfe0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.test.program.TableTestProgram;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
+import java.time.LocalDateTime;
+
 /** Tests for verifying sink semantics. */
 public class SinkTestPrograms {
 
@@ -97,12 +99,28 @@ public class SinkTestPrograms {
                             "ON CONFLICT DO NOTHING keeps the first record when multiple records have the same PK.")
                     .setupTableSource(
                             SourceTestStep.newBuilder("source_t")
-                                    .addSchema("a INT", "b BIGINT")
+                                    .addSchema(
+                                            "a INT",
+                                            "b BIGINT",
+                                            "ts TIMESTAMP(3)",
+                                            "WATERMARK FOR ts AS ts")
                                     .addOption("changelog-mode", "I")
                                     .producedValues(
-                                            Row.ofKind(RowKind.INSERT, 1, 10L),
-                                            Row.ofKind(RowKind.INSERT, 1, 20L),
-                                            Row.ofKind(RowKind.INSERT, 2, 30L))
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    1,
+                                                    10L,
+                                                    LocalDateTime.of(2024, 1, 1, 0, 0, 1)),
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    1,
+                                                    20L,
+                                                    LocalDateTime.of(2024, 1, 1, 0, 0, 2)),
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    2,
+                                                    30L,
+                                                    LocalDateTime.of(2024, 1, 1, 0, 0, 3)))
                                     .build())
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink_t")
@@ -118,11 +136,23 @@ public class SinkTestPrograms {
                             "ON CONFLICT DO ERROR with no conflicts passes through all records.")
                     .setupTableSource(
                             SourceTestStep.newBuilder("source_t")
-                                    .addSchema("a INT", "b BIGINT")
+                                    .addSchema(
+                                            "a INT",
+                                            "b BIGINT",
+                                            "ts TIMESTAMP(3)",
+                                            "WATERMARK FOR ts AS ts")
                                     .addOption("changelog-mode", "I")
                                     .producedValues(
-                                            Row.ofKind(RowKind.INSERT, 1, 10L),
-                                            Row.ofKind(RowKind.INSERT, 2, 20L))
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    1,
+                                                    10L,
+                                                    LocalDateTime.of(2024, 1, 1, 0, 0, 1)),
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    2,
+                                                    20L,
+                                                    LocalDateTime.of(2024, 1, 1, 0, 0, 2)))
                                     .build())
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink_t")
@@ -328,4 +358,44 @@ public class SinkTestPrograms {
                                     + "The sink 'default_catalog.default_database.sink_t' is not an upsert sink "
                                     + "because it requires UPDATE_BEFORE (retract mode).")
                     .build();
+
+    public static final TableTestProgram ON_CONFLICT_DO_NOTHING_REQUIRES_WATERMARKS =
+            TableTestProgram.of(
+                            "sink-on-conflict-do-nothing-requires-watermarks",
+                            "ON CONFLICT DO NOTHING requires sources to have watermarks.")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT", "b BIGINT")
+                                    .addOption("changelog-mode", "I")
+                                    .producedValues(Row.ofKind(RowKind.INSERT, 1, 10L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT PRIMARY KEY NOT ENFORCED", "b BIGINT")
+                                    .build())
+                    .runFailingSql(
+                            "INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO NOTHING",
+                            ValidationException.class,
+                            "requires all source tables to define watermarks")
+                    .build();
+
+    public static final TableTestProgram ON_CONFLICT_DO_ERROR_REQUIRES_WATERMARKS =
+            TableTestProgram.of(
+                            "sink-on-conflict-do-error-requires-watermarks",
+                            "ON CONFLICT DO ERROR requires sources to have watermarks.")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT", "b BIGINT")
+                                    .addOption("changelog-mode", "I")
+                                    .producedValues(Row.ofKind(RowKind.INSERT, 1, 10L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT PRIMARY KEY NOT ENFORCED", "b BIGINT")
+                                    .build())
+                    .runFailingSql(
+                            "INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO ERROR",
+                            ValidationException.class,
+                            "requires all source tables to define watermarks")
+                    .build();
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
index 179e09c8ad3..1ae3035de31 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.xml
@@ -520,38 +520,6 @@ LogicalSink(table=[default_catalog.default_database.conflictSink3], fields=[a, b
     <Resource name="optimized rel plan">
       <![CDATA[
 Sink(table=[default_catalog.default_database.conflictSink3], fields=[a, b], conflictStrategy=[DEDUPLICATE], changelogMode=[NONE])
-+- Calc(select=[a, b], changelogMode=[I])
-   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], changelogMode=[I])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testInsertOnConflictDoError">
-    <Resource name="ast">
-      <![CDATA[
-LogicalSink(table=[default_catalog.default_database.conflictSink2], fields=[a, b])
-+- LogicalProject(a=[$0], b=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Sink(table=[default_catalog.default_database.conflictSink2], fields=[a, b], conflictStrategy=[ERROR], changelogMode=[NONE])
-+- Calc(select=[a, b], changelogMode=[I])
-   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], changelogMode=[I])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testInsertOnConflictDoNothing">
-    <Resource name="ast">
-      <![CDATA[
-LogicalSink(table=[default_catalog.default_database.conflictSink], fields=[a, b])
-+- LogicalProject(a=[$0], b=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Sink(table=[default_catalog.default_database.conflictSink], fields=[a, b], conflictStrategy=[NOTHING], changelogMode=[NONE])
 +- Calc(select=[a, b], changelogMode=[I])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], changelogMode=[I])
 ]]>

Reply via email to