This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/1.10.x by this push:
     new b2b78b274f [1.10.x] Flink: Fix non-deterministic operator UIDs in DynamicIceberg Sink (#15687) (#15738)
b2b78b274f is described below

commit b2b78b274fd8251cd1e61e0006159898b553a6b3
Author: Maximilian Michels <[email protected]>
AuthorDate: Tue Mar 24 16:48:13 2026 +0100

    [1.10.x] Flink: Fix non-deterministic operator UIDs in DynamicIceberg Sink (#15687) (#15738)
    
    * Flink: Backport: Fix non-deterministic operator UIDs in DynamicIcebergSink (#15687)
    
    * fixup! Fix merge conflict
---
 .../flink/sink/dynamic/DynamicIcebergSink.java     |  2 +-
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 63 ++++++++++++++++++++++
 .../flink/sink/dynamic/DynamicIcebergSink.java     |  2 +-
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 63 ++++++++++++++++++++++
 .../flink/sink/dynamic/DynamicIcebergSink.java     |  2 +-
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 63 ++++++++++++++++++++++
 6 files changed, 192 insertions(+), 3 deletions(-)

diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index e90fe4d6b1..f3b4a41917 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -169,7 +169,7 @@ public class DynamicIcebergSink
             prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"),
             typeInformation,
             new DynamicWriteResultAggregator(catalogLoader, cacheMaximumSize))
-        .uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology"));
+        .uid(prefixIfNotNull(uidPrefix, "-pre-commit-topology"));
   }
 
   @Override
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 20fae212b4..b221719815 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -40,7 +40,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
@@ -570,6 +572,67 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
     executeDynamicSink(rows, env, true, 1, commitHook);
   }
 
+  @Test
+  void testOperatorUidsAreDeterministic() {
+    assertThat(createSinkAndReturnUIds("test")).isEqualTo(createSinkAndReturnUIds("test"));
+    assertThat(createSinkAndReturnUIds("test"))
+        .doesNotContainAnyElementsOf(createSinkAndReturnUIds("test2"));
+  }
+
+  @Test
+  void testOperatorUidsFormat() {
+    Set<String> sinkUids = createSinkAndReturnUIds("test");
+    // These look odd, but we need to keep the UIDs consistent. We had a bug where the UID of the
+    // pre commit topology was off, but since it is stateless, users will still be able to restore
+    // state, but we must keep the stateful operators UUIds like the committer consistent.
+    assertThat(sinkUids)
+        .contains(
+            "test--sink",
+            "test--generator",
+            "test--updater",
+            "test--sink: test--pre-commit-topology",
+            "Sink Committer: test--sink");
+
+    sinkUids = createSinkAndReturnUIds("");
+    assertThat(sinkUids)
+        .contains(
+            "--sink",
+            "--generator",
+            "--updater",
+            "--sink: --pre-commit-topology",
+            "Sink Committer: --sink");
+
+    sinkUids = createSinkAndReturnUIds(null);
+    assertThat(sinkUids)
+        .contains(
+            "--sink",
+            "--generator",
+            "--updater",
+            "--sink: --pre-commit-topology",
+            "Sink Committer: --sink");
+  }
+
+  private Set<String> createSinkAndReturnUIds(String uidPrefix) {
+    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStreamSource<DynamicIcebergDataImpl> source =
+        streamEnv.fromData(Collections.emptySet(), TypeInformation.of(new TypeHint<>() {}));
+    source.uid("source");
+
+    DynamicIcebergSink.forInput(source)
+        .generator(new Generator())
+        .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+        .uidPrefix(uidPrefix)
+        .append();
+
+    // Make sure to get the expanded graph with all sink nodes
+    return streamEnv.getStreamGraph().getStreamNodes().stream()
+        .map(StreamNode::getTransformationUID)
+        // We are not interested in the source
+        .filter(uid -> !uid.equals("source"))
+        .collect(Collectors.toSet());
+  }
+
   private static class AppendRightBeforeCommit implements CommitHook {
 
     final String tableIdentifier;
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index e90fe4d6b1..f3b4a41917 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -169,7 +169,7 @@ public class DynamicIcebergSink
             prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"),
             typeInformation,
             new DynamicWriteResultAggregator(catalogLoader, cacheMaximumSize))
-        .uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology"));
+        .uid(prefixIfNotNull(uidPrefix, "-pre-commit-topology"));
   }
 
   @Override
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 20fae212b4..b221719815 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -40,7 +40,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
@@ -570,6 +572,67 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
     executeDynamicSink(rows, env, true, 1, commitHook);
   }
 
+  @Test
+  void testOperatorUidsAreDeterministic() {
+    assertThat(createSinkAndReturnUIds("test")).isEqualTo(createSinkAndReturnUIds("test"));
+    assertThat(createSinkAndReturnUIds("test"))
+        .doesNotContainAnyElementsOf(createSinkAndReturnUIds("test2"));
+  }
+
+  @Test
+  void testOperatorUidsFormat() {
+    Set<String> sinkUids = createSinkAndReturnUIds("test");
+    // These look odd, but we need to keep the UIDs consistent. We had a bug where the UID of the
+    // pre commit topology was off, but since it is stateless, users will still be able to restore
+    // state, but we must keep the stateful operators UUIds like the committer consistent.
+    assertThat(sinkUids)
+        .contains(
+            "test--sink",
+            "test--generator",
+            "test--updater",
+            "test--sink: test--pre-commit-topology",
+            "Sink Committer: test--sink");
+
+    sinkUids = createSinkAndReturnUIds("");
+    assertThat(sinkUids)
+        .contains(
+            "--sink",
+            "--generator",
+            "--updater",
+            "--sink: --pre-commit-topology",
+            "Sink Committer: --sink");
+
+    sinkUids = createSinkAndReturnUIds(null);
+    assertThat(sinkUids)
+        .contains(
+            "--sink",
+            "--generator",
+            "--updater",
+            "--sink: --pre-commit-topology",
+            "Sink Committer: --sink");
+  }
+
+  private Set<String> createSinkAndReturnUIds(String uidPrefix) {
+    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStreamSource<DynamicIcebergDataImpl> source =
+        streamEnv.fromData(Collections.emptySet(), TypeInformation.of(new TypeHint<>() {}));
+    source.uid("source");
+
+    DynamicIcebergSink.forInput(source)
+        .generator(new Generator())
+        .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+        .uidPrefix(uidPrefix)
+        .append();
+
+    // Make sure to get the expanded graph with all sink nodes
+    return streamEnv.getStreamGraph().getStreamNodes().stream()
+        .map(StreamNode::getTransformationUID)
+        // We are not interested in the source
+        .filter(uid -> !uid.equals("source"))
+        .collect(Collectors.toSet());
+  }
+
   private static class AppendRightBeforeCommit implements CommitHook {
 
     final String tableIdentifier;
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index db48be7977..3a31e5b642 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -169,7 +169,7 @@ public class DynamicIcebergSink
             prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"),
             typeInformation,
             new DynamicWriteResultAggregator(catalogLoader, cacheMaximumSize))
-        .uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology"));
+        .uid(prefixIfNotNull(uidPrefix, "-pre-commit-topology"));
   }
 
   @Override
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 20fae212b4..b221719815 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -40,7 +40,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
@@ -570,6 +572,67 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
     executeDynamicSink(rows, env, true, 1, commitHook);
   }
 
+  @Test
+  void testOperatorUidsAreDeterministic() {
+    assertThat(createSinkAndReturnUIds("test")).isEqualTo(createSinkAndReturnUIds("test"));
+    assertThat(createSinkAndReturnUIds("test"))
+        .doesNotContainAnyElementsOf(createSinkAndReturnUIds("test2"));
+  }
+
+  @Test
+  void testOperatorUidsFormat() {
+    Set<String> sinkUids = createSinkAndReturnUIds("test");
+    // These look odd, but we need to keep the UIDs consistent. We had a bug where the UID of the
+    // pre commit topology was off, but since it is stateless, users will still be able to restore
+    // state, but we must keep the stateful operators UUIds like the committer consistent.
+    assertThat(sinkUids)
+        .contains(
+            "test--sink",
+            "test--generator",
+            "test--updater",
+            "test--sink: test--pre-commit-topology",
+            "Sink Committer: test--sink");
+
+    sinkUids = createSinkAndReturnUIds("");
+    assertThat(sinkUids)
+        .contains(
+            "--sink",
+            "--generator",
+            "--updater",
+            "--sink: --pre-commit-topology",
+            "Sink Committer: --sink");
+
+    sinkUids = createSinkAndReturnUIds(null);
+    assertThat(sinkUids)
+        .contains(
+            "--sink",
+            "--generator",
+            "--updater",
+            "--sink: --pre-commit-topology",
+            "Sink Committer: --sink");
+  }
+
+  private Set<String> createSinkAndReturnUIds(String uidPrefix) {
+    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStreamSource<DynamicIcebergDataImpl> source =
+        streamEnv.fromData(Collections.emptySet(), TypeInformation.of(new TypeHint<>() {}));
+    source.uid("source");
+
+    DynamicIcebergSink.forInput(source)
+        .generator(new Generator())
+        .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+        .uidPrefix(uidPrefix)
+        .append();
+
+    // Make sure to get the expanded graph with all sink nodes
+    return streamEnv.getStreamGraph().getStreamNodes().stream()
+        .map(StreamNode::getTransformationUID)
+        // We are not interested in the source
+        .filter(uid -> !uid.equals("source"))
+        .collect(Collectors.toSet());
+  }
+
   private static class AppendRightBeforeCommit implements CommitHook {
 
     final String tableIdentifier;

Reply via email to