This is an automated email from the ASF dual-hosted git repository.

singhpk234 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 84be38acc5 Flink: Backport: Add test to ensure that append commits are created in dynamic iceberg sink when possible (#15088)
84be38acc5 is described below

commit 84be38acc59bb94cf441180ab7496bd828167093
Author: pvary <[email protected]>
AuthorDate: Tue Jan 20 10:16:25 2026 +0100

    Flink: Backport: Add test to ensure that append commits are created in dynamic iceberg sink when possible (#15088)
---
 .../flink/sink/dynamic/TestDynamicCommitter.java   | 53 ++++++++++++++++++++++
 .../flink/sink/dynamic/TestDynamicCommitter.java   | 53 ++++++++++++++++++++++
 2 files changed, 106 insertions(+)

diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 24832c48be..4cc27151b0 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -696,6 +696,59 @@ class TestDynamicCommitter {
                 .build());
   }
 
+  @Test
+  void testCommitDeltaTxnWithAppendFiles() throws Exception {
+    Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table.snapshots()).isEmpty();
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize);
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    TableKey tableKey = new TableKey(TABLE1, "branch1");
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId = 1;
+
+    byte[][] deltaManifest1 =
+        aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC, checkpointId);
+
+    CommitRequest<DynamicCommittable> commitRequest1 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(tableKey, deltaManifest1, jobId, operatorId, checkpointId));
+
+    byte[][] deltaManifest2 =
+        aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC_2, checkpointId);
+
+    CommitRequest<DynamicCommittable> commitRequest2 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(tableKey, deltaManifest2, jobId, operatorId, checkpointId));
+
+    boolean overwriteMode = false;
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup);
+    DynamicCommitter dynamicCommitter =
+        new DynamicCommitter(
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2));
+
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(1);
+
+    Snapshot snapshot = Iterables.getFirst(table.snapshots(), null);
+    assertThat(snapshot.operation()).isEqualTo("append");
+  }
+
   @Test
   void testReplacePartitions() throws Exception {
     Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 24832c48be..4cc27151b0 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -696,6 +696,59 @@ class TestDynamicCommitter {
                 .build());
   }
 
+  @Test
+  void testCommitDeltaTxnWithAppendFiles() throws Exception {
+    Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+    assertThat(table.snapshots()).isEmpty();
+
+    DynamicWriteResultAggregator aggregator =
+        new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize);
+    OneInputStreamOperatorTestHarness aggregatorHarness =
+        new OneInputStreamOperatorTestHarness(aggregator);
+    aggregatorHarness.open();
+
+    TableKey tableKey = new TableKey(TABLE1, "branch1");
+    final String jobId = JobID.generate().toHexString();
+    final String operatorId = new OperatorID().toHexString();
+    final int checkpointId = 1;
+
+    byte[][] deltaManifest1 =
+        aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC, checkpointId);
+
+    CommitRequest<DynamicCommittable> commitRequest1 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(tableKey, deltaManifest1, jobId, operatorId, checkpointId));
+
+    byte[][] deltaManifest2 =
+        aggregator.writeToManifests(tableKey.tableName(), WRITE_RESULT_BY_SPEC_2, checkpointId);
+
+    CommitRequest<DynamicCommittable> commitRequest2 =
+        new MockCommitRequest<>(
+            new DynamicCommittable(tableKey, deltaManifest2, jobId, operatorId, checkpointId));
+
+    boolean overwriteMode = false;
+    int workerPoolSize = 1;
+    String sinkId = "sinkId";
+    UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+    DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup);
+    DynamicCommitter dynamicCommitter =
+        new DynamicCommitter(
+            CATALOG_EXTENSION.catalog(),
+            Maps.newHashMap(),
+            overwriteMode,
+            workerPoolSize,
+            sinkId,
+            committerMetrics);
+
+    dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2));
+
+    table.refresh();
+    assertThat(table.snapshots()).hasSize(1);
+
+    Snapshot snapshot = Iterables.getFirst(table.snapshots(), null);
+    assertThat(snapshot.operation()).isEqualTo("append");
+  }
+
   @Test
   void testReplacePartitions() throws Exception {
     Table table1 = catalog.loadTable(TableIdentifier.of(TABLE1));
