This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 0e5428cdc4 Flink: Backport: Fix non-deterministic operator UIDs in
DynamicIcebergSink (#15687) (#15702)
0e5428cdc4 is described below
commit 0e5428cdc41d591f84164073a25f54c2dd5df9b6
Author: Maximilian Michels <[email protected]>
AuthorDate: Fri Mar 20 22:30:17 2026 +0100
Flink: Backport: Fix non-deterministic operator UIDs in DynamicIcebergSink
(#15687) (#15702)
---
.../flink/sink/dynamic/DynamicIcebergSink.java | 2 +-
.../flink/sink/dynamic/TestDynamicIcebergSink.java | 63 ++++++++++++++++++++++
.../flink/sink/dynamic/DynamicIcebergSink.java | 2 +-
.../flink/sink/dynamic/TestDynamicIcebergSink.java | 63 ++++++++++++++++++++++
4 files changed, 128 insertions(+), 2 deletions(-)
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index 16551cc96d..218fa2d911 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -160,7 +160,7 @@ public class DynamicIcebergSink
prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"),
typeInformation,
new DynamicWriteResultAggregator(catalogLoader, cacheMaximumSize))
- .uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology"));
+ .uid(prefixIfNotNull(uidPrefix, "-pre-commit-topology"));
}
@Override
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index ad4a13d561..27b1e3d84a 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -45,7 +45,9 @@ import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
@@ -1154,6 +1156,67 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
assertThat(resultSchema.findField("DATA")).isNotNull();
}
+ @Test
+ void testOperatorUidsAreDeterministic() {
+
assertThat(createSinkAndReturnUIds("test")).isEqualTo(createSinkAndReturnUIds("test"));
+ assertThat(createSinkAndReturnUIds("test"))
+ .doesNotContainAnyElementsOf(createSinkAndReturnUIds("test2"));
+ }
+
+ @Test
+ void testOperatorUidsFormat() {
+ Set<String> sinkUids = createSinkAndReturnUIds("test");
+ // These look odd, but we need to keep the UIDs consistent. We had a bug
where the UID of the
+ // pre commit topology was off, but since it is stateless, users will
still be able to restore
+ state, but we must keep the stateful operators' UIDs like the committer
consistent.
+ assertThat(sinkUids)
+ .contains(
+ "test--sink",
+ "test--generator",
+ "test--updater",
+ "test--sink: test--pre-commit-topology",
+ "Sink Committer: test--sink");
+
+ sinkUids = createSinkAndReturnUIds("");
+ assertThat(sinkUids)
+ .contains(
+ "--sink",
+ "--generator",
+ "--updater",
+ "--sink: --pre-commit-topology",
+ "Sink Committer: --sink");
+
+ sinkUids = createSinkAndReturnUIds(null);
+ assertThat(sinkUids)
+ .contains(
+ "--sink",
+ "--generator",
+ "--updater",
+ "--sink: --pre-commit-topology",
+ "Sink Committer: --sink");
+ }
+
+ private Set<String> createSinkAndReturnUIds(String uidPrefix) {
+ StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<DynamicIcebergDataImpl> source =
+ streamEnv.fromData(Collections.emptySet(), TypeInformation.of(new
TypeHint<>() {}));
+ source.uid("source");
+
+ DynamicIcebergSink.forInput(source)
+ .generator(new Generator())
+ .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+ .uidPrefix(uidPrefix)
+ .append();
+
+ // Make sure to get the expanded graph with all sink nodes
+ return streamEnv.getStreamGraph().getStreamNodes().stream()
+ .map(StreamNode::getTransformationUID)
+ // We are not interested in the source
+ .filter(uid -> !uid.equals("source"))
+ .collect(Collectors.toSet());
+ }
+
/**
* Represents a concurrent duplicate commit during an ongoing commit
operation, which can happen
* in production scenarios when using REST catalog.
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index afafbe5b59..4b5c9bef41 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -159,7 +159,7 @@ public class DynamicIcebergSink
prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"),
typeInformation,
new DynamicWriteResultAggregator(catalogLoader, cacheMaximumSize))
- .uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology"));
+ .uid(prefixIfNotNull(uidPrefix, "-pre-commit-topology"));
}
@Override
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index ad4a13d561..27b1e3d84a 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -45,7 +45,9 @@ import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
@@ -1154,6 +1156,67 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
assertThat(resultSchema.findField("DATA")).isNotNull();
}
+ @Test
+ void testOperatorUidsAreDeterministic() {
+
assertThat(createSinkAndReturnUIds("test")).isEqualTo(createSinkAndReturnUIds("test"));
+ assertThat(createSinkAndReturnUIds("test"))
+ .doesNotContainAnyElementsOf(createSinkAndReturnUIds("test2"));
+ }
+
+ @Test
+ void testOperatorUidsFormat() {
+ Set<String> sinkUids = createSinkAndReturnUIds("test");
+ // These look odd, but we need to keep the UIDs consistent. We had a bug
where the UID of the
+ // pre commit topology was off, but since it is stateless, users will
still be able to restore
+ state, but we must keep the stateful operators' UIDs like the committer
consistent.
+ assertThat(sinkUids)
+ .contains(
+ "test--sink",
+ "test--generator",
+ "test--updater",
+ "test--sink: test--pre-commit-topology",
+ "Sink Committer: test--sink");
+
+ sinkUids = createSinkAndReturnUIds("");
+ assertThat(sinkUids)
+ .contains(
+ "--sink",
+ "--generator",
+ "--updater",
+ "--sink: --pre-commit-topology",
+ "Sink Committer: --sink");
+
+ sinkUids = createSinkAndReturnUIds(null);
+ assertThat(sinkUids)
+ .contains(
+ "--sink",
+ "--generator",
+ "--updater",
+ "--sink: --pre-commit-topology",
+ "Sink Committer: --sink");
+ }
+
+ private Set<String> createSinkAndReturnUIds(String uidPrefix) {
+ StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<DynamicIcebergDataImpl> source =
+ streamEnv.fromData(Collections.emptySet(), TypeInformation.of(new
TypeHint<>() {}));
+ source.uid("source");
+
+ DynamicIcebergSink.forInput(source)
+ .generator(new Generator())
+ .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+ .uidPrefix(uidPrefix)
+ .append();
+
+ // Make sure to get the expanded graph with all sink nodes
+ return streamEnv.getStreamGraph().getStreamNodes().stream()
+ .map(StreamNode::getTransformationUID)
+ // We are not interested in the source
+ .filter(uid -> !uid.equals("source"))
+ .collect(Collectors.toSet());
+ }
+
/**
* Represents a concurrent duplicate commit during an ongoing commit
operation, which can happen
* in production scenarios when using REST catalog.