This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.10.x by this push:
new b2b78b274f [1.10.x] Flink: Fix non-deterministic operator UIDs in
DynamicIcebergSink (#15687) (#15738)
b2b78b274f is described below
commit b2b78b274fd8251cd1e61e0006159898b553a6b3
Author: Maximilian Michels <[email protected]>
AuthorDate: Tue Mar 24 16:48:13 2026 +0100
[1.10.x] Flink: Fix non-deterministic operator UIDs in DynamicIcebergSink
(#15687) (#15738)
* Flink: Backport: Fix non-deterministic operator UIDs in
DynamicIcebergSink (#15687)
* fixup! Fix merge conflict
---
.../flink/sink/dynamic/DynamicIcebergSink.java | 2 +-
.../flink/sink/dynamic/TestDynamicIcebergSink.java | 63 ++++++++++++++++++++++
.../flink/sink/dynamic/DynamicIcebergSink.java | 2 +-
.../flink/sink/dynamic/TestDynamicIcebergSink.java | 63 ++++++++++++++++++++++
.../flink/sink/dynamic/DynamicIcebergSink.java | 2 +-
.../flink/sink/dynamic/TestDynamicIcebergSink.java | 63 ++++++++++++++++++++++
6 files changed, 192 insertions(+), 3 deletions(-)
diff --git
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index e90fe4d6b1..f3b4a41917 100644
---
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -169,7 +169,7 @@ public class DynamicIcebergSink
prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"),
typeInformation,
new DynamicWriteResultAggregator(catalogLoader, cacheMaximumSize))
- .uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology"));
+ .uid(prefixIfNotNull(uidPrefix, "-pre-commit-topology"));
}
@Override
diff --git
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 20fae212b4..b221719815 100644
---
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -40,7 +40,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
@@ -570,6 +572,67 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
executeDynamicSink(rows, env, true, 1, commitHook);
}
+ @Test
+ void testOperatorUidsAreDeterministic() {
+
assertThat(createSinkAndReturnUIds("test")).isEqualTo(createSinkAndReturnUIds("test"));
+ assertThat(createSinkAndReturnUIds("test"))
+ .doesNotContainAnyElementsOf(createSinkAndReturnUIds("test2"));
+ }
+
+ @Test
+ void testOperatorUidsFormat() {
+ Set<String> sinkUids = createSinkAndReturnUIds("test");
+ // These look odd, but we need to keep the UIDs consistent. We had a bug
where the UID of the
+ // pre commit topology was off, but since it is stateless, users will
still be able to restore
+ // state, but we must keep the UIDs of stateful operators, like the committer,
consistent.
+ assertThat(sinkUids)
+ .contains(
+ "test--sink",
+ "test--generator",
+ "test--updater",
+ "test--sink: test--pre-commit-topology",
+ "Sink Committer: test--sink");
+
+ sinkUids = createSinkAndReturnUIds("");
+ assertThat(sinkUids)
+ .contains(
+ "--sink",
+ "--generator",
+ "--updater",
+ "--sink: --pre-commit-topology",
+ "Sink Committer: --sink");
+
+ sinkUids = createSinkAndReturnUIds(null);
+ assertThat(sinkUids)
+ .contains(
+ "--sink",
+ "--generator",
+ "--updater",
+ "--sink: --pre-commit-topology",
+ "Sink Committer: --sink");
+ }
+
+ private Set<String> createSinkAndReturnUIds(String uidPrefix) {
+ StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<DynamicIcebergDataImpl> source =
+ streamEnv.fromData(Collections.emptySet(), TypeInformation.of(new
TypeHint<>() {}));
+ source.uid("source");
+
+ DynamicIcebergSink.forInput(source)
+ .generator(new Generator())
+ .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+ .uidPrefix(uidPrefix)
+ .append();
+
+ // Make sure to get the expanded graph with all sink nodes
+ return streamEnv.getStreamGraph().getStreamNodes().stream()
+ .map(StreamNode::getTransformationUID)
+ // We are not interested in the source
+ .filter(uid -> !uid.equals("source"))
+ .collect(Collectors.toSet());
+ }
+
private static class AppendRightBeforeCommit implements CommitHook {
final String tableIdentifier;
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index e90fe4d6b1..f3b4a41917 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -169,7 +169,7 @@ public class DynamicIcebergSink
prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"),
typeInformation,
new DynamicWriteResultAggregator(catalogLoader, cacheMaximumSize))
- .uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology"));
+ .uid(prefixIfNotNull(uidPrefix, "-pre-commit-topology"));
}
@Override
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 20fae212b4..b221719815 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -40,7 +40,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
@@ -570,6 +572,67 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
executeDynamicSink(rows, env, true, 1, commitHook);
}
+ @Test
+ void testOperatorUidsAreDeterministic() {
+
assertThat(createSinkAndReturnUIds("test")).isEqualTo(createSinkAndReturnUIds("test"));
+ assertThat(createSinkAndReturnUIds("test"))
+ .doesNotContainAnyElementsOf(createSinkAndReturnUIds("test2"));
+ }
+
+ @Test
+ void testOperatorUidsFormat() {
+ Set<String> sinkUids = createSinkAndReturnUIds("test");
+ // These look odd, but we need to keep the UIDs consistent. We had a bug
where the UID of the
+ // pre commit topology was off, but since it is stateless, users will
still be able to restore
+ // state, but we must keep the UIDs of stateful operators, like the committer,
consistent.
+ assertThat(sinkUids)
+ .contains(
+ "test--sink",
+ "test--generator",
+ "test--updater",
+ "test--sink: test--pre-commit-topology",
+ "Sink Committer: test--sink");
+
+ sinkUids = createSinkAndReturnUIds("");
+ assertThat(sinkUids)
+ .contains(
+ "--sink",
+ "--generator",
+ "--updater",
+ "--sink: --pre-commit-topology",
+ "Sink Committer: --sink");
+
+ sinkUids = createSinkAndReturnUIds(null);
+ assertThat(sinkUids)
+ .contains(
+ "--sink",
+ "--generator",
+ "--updater",
+ "--sink: --pre-commit-topology",
+ "Sink Committer: --sink");
+ }
+
+ private Set<String> createSinkAndReturnUIds(String uidPrefix) {
+ StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<DynamicIcebergDataImpl> source =
+ streamEnv.fromData(Collections.emptySet(), TypeInformation.of(new
TypeHint<>() {}));
+ source.uid("source");
+
+ DynamicIcebergSink.forInput(source)
+ .generator(new Generator())
+ .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+ .uidPrefix(uidPrefix)
+ .append();
+
+ // Make sure to get the expanded graph with all sink nodes
+ return streamEnv.getStreamGraph().getStreamNodes().stream()
+ .map(StreamNode::getTransformationUID)
+ // We are not interested in the source
+ .filter(uid -> !uid.equals("source"))
+ .collect(Collectors.toSet());
+ }
+
private static class AppendRightBeforeCommit implements CommitHook {
final String tableIdentifier;
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index db48be7977..3a31e5b642 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -169,7 +169,7 @@ public class DynamicIcebergSink
prefixIfNotNull(uidPrefix, sinkId + " Pre Commit"),
typeInformation,
new DynamicWriteResultAggregator(catalogLoader, cacheMaximumSize))
- .uid(prefixIfNotNull(uidPrefix, sinkId + "-pre-commit-topology"));
+ .uid(prefixIfNotNull(uidPrefix, "-pre-commit-topology"));
}
@Override
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 20fae212b4..b221719815 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -40,7 +40,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
@@ -570,6 +572,67 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
executeDynamicSink(rows, env, true, 1, commitHook);
}
+ @Test
+ void testOperatorUidsAreDeterministic() {
+
assertThat(createSinkAndReturnUIds("test")).isEqualTo(createSinkAndReturnUIds("test"));
+ assertThat(createSinkAndReturnUIds("test"))
+ .doesNotContainAnyElementsOf(createSinkAndReturnUIds("test2"));
+ }
+
+ @Test
+ void testOperatorUidsFormat() {
+ Set<String> sinkUids = createSinkAndReturnUIds("test");
+ // These look odd, but we need to keep the UIDs consistent. We had a bug
where the UID of the
+ // pre commit topology was off, but since it is stateless, users will
still be able to restore
+ // state, but we must keep the UIDs of stateful operators, like the committer,
consistent.
+ assertThat(sinkUids)
+ .contains(
+ "test--sink",
+ "test--generator",
+ "test--updater",
+ "test--sink: test--pre-commit-topology",
+ "Sink Committer: test--sink");
+
+ sinkUids = createSinkAndReturnUIds("");
+ assertThat(sinkUids)
+ .contains(
+ "--sink",
+ "--generator",
+ "--updater",
+ "--sink: --pre-commit-topology",
+ "Sink Committer: --sink");
+
+ sinkUids = createSinkAndReturnUIds(null);
+ assertThat(sinkUids)
+ .contains(
+ "--sink",
+ "--generator",
+ "--updater",
+ "--sink: --pre-commit-topology",
+ "Sink Committer: --sink");
+ }
+
+ private Set<String> createSinkAndReturnUIds(String uidPrefix) {
+ StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStreamSource<DynamicIcebergDataImpl> source =
+ streamEnv.fromData(Collections.emptySet(), TypeInformation.of(new
TypeHint<>() {}));
+ source.uid("source");
+
+ DynamicIcebergSink.forInput(source)
+ .generator(new Generator())
+ .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+ .uidPrefix(uidPrefix)
+ .append();
+
+ // Make sure to get the expanded graph with all sink nodes
+ return streamEnv.getStreamGraph().getStreamNodes().stream()
+ .map(StreamNode::getTransformationUID)
+ // We are not interested in the source
+ .filter(uid -> !uid.equals("source"))
+ .collect(Collectors.toSet());
+ }
+
private static class AppendRightBeforeCommit implements CommitHook {
final String tableIdentifier;