This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e5428cdc4 Flink: Backport: Fix non-deterministic operator UIDs in 
DynamicIcebergSink (#15687) (#15702)
0e5428cdc4 is described below

commit 0e5428cdc41d591f84164073a25f54c2dd5df9b6
Author: Maximilian Michels <[email protected]>
AuthorDate: Fri Mar 20 22:30:17 2026 +0100

    Flink: Backport: Fix non-deterministic operator UIDs in DynamicIcebergSink 
(#15687) (#15702)
---
 .../flink/sink/dynamic/DynamicIcebergSink.java     |  2 +-
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 63 ++++++++++++++++++++++
 .../flink/sink/dynamic/DynamicIcebergSink.java     |  2 +-
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 63 ++++++++++++++++++++++
 4 files changed, 128 insertions(+), 2 deletions(-)

diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index 16551cc96d..218fa2d911 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -160,7 +160,7 @@ public class DynamicIcebergSink
             prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"),
             typeInformation,
             new DynamicWriteResultAggregator(catalogLoader, cacheMaximumSize))
-        .uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology"));
+        .uid(prefixIfNotNull(uidPrefix, "-pre-commit-topology"));
   }
 
   @Override
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index ad4a13d561..27b1e3d84a 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -45,7 +45,9 @@ import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
@@ -1154,6 +1156,67 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
     assertThat(resultSchema.findField("DATA")).isNotNull();
   }
 
+  @Test
+  void testOperatorUidsAreDeterministic() {
+    
assertThat(createSinkAndReturnUIds("test")).isEqualTo(createSinkAndReturnUIds("test"));
+    assertThat(createSinkAndReturnUIds("test"))
+        .doesNotContainAnyElementsOf(createSinkAndReturnUIds("test2"));
+  }
+
+  @Test
+  void testOperatorUidsFormat() {
+    Set<String> sinkUids = createSinkAndReturnUIds("test");
+    // These look odd, but we need to keep the UIDs consistent. We had a bug 
where the UID of the
+    // pre commit topology was off, but since it is stateless, users will 
still be able to restore
+    // state, but we must keep the stateful operators' UIDs, like the committer's, 
consistent.
+    assertThat(sinkUids)
+        .contains(
+            "test--sink",
+            "test--generator",
+            "test--updater",
+            "test--sink: test--pre-commit-topology",
+            "Sink Committer: test--sink");
+
+    sinkUids = createSinkAndReturnUIds("");
+    assertThat(sinkUids)
+        .contains(
+            "--sink",
+            "--generator",
+            "--updater",
+            "--sink: --pre-commit-topology",
+            "Sink Committer: --sink");
+
+    sinkUids = createSinkAndReturnUIds(null);
+    assertThat(sinkUids)
+        .contains(
+            "--sink",
+            "--generator",
+            "--updater",
+            "--sink: --pre-commit-topology",
+            "Sink Committer: --sink");
+  }
+
+  private Set<String> createSinkAndReturnUIds(String uidPrefix) {
+    StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStreamSource<DynamicIcebergDataImpl> source =
+        streamEnv.fromData(Collections.emptySet(), TypeInformation.of(new 
TypeHint<>() {}));
+    source.uid("source");
+
+    DynamicIcebergSink.forInput(source)
+        .generator(new Generator())
+        .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+        .uidPrefix(uidPrefix)
+        .append();
+
+    // Make sure to get the expanded graph with all sink nodes
+    return streamEnv.getStreamGraph().getStreamNodes().stream()
+        .map(StreamNode::getTransformationUID)
+        // We are not interested in the source
+        .filter(uid -> !uid.equals("source"))
+        .collect(Collectors.toSet());
+  }
+
   /**
    * Represents a concurrent duplicate commit during an ongoing commit 
operation, which can happen
    * in production scenarios when using REST catalog.
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index afafbe5b59..4b5c9bef41 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -159,7 +159,7 @@ public class DynamicIcebergSink
             prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"),
             typeInformation,
             new DynamicWriteResultAggregator(catalogLoader, cacheMaximumSize))
-        .uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology"));
+        .uid(prefixIfNotNull(uidPrefix, "-pre-commit-topology"));
   }
 
   @Override
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index ad4a13d561..27b1e3d84a 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -45,7 +45,9 @@ import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
@@ -1154,6 +1156,67 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
     assertThat(resultSchema.findField("DATA")).isNotNull();
   }
 
+  @Test
+  void testOperatorUidsAreDeterministic() {
+    
assertThat(createSinkAndReturnUIds("test")).isEqualTo(createSinkAndReturnUIds("test"));
+    assertThat(createSinkAndReturnUIds("test"))
+        .doesNotContainAnyElementsOf(createSinkAndReturnUIds("test2"));
+  }
+
+  @Test
+  void testOperatorUidsFormat() {
+    Set<String> sinkUids = createSinkAndReturnUIds("test");
+    // These look odd, but we need to keep the UIDs consistent. We had a bug 
where the UID of the
+    // pre commit topology was off, but since it is stateless, users will 
still be able to restore
+    // state, but we must keep the stateful operators' UIDs, like the committer's, 
consistent.
+    assertThat(sinkUids)
+        .contains(
+            "test--sink",
+            "test--generator",
+            "test--updater",
+            "test--sink: test--pre-commit-topology",
+            "Sink Committer: test--sink");
+
+    sinkUids = createSinkAndReturnUIds("");
+    assertThat(sinkUids)
+        .contains(
+            "--sink",
+            "--generator",
+            "--updater",
+            "--sink: --pre-commit-topology",
+            "Sink Committer: --sink");
+
+    sinkUids = createSinkAndReturnUIds(null);
+    assertThat(sinkUids)
+        .contains(
+            "--sink",
+            "--generator",
+            "--updater",
+            "--sink: --pre-commit-topology",
+            "Sink Committer: --sink");
+  }
+
+  private Set<String> createSinkAndReturnUIds(String uidPrefix) {
+    StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStreamSource<DynamicIcebergDataImpl> source =
+        streamEnv.fromData(Collections.emptySet(), TypeInformation.of(new 
TypeHint<>() {}));
+    source.uid("source");
+
+    DynamicIcebergSink.forInput(source)
+        .generator(new Generator())
+        .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+        .uidPrefix(uidPrefix)
+        .append();
+
+    // Make sure to get the expanded graph with all sink nodes
+    return streamEnv.getStreamGraph().getStreamNodes().stream()
+        .map(StreamNode::getTransformationUID)
+        // We are not interested in the source
+        .filter(uid -> !uid.equals("source"))
+        .collect(Collectors.toSet());
+  }
+
   /**
    * Represents a concurrent duplicate commit during an ongoing commit 
operation, which can happen
    * in production scenarios when using REST catalog.

Reply via email to