This is an automated email from the ASF dual-hosted git repository.
singhpk234 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 84be38acc5 Flink: Backport: Add test to ensure that append commits are
created in dynamic iceberg sink when possible (#15088)
84be38acc5 is described below
commit 84be38acc59bb94cf441180ab7496bd828167093
Author: pvary <[email protected]>
AuthorDate: Tue Jan 20 10:16:25 2026 +0100
Flink: Backport: Add test to ensure that append commits are created in
dynamic iceberg sink when possible (#15088)
---
.../flink/sink/dynamic/TestDynamicCommitter.java | 53 ++++++++++++++++++++++
.../flink/sink/dynamic/TestDynamicCommitter.java | 53 ++++++++++++++++++++++
2 files changed, 106 insertions(+)
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 24832c48be..4cc27151b0 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -696,6 +696,59 @@ class TestDynamicCommitter {
.build());
}
+ @Test
+ void testCommitDeltaTxnWithAppendFiles() throws Exception {
+ Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+ assertThat(table.snapshots()).isEmpty();
+
+ DynamicWriteResultAggregator aggregator =
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
+ OneInputStreamOperatorTestHarness aggregatorHarness =
+ new OneInputStreamOperatorTestHarness(aggregator);
+ aggregatorHarness.open();
+
+ TableKey tableKey = new TableKey(TABLE1, "branch1");
+ final String jobId = JobID.generate().toHexString();
+ final String operatorId = new OperatorID().toHexString();
+ final int checkpointId = 1;
+
+ byte[][] deltaManifest1 =
+ aggregator.writeToManifests(tableKey.tableName(),
WRITE_RESULT_BY_SPEC, checkpointId);
+
+ CommitRequest<DynamicCommittable> commitRequest1 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(tableKey, deltaManifest1, jobId,
operatorId, checkpointId));
+
+ byte[][] deltaManifest2 =
+ aggregator.writeToManifests(tableKey.tableName(),
WRITE_RESULT_BY_SPEC_2, checkpointId);
+
+ CommitRequest<DynamicCommittable> commitRequest2 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(tableKey, deltaManifest2, jobId,
operatorId, checkpointId));
+
+ boolean overwriteMode = false;
+ int workerPoolSize = 1;
+ String sinkId = "sinkId";
+ UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+ DynamicCommitterMetrics committerMetrics = new
DynamicCommitterMetrics(metricGroup);
+ DynamicCommitter dynamicCommitter =
+ new DynamicCommitter(
+ CATALOG_EXTENSION.catalog(),
+ Maps.newHashMap(),
+ overwriteMode,
+ workerPoolSize,
+ sinkId,
+ committerMetrics);
+
+ dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2));
+
+ table.refresh();
+ assertThat(table.snapshots()).hasSize(1);
+
+ Snapshot snapshot = Iterables.getFirst(table.snapshots(), null);
+ assertThat(snapshot.operation()).isEqualTo("append");
+ }
+
@Test
void testReplacePartitions() throws Exception {
Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 24832c48be..4cc27151b0 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -696,6 +696,59 @@ class TestDynamicCommitter {
.build());
}
+ @Test
+ void testCommitDeltaTxnWithAppendFiles() throws Exception {
+ Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+ assertThat(table.snapshots()).isEmpty();
+
+ DynamicWriteResultAggregator aggregator =
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(),
cacheMaximumSize);
+ OneInputStreamOperatorTestHarness aggregatorHarness =
+ new OneInputStreamOperatorTestHarness(aggregator);
+ aggregatorHarness.open();
+
+ TableKey tableKey = new TableKey(TABLE1, "branch1");
+ final String jobId = JobID.generate().toHexString();
+ final String operatorId = new OperatorID().toHexString();
+ final int checkpointId = 1;
+
+ byte[][] deltaManifest1 =
+ aggregator.writeToManifests(tableKey.tableName(),
WRITE_RESULT_BY_SPEC, checkpointId);
+
+ CommitRequest<DynamicCommittable> commitRequest1 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(tableKey, deltaManifest1, jobId,
operatorId, checkpointId));
+
+ byte[][] deltaManifest2 =
+ aggregator.writeToManifests(tableKey.tableName(),
WRITE_RESULT_BY_SPEC_2, checkpointId);
+
+ CommitRequest<DynamicCommittable> commitRequest2 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(tableKey, deltaManifest2, jobId,
operatorId, checkpointId));
+
+ boolean overwriteMode = false;
+ int workerPoolSize = 1;
+ String sinkId = "sinkId";
+ UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+ DynamicCommitterMetrics committerMetrics = new
DynamicCommitterMetrics(metricGroup);
+ DynamicCommitter dynamicCommitter =
+ new DynamicCommitter(
+ CATALOG_EXTENSION.catalog(),
+ Maps.newHashMap(),
+ overwriteMode,
+ workerPoolSize,
+ sinkId,
+ committerMetrics);
+
+ dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2));
+
+ table.refresh();
+ assertThat(table.snapshots()).hasSize(1);
+
+ Snapshot snapshot = Iterables.getFirst(table.snapshots(), null);
+ assertThat(snapshot.operation()).isEqualTo("append");
+ }
+
@Test
void testReplacePartitions() throws Exception {
Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));