This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.10.x by this push:
new 16b7dcb092 [1.10.x] cherry pick Flink: Ensure DynamicCommitter Idempotence in the presence of failures (#14461)
16b7dcb092 is described below
commit 16b7dcb092d5b712815c41b469852db7389c6852
Author: Huaxin Gao <[email protected]>
AuthorDate: Fri Oct 31 20:45:50 2025 -0700
[1.10.x] cherry pick Flink: Ensure DynamicCommitter Idempotence in the presence of failures (#14461)
* Flink: Ensure DynamicCommitter Idempotence in the presence of failures (#14182)
(cherry picked from commit 3860284b763a3a744e9ebb3b58278c4ba91f1f5d)
* Flink: Backport #14182: Ensure DynamicCommitter Idempotence in the presence of failures (#14213)
(cherry picked from commit 441597e22ef3ec1ea03fd837cbc1e5dffce899a4)
---------
Co-authored-by: Maximilian Michels <[email protected]>
---
.../flink/sink/dynamic/DynamicCommitter.java | 55 ++-
.../flink/sink/dynamic/TestDynamicCommitter.java | 535 ++++++++++++++++++---
.../flink/sink/dynamic/TestDynamicIcebergSink.java | 92 +---
.../flink/sink/dynamic/DynamicCommitter.java | 55 ++-
.../flink/sink/dynamic/TestDynamicCommitter.java | 535 ++++++++++++++++++---
.../flink/sink/dynamic/TestDynamicIcebergSink.java | 92 +---
.../flink/sink/dynamic/DynamicCommitter.java | 55 ++-
.../flink/sink/dynamic/TestDynamicCommitter.java | 535 ++++++++++++++++++---
.../flink/sink/dynamic/TestDynamicIcebergSink.java | 92 +---
9 files changed, 1485 insertions(+), 561 deletions(-)
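
For readers skimming the diff below: the core of the backported change is that DynamicCommitter now folds all pending WriteResults of a checkpoint into a single Iceberg snapshot keyed by the checkpoint id, instead of committing one snapshot per WriteResult. The following is a minimal, illustrative sketch of that per-checkpoint grouping, not the committer itself; the class and method names are hypothetical, while the RowDelta calls are standard Iceberg API and the summary key matches the one asserted in the tests.

  // Hypothetical sketch: commit each checkpoint's WriteResults as one snapshot.
  import java.util.Arrays;
  import java.util.List;
  import java.util.Map;
  import java.util.NavigableMap;
  import org.apache.iceberg.RowDelta;
  import org.apache.iceberg.Table;
  import org.apache.iceberg.io.WriteResult;

  class CheckpointGroupedCommitSketch {
    static void commitPendingResults(
        Table table, String branch, NavigableMap<Long, List<WriteResult>> pendingResults) {
      for (Map.Entry<Long, List<WriteResult>> entry : pendingResults.entrySet()) {
        long checkpointId = entry.getKey();
        // One RowDelta per checkpoint: all data and delete files of that
        // checkpoint end up in a single snapshot.
        RowDelta rowDelta = table.newRowDelta().toBranch(branch);
        for (WriteResult result : entry.getValue()) {
          Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
          Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
        }
        // Recording the checkpoint id in the snapshot summary lets a restarted
        // job skip checkpoints that were already committed, which is what makes
        // retried commits after a failure idempotent.
        rowDelta.set("flink.max-committed-checkpoint-id", Long.toString(checkpointId));
        rowDelta.commit();
      }
    }
  }

In the actual patch, the overwrite path does the same thing with a single ReplacePartitions commit keyed by pendingResults.lastKey(), and the delta path commits one RowDelta per checkpoint, as shown in the hunks below.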
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index e58066aac6..54d506b663 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -274,26 +274,25 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
CommitSummary summary,
String newFlinkJobId,
String operatorId) {
- for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
- // We don't commit the merged result into a single transaction because for the sequential
- // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
- // to data files from txn1. Committing the merged one will lead to the incorrect delete
- // semantic.
- for (WriteResult result : e.getValue()) {
- ReplacePartitions dynamicOverwrite =
- table.newReplacePartitions().scanManifestsWith(workerPool);
+ // Iceberg tables are unsorted. So the order of the append data does not matter.
+ // Hence, we commit everything in one snapshot.
+ ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
+
+ for (List<WriteResult> writeResults : pendingResults.values()) {
+ for (WriteResult result : writeResults) {
Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
- commitOperation(
- table,
- branch,
- dynamicOverwrite,
- summary,
- "dynamic partition overwrite",
- newFlinkJobId,
- operatorId,
- e.getKey());
}
}
+
+ commitOperation(
+ table,
+ branch,
+ dynamicOverwrite,
+ summary,
+ "dynamic partition overwrite",
+ newFlinkJobId,
+ operatorId,
+ pendingResults.lastKey());
}
private void commitDeltaTxn(
@@ -304,11 +303,11 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
String newFlinkJobId,
String operatorId) {
for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
- // We don't commit the merged result into a single transaction because for the sequential
- // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
- // to data files from txn1. Committing the merged one will lead to the incorrect delete
- // semantic.
- for (WriteResult result : e.getValue()) {
+ long checkpointId = e.getKey();
+ List<WriteResult> writeResults = e.getValue();
+
+ RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
+ for (WriteResult result : writeResults) {
// Row delta validations are not needed for streaming changes that write equality deletes.
// Equality deletes are applied to data in all previous sequence numbers, so retries may
// push deletes further in the future, but do not affect correctness. Position deletes
@@ -316,13 +315,17 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
// being added in this commit. There is no way for data files added along with the delete
// files to be concurrently removed, so there is no need to validate the files referenced by
// the position delete files that are being committed.
- RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
-
Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
- commitOperation(
- table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, e.getKey());
}
+
+ // Every Flink checkpoint contains a set of independent changes which can be committed
+ // together. While it is technically feasible to combine append-only data across checkpoints,
+ // for the sake of simplicity, we do not implement this (premature) optimization. Multiple
+ // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint
+ // intervals or when concurrent checkpointing is enabled.
+ commitOperation(
+ table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId);
}
}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 99a5465362..f5387aee88 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -19,8 +19,13 @@
package org.apache.iceberg.flink.sink.dynamic;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
+import java.io.IOException;
+import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
@@ -30,19 +35,24 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.sink.CommitSummary;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -74,6 +84,39 @@ class TestDynamicCommitter {
))
.build();
+ private static final DataFile DATA_FILE_2 =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-2.parquet")
+ .withFileSizeInBytes(0)
+ .withMetrics(
+ new Metrics(
+ 24L,
+ null, // no column sizes
+ ImmutableMap.of(1, 3L), // value count
+ ImmutableMap.of(1, 0L), // null count
+ null,
+ ImmutableMap.of(1, ByteBuffer.allocate(1)), // lower bounds
+ ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds
+ ))
+ .build();
+
+ private static final DeleteFile DELETE_FILE =
+ FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-3.parquet")
+ .withFileSizeInBytes(0)
+ .withMetrics(
+ new Metrics(
+ 24L,
+ null, // no column sizes
+ ImmutableMap.of(1, 3L), // value count
+ ImmutableMap.of(1, 0L), // null count
+ null,
+ ImmutableMap.of(1, ByteBuffer.allocate(1)), // lower bounds
+ ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds
+ ))
+ .ofPositionDeletes()
+ .build();
+
@BeforeEach
void before() {
catalog = CATALOG_EXTENSION.catalog();
@@ -162,60 +205,57 @@ class TestDynamicCommitter {
Snapshot first = Iterables.getFirst(table1.snapshots(), null);
assertThat(first.summary())
.containsAllEntriesOf(
- (Map)
- ImmutableMap.builder()
- .put("added-data-files", "1")
- .put("added-records", "42")
- .put("changed-partition-count", "1")
- .put("flink.job-id", jobId)
- .put("flink.max-committed-checkpoint-id", "" +
checkpointId)
- .put("flink.operator-id", operatorId)
- .put("total-data-files", "1")
- .put("total-delete-files", "0")
- .put("total-equality-deletes", "0")
- .put("total-files-size", "0")
- .put("total-position-deletes", "0")
- .put("total-records", "42")
- .build());
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
Snapshot second = Iterables.get(table1.snapshots(), 1, null);
assertThat(second.summary())
.containsAllEntriesOf(
- (Map)
- ImmutableMap.builder()
- .put("added-data-files", "1")
- .put("added-records", "42")
- .put("changed-partition-count", "1")
- .put("flink.job-id", jobId)
- .put("flink.max-committed-checkpoint-id", "" +
checkpointId)
- .put("flink.operator-id", operatorId)
- .put("total-data-files", "1")
- .put("total-delete-files", "0")
- .put("total-equality-deletes", "0")
- .put("total-files-size", "0")
- .put("total-position-deletes", "0")
- .put("total-records", "42")
- .build());
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
table2.refresh();
assertThat(table2.snapshots()).hasSize(1);
Snapshot third = Iterables.getFirst(table2.snapshots(), null);
assertThat(third.summary())
.containsAllEntriesOf(
- (Map)
- ImmutableMap.builder()
- .put("added-data-files", "1")
- .put("added-records", "42")
- .put("changed-partition-count", "1")
- .put("flink.job-id", jobId)
- .put("flink.max-committed-checkpoint-id", "" +
checkpointId)
- .put("flink.operator-id", operatorId)
- .put("total-data-files", "1")
- .put("total-delete-files", "0")
- .put("total-equality-deletes", "0")
- .put("total-files-size", "0")
- .put("total-position-deletes", "0")
- .put("total-records", "42")
- .build());
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
}
@Test
@@ -277,21 +317,276 @@ class TestDynamicCommitter {
Snapshot first = Iterables.getFirst(table1.snapshots(), null);
assertThat(first.summary())
.containsAllEntriesOf(
- (Map)
- ImmutableMap.builder()
- .put("added-data-files", "1")
- .put("added-records", "42")
- .put("changed-partition-count", "1")
- .put("flink.job-id", jobId)
- .put("flink.max-committed-checkpoint-id", "" +
checkpointId)
- .put("flink.operator-id", operatorId)
- .put("total-data-files", "1")
- .put("total-delete-files", "0")
- .put("total-equality-deletes", "0")
- .put("total-files-size", "0")
- .put("total-position-deletes", "0")
- .put("total-records", "42")
- .build());
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
+ }
+
+ @Test
+ void testTableBranchAtomicCommitForAppendOnlyData() throws Exception {
+ Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+ assertThat(table.snapshots()).isEmpty();
+
+ DynamicWriteResultAggregator aggregator =
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ OneInputStreamOperatorTestHarness aggregatorHarness =
+ new OneInputStreamOperatorTestHarness(aggregator);
+ aggregatorHarness.open();
+
+ WriteTarget writeTarget1 =
+ new WriteTarget(TABLE1, "branch1", 42, 0, true, Sets.newHashSet(1, 2));
+ // writeTarget2 has a different schema
+ WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch1", 23, 0, true,
Sets.newHashSet());
+ // Different branch for writeTarget3
+ WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch2", 23, 0, true,
Sets.newHashSet());
+
+ WriteResult writeResult1 =
WriteResult.builder().addDataFiles(DATA_FILE).build();
+ WriteResult writeResult2 =
WriteResult.builder().addDataFiles(DATA_FILE_2).build();
+
+ final String jobId = JobID.generate().toHexString();
+ final String operatorId = new OperatorID().toHexString();
+ final int checkpointId1 = 1;
+ final int checkpointId2 = 2;
+
+ byte[] deltaManifest1 =
+ aggregator.writeToManifest(
+ writeTarget1,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget1,
writeResult1)),
+ checkpointId1);
+
+ CommitRequest<DynamicCommittable> commitRequest1 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget1, deltaManifest1, jobId,
operatorId, checkpointId1));
+
+ byte[] deltaManifest2 =
+ aggregator.writeToManifest(
+ writeTarget2,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget2,
writeResult2)),
+ checkpointId1);
+
+ CommitRequest<DynamicCommittable> commitRequest2 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget2, deltaManifest2, jobId,
operatorId, checkpointId1));
+
+ byte[] deltaManifest3 =
+ aggregator.writeToManifest(
+ writeTarget3,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget3,
writeResult2)),
+ checkpointId2);
+
+ CommitRequest<DynamicCommittable> commitRequest3 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget3, deltaManifest3, jobId,
operatorId, checkpointId2));
+
+ boolean overwriteMode = false;
+ int workerPoolSize = 1;
+ String sinkId = "sinkId";
+ UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+ DynamicCommitterMetrics committerMetrics = new
DynamicCommitterMetrics(metricGroup);
+ DynamicCommitter dynamicCommitter =
+ new DynamicCommitter(
+ CATALOG_EXTENSION.catalog(),
+ Maps.newHashMap(),
+ overwriteMode,
+ workerPoolSize,
+ sinkId,
+ committerMetrics);
+
+ dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2,
commitRequest3));
+
+ table.refresh();
+ // Two committables, one for each snapshot / table / branch.
+ assertThat(table.snapshots()).hasSize(2);
+
+ Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null);
+
assertThat(snapshot1.snapshotId()).isEqualTo(table.refs().get("branch1").snapshotId());
+ assertThat(snapshot1.summary())
+ .containsAllEntriesOf(
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "2")
+ .put("added-records", "66")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId1)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "2")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "66")
+ .build());
+
+ Snapshot snapshot2 = Iterables.get(table.snapshots(), 1);
+
assertThat(snapshot2.snapshotId()).isEqualTo(table.refs().get("branch2").snapshotId());
+ assertThat(snapshot2.summary())
+ .containsAllEntriesOf(
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "24")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId2)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "24")
+ .build());
+ }
+
+ @Test
+ void testTableBranchAtomicCommitWithFailures() throws Exception {
+ Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+ assertThat(table.snapshots()).isEmpty();
+
+ DynamicWriteResultAggregator aggregator =
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ OneInputStreamOperatorTestHarness aggregatorHarness =
+ new OneInputStreamOperatorTestHarness(aggregator);
+ aggregatorHarness.open();
+
+ WriteTarget writeTarget1 = new WriteTarget(TABLE1, "branch", 42, 0, false,
Sets.newHashSet());
+ // writeTarget2 has a different schema
+ WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch", 23, 0, false,
Sets.newHashSet());
+ WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch", 23, 0, false,
Sets.newHashSet());
+
+ WriteResult writeResult1 =
WriteResult.builder().addDataFiles(DATA_FILE).build();
+ WriteResult writeResult2 =
WriteResult.builder().addDeleteFiles(DELETE_FILE).build();
+ WriteResult writeResult3 =
WriteResult.builder().addDataFiles(DATA_FILE).build();
+
+ final String jobId = JobID.generate().toHexString();
+ final String operatorId = new OperatorID().toHexString();
+ final int checkpointId1 = 1;
+ final int checkpointId2 = 2;
+
+ byte[] deltaManifest1 =
+ aggregator.writeToManifest(
+ writeTarget1,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget1,
writeResult1)),
+ checkpointId1);
+
+ CommitRequest<DynamicCommittable> commitRequest1 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget1, deltaManifest1, jobId,
operatorId, checkpointId1));
+
+ byte[] deltaManifest2 =
+ aggregator.writeToManifest(
+ writeTarget2,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget2,
writeResult2)),
+ checkpointId2);
+
+ CommitRequest<DynamicCommittable> commitRequest2 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget2, deltaManifest2, jobId,
operatorId, checkpointId2));
+
+ byte[] deltaManifest3 =
+ aggregator.writeToManifest(
+ writeTarget3,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget3,
writeResult3)),
+ checkpointId2);
+
+ CommitRequest<DynamicCommittable> commitRequest3 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget3, deltaManifest3, jobId,
operatorId, checkpointId2));
+
+ boolean overwriteMode = false;
+ int workerPoolSize = 1;
+ String sinkId = "sinkId";
+ UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+ DynamicCommitterMetrics committerMetrics = new
DynamicCommitterMetrics(metricGroup);
+
+ // Use special hook to fail during various states of the commit operation
+ CommitHook commitHook = new FailBeforeAndAfterCommit();
+ DynamicCommitter dynamicCommitter =
+ new CommitHookEnabledDynamicCommitter(
+ commitHook,
+ CATALOG_EXTENSION.catalog(),
+ Maps.newHashMap(),
+ overwriteMode,
+ workerPoolSize,
+ sinkId,
+ committerMetrics);
+
+ ThrowingCallable commitExecutable =
+ () ->
+ dynamicCommitter.commit(
+ Sets.newHashSet(commitRequest1, commitRequest2,
commitRequest3));
+
+ // First fail pre-commit
+ assertThatThrownBy(commitExecutable);
+ assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
+
+ // Second fail during commit
+ assertThatThrownBy(commitExecutable);
+ assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+
+ // Third fail after commit
+ assertThatThrownBy(commitExecutable);
+ assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
+
+ // Finally commit must go through, although it is a NOOP because the third failure is directly
+ // after the commit finished.
+ try {
+ commitExecutable.call();
+ } catch (Throwable e) {
+ fail("Should not have thrown an exception");
+ }
+
+ table.refresh();
+ // Three committables, but only two snapshots! WriteResults from different checkpoints are
+ // not combined, because writeResult2 contains a delete file.
+ assertThat(table.snapshots()).hasSize(2);
+
+ Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null);
+ assertThat(snapshot1.summary())
+ .containsAllEntriesOf(
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId1)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
+
+ Snapshot snapshot2 = Iterables.get(table.snapshots(), 1);
+ assertThat(snapshot2.summary())
+ .containsAllEntriesOf(
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId2)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "2")
+ .put("total-delete-files", "1")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "24")
+ .put("total-records", "84")
+ .build());
}
@Test
@@ -361,21 +656,109 @@ class TestDynamicCommitter {
Snapshot latestSnapshot = Iterables.getLast(table1.snapshots());
assertThat(latestSnapshot.summary())
.containsAllEntriesOf(
- (Map)
- ImmutableMap.builder()
- .put("replace-partitions", "true")
- .put("added-data-files", "1")
- .put("added-records", "42")
- .put("changed-partition-count", "1")
- .put("flink.job-id", jobId)
- .put("flink.max-committed-checkpoint-id",
String.valueOf(checkpointId + 1))
- .put("flink.operator-id", operatorId)
- .put("total-data-files", "1")
- .put("total-delete-files", "0")
- .put("total-equality-deletes", "0")
- .put("total-files-size", "0")
- .put("total-position-deletes", "0")
- .put("total-records", "42")
- .build());
+ ImmutableMap.<String, String>builder()
+ .put("replace-partitions", "true")
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id",
String.valueOf(checkpointId + 1))
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
+ }
+
+ interface CommitHook extends Serializable {
+ void beforeCommit();
+
+ void duringCommit();
+
+ void afterCommit();
+ }
+
+ static class FailBeforeAndAfterCommit implements CommitHook {
+
+ static boolean failedBeforeCommit;
+ static boolean failedDuringCommit;
+ static boolean failedAfterCommit;
+
+ FailBeforeAndAfterCommit() {
+ reset();
+ }
+
+ @Override
+ public void beforeCommit() {
+ if (!failedBeforeCommit) {
+ failedBeforeCommit = true;
+ throw new RuntimeException("Failing before commit");
+ }
+ }
+
+ @Override
+ public void duringCommit() {
+ if (!failedDuringCommit) {
+ failedDuringCommit = true;
+ throw new RuntimeException("Failing during commit");
+ }
+ }
+
+ @Override
+ public void afterCommit() {
+ if (!failedAfterCommit) {
+ failedAfterCommit = true;
+ throw new RuntimeException("Failing before commit");
+ }
+ }
+
+ static void reset() {
+ failedBeforeCommit = false;
+ failedDuringCommit = false;
+ failedAfterCommit = false;
+ }
+ }
+
+ static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
+ private final CommitHook commitHook;
+
+ CommitHookEnabledDynamicCommitter(
+ CommitHook commitHook,
+ Catalog catalog,
+ Map<String, String> snapshotProperties,
+ boolean replacePartitions,
+ int workerPoolSize,
+ String sinkId,
+ DynamicCommitterMetrics committerMetrics) {
+ super(
+ catalog, snapshotProperties, replacePartitions, workerPoolSize,
sinkId, committerMetrics);
+ this.commitHook = commitHook;
+ }
+
+ @Override
+ public void commit(Collection<CommitRequest<DynamicCommittable>>
commitRequests)
+ throws IOException, InterruptedException {
+ commitHook.beforeCommit();
+ super.commit(commitRequests);
+ commitHook.afterCommit();
+ }
+
+ @Override
+ void commitOperation(
+ Table table,
+ String branch,
+ SnapshotUpdate<?> operation,
+ CommitSummary summary,
+ String description,
+ String newFlinkJobId,
+ String operatorId,
+ long checkpointId) {
+ super.commitOperation(
+ table, branch, operation, summary, description, newFlinkJobId,
operatorId, checkpointId);
+ commitHook.duringCommit();
+ }
}
}
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index b61e297cc1..20fae212b4 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -25,7 +25,6 @@ import static org.assertj.core.api.Assertions.fail;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -56,7 +55,6 @@ import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -69,8 +67,9 @@ import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestHelpers;
-import org.apache.iceberg.flink.sink.CommitSummary;
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
+import org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.CommitHook;
+import org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.FailBeforeAndAfterCommit;
import org.apache.iceberg.inmemory.InMemoryInputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -528,7 +527,8 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
// Configure a Restart strategy to allow recovery
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
- configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 2);
+ // Allow max 3 retries to make up for the three failures we are simulating here
+ configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3);
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ZERO);
env.configure(configuration);
@@ -539,14 +539,15 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
new DynamicIcebergDataImpl(
SimpleDataUtil.SCHEMA, "t2", "main", PartitionSpec.unpartitioned()));
- FailBeforeAndAfterCommit.reset();
final CommitHook commitHook = new FailBeforeAndAfterCommit();
assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse();
+ assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isFalse();
assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse();
executeDynamicSink(rows, env, true, 1, commitHook);
assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
+ assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
}
@@ -569,44 +570,6 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
executeDynamicSink(rows, env, true, 1, commitHook);
}
- interface CommitHook extends Serializable {
- void beforeCommit();
-
- void duringCommit();
-
- void afterCommit();
- }
-
- private static class FailBeforeAndAfterCommit implements CommitHook {
-
- static boolean failedBeforeCommit;
- static boolean failedAfterCommit;
-
- @Override
- public void beforeCommit() {
- if (!failedBeforeCommit) {
- failedBeforeCommit = true;
- throw new RuntimeException("Failing before commit");
- }
- }
-
- @Override
- public void duringCommit() {}
-
- @Override
- public void afterCommit() {
- if (!failedAfterCommit) {
- failedAfterCommit = true;
- throw new RuntimeException("Failing before commit");
- }
- }
-
- static void reset() {
- failedBeforeCommit = false;
- failedAfterCommit = false;
- }
- }
-
private static class AppendRightBeforeCommit implements CommitHook {
final String tableIdentifier;
@@ -734,8 +697,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
@Override
public Committer<DynamicCommittable> createCommitter(CommitterInitContext context) {
- // return super.createCommitter(context);
- return new CommitHookEnabledDynamicCommitter(
+ return new TestDynamicCommitter.CommitHookEnabledDynamicCommitter(
commitHook,
CATALOG_EXTENSION.catalogLoader().loadCatalog(),
Collections.emptyMap(),
@@ -746,46 +708,6 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
}
}
- static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
- private final CommitHook commitHook;
-
- CommitHookEnabledDynamicCommitter(
- CommitHook commitHook,
- Catalog catalog,
- Map<String, String> snapshotProperties,
- boolean replacePartitions,
- int workerPoolSize,
- String sinkId,
- DynamicCommitterMetrics committerMetrics) {
- super(
- catalog, snapshotProperties, replacePartitions, workerPoolSize,
sinkId, committerMetrics);
- this.commitHook = commitHook;
- }
-
- @Override
- public void commit(Collection<CommitRequest<DynamicCommittable>>
commitRequests)
- throws IOException, InterruptedException {
- commitHook.beforeCommit();
- super.commit(commitRequests);
- commitHook.afterCommit();
- }
-
- @Override
- void commitOperation(
- Table table,
- String branch,
- SnapshotUpdate<?> operation,
- CommitSummary summary,
- String description,
- String newFlinkJobId,
- String operatorId,
- long checkpointId) {
- commitHook.duringCommit();
- super.commitOperation(
- table, branch, operation, summary, description, newFlinkJobId,
operatorId, checkpointId);
- }
- }
-
private void verifyResults(List<DynamicIcebergDataImpl> dynamicData) throws
IOException {
// Calculate the expected result
Map<Tuple2<String, String>, List<RowData>> expectedData =
Maps.newHashMap();
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index e58066aac6..54d506b663 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -274,26 +274,25 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
CommitSummary summary,
String newFlinkJobId,
String operatorId) {
- for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
- // We don't commit the merged result into a single transaction because for the sequential
- // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
- // to data files from txn1. Committing the merged one will lead to the incorrect delete
- // semantic.
- for (WriteResult result : e.getValue()) {
- ReplacePartitions dynamicOverwrite =
- table.newReplacePartitions().scanManifestsWith(workerPool);
+ // Iceberg tables are unsorted. So the order of the append data does not matter.
+ // Hence, we commit everything in one snapshot.
+ ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
+
+ for (List<WriteResult> writeResults : pendingResults.values()) {
+ for (WriteResult result : writeResults) {
Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
- commitOperation(
- table,
- branch,
- dynamicOverwrite,
- summary,
- "dynamic partition overwrite",
- newFlinkJobId,
- operatorId,
- e.getKey());
}
}
+
+ commitOperation(
+ table,
+ branch,
+ dynamicOverwrite,
+ summary,
+ "dynamic partition overwrite",
+ newFlinkJobId,
+ operatorId,
+ pendingResults.lastKey());
}
private void commitDeltaTxn(
@@ -304,11 +303,11 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
String newFlinkJobId,
String operatorId) {
for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
- // We don't commit the merged result into a single transaction because for the sequential
- // transaction txn1 and txn2, the equality-delete files of txn2 are required to be applied
- // to data files from txn1. Committing the merged one will lead to the incorrect delete
- // semantic.
- for (WriteResult result : e.getValue()) {
+ long checkpointId = e.getKey();
+ List<WriteResult> writeResults = e.getValue();
+
+ RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
+ for (WriteResult result : writeResults) {
// Row delta validations are not needed for streaming changes that write equality deletes.
// Equality deletes are applied to data in all previous sequence numbers, so retries may
// push deletes further in the future, but do not affect correctness. Position deletes
@@ -316,13 +315,17 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
// being added in this commit. There is no way for data files added along with the delete
// files to be concurrently removed, so there is no need to validate the files referenced by
// the position delete files that are being committed.
- RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
-
Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
- commitOperation(
- table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, e.getKey());
}
+
+ // Every Flink checkpoint contains a set of independent changes which can be committed
+ // together. While it is technically feasible to combine append-only data across checkpoints,
+ // for the sake of simplicity, we do not implement this (premature) optimization. Multiple
+ // pending checkpoints here are very rare to occur, i.e. only with very short checkpoint
+ // intervals or when concurrent checkpointing is enabled.
+ commitOperation(
+ table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId);
}
}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 99a5465362..f5387aee88 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -19,8 +19,13 @@
package org.apache.iceberg.flink.sink.dynamic;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
+import java.io.IOException;
+import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
@@ -30,19 +35,24 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.sink.CommitSummary;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -74,6 +84,39 @@ class TestDynamicCommitter {
))
.build();
+ private static final DataFile DATA_FILE_2 =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-2.parquet")
+ .withFileSizeInBytes(0)
+ .withMetrics(
+ new Metrics(
+ 24L,
+ null, // no column sizes
+ ImmutableMap.of(1, 3L), // value count
+ ImmutableMap.of(1, 0L), // null count
+ null,
+ ImmutableMap.of(1, ByteBuffer.allocate(1)), // lower bounds
+ ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds
+ ))
+ .build();
+
+ private static final DeleteFile DELETE_FILE =
+ FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-3.parquet")
+ .withFileSizeInBytes(0)
+ .withMetrics(
+ new Metrics(
+ 24L,
+ null, // no column sizes
+ ImmutableMap.of(1, 3L), // value count
+ ImmutableMap.of(1, 0L), // null count
+ null,
+ ImmutableMap.of(1, ByteBuffer.allocate(1)), // lower bounds
+ ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds
+ ))
+ .ofPositionDeletes()
+ .build();
+
@BeforeEach
void before() {
catalog = CATALOG_EXTENSION.catalog();
@@ -162,60 +205,57 @@ class TestDynamicCommitter {
Snapshot first = Iterables.getFirst(table1.snapshots(), null);
assertThat(first.summary())
.containsAllEntriesOf(
- (Map)
- ImmutableMap.builder()
- .put("added-data-files", "1")
- .put("added-records", "42")
- .put("changed-partition-count", "1")
- .put("flink.job-id", jobId)
- .put("flink.max-committed-checkpoint-id", "" +
checkpointId)
- .put("flink.operator-id", operatorId)
- .put("total-data-files", "1")
- .put("total-delete-files", "0")
- .put("total-equality-deletes", "0")
- .put("total-files-size", "0")
- .put("total-position-deletes", "0")
- .put("total-records", "42")
- .build());
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
Snapshot second = Iterables.get(table1.snapshots(), 1, null);
assertThat(second.summary())
.containsAllEntriesOf(
- (Map)
- ImmutableMap.builder()
- .put("added-data-files", "1")
- .put("added-records", "42")
- .put("changed-partition-count", "1")
- .put("flink.job-id", jobId)
- .put("flink.max-committed-checkpoint-id", "" +
checkpointId)
- .put("flink.operator-id", operatorId)
- .put("total-data-files", "1")
- .put("total-delete-files", "0")
- .put("total-equality-deletes", "0")
- .put("total-files-size", "0")
- .put("total-position-deletes", "0")
- .put("total-records", "42")
- .build());
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
table2.refresh();
assertThat(table2.snapshots()).hasSize(1);
Snapshot third = Iterables.getFirst(table2.snapshots(), null);
assertThat(third.summary())
.containsAllEntriesOf(
- (Map)
- ImmutableMap.builder()
- .put("added-data-files", "1")
- .put("added-records", "42")
- .put("changed-partition-count", "1")
- .put("flink.job-id", jobId)
- .put("flink.max-committed-checkpoint-id", "" +
checkpointId)
- .put("flink.operator-id", operatorId)
- .put("total-data-files", "1")
- .put("total-delete-files", "0")
- .put("total-equality-deletes", "0")
- .put("total-files-size", "0")
- .put("total-position-deletes", "0")
- .put("total-records", "42")
- .build());
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
}
@Test
@@ -277,21 +317,276 @@ class TestDynamicCommitter {
Snapshot first = Iterables.getFirst(table1.snapshots(), null);
assertThat(first.summary())
.containsAllEntriesOf(
- (Map)
- ImmutableMap.builder()
- .put("added-data-files", "1")
- .put("added-records", "42")
- .put("changed-partition-count", "1")
- .put("flink.job-id", jobId)
- .put("flink.max-committed-checkpoint-id", "" +
checkpointId)
- .put("flink.operator-id", operatorId)
- .put("total-data-files", "1")
- .put("total-delete-files", "0")
- .put("total-equality-deletes", "0")
- .put("total-files-size", "0")
- .put("total-position-deletes", "0")
- .put("total-records", "42")
- .build());
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
+ }
+
+ @Test
+ void testTableBranchAtomicCommitForAppendOnlyData() throws Exception {
+ Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+ assertThat(table.snapshots()).isEmpty();
+
+ DynamicWriteResultAggregator aggregator =
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ OneInputStreamOperatorTestHarness aggregatorHarness =
+ new OneInputStreamOperatorTestHarness(aggregator);
+ aggregatorHarness.open();
+
+ WriteTarget writeTarget1 =
+ new WriteTarget(TABLE1, "branch1", 42, 0, true, Sets.newHashSet(1, 2));
+ // writeTarget2 has a different schema
+ WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch1", 23, 0, true,
Sets.newHashSet());
+ // Different branch for writeTarget3
+ WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch2", 23, 0, true,
Sets.newHashSet());
+
+ WriteResult writeResult1 =
WriteResult.builder().addDataFiles(DATA_FILE).build();
+ WriteResult writeResult2 =
WriteResult.builder().addDataFiles(DATA_FILE_2).build();
+
+ final String jobId = JobID.generate().toHexString();
+ final String operatorId = new OperatorID().toHexString();
+ final int checkpointId1 = 1;
+ final int checkpointId2 = 2;
+
+ byte[] deltaManifest1 =
+ aggregator.writeToManifest(
+ writeTarget1,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget1,
writeResult1)),
+ checkpointId1);
+
+ CommitRequest<DynamicCommittable> commitRequest1 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget1, deltaManifest1, jobId,
operatorId, checkpointId1));
+
+ byte[] deltaManifest2 =
+ aggregator.writeToManifest(
+ writeTarget2,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget2,
writeResult2)),
+ checkpointId1);
+
+ CommitRequest<DynamicCommittable> commitRequest2 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget2, deltaManifest2, jobId,
operatorId, checkpointId1));
+
+ byte[] deltaManifest3 =
+ aggregator.writeToManifest(
+ writeTarget3,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget3,
writeResult2)),
+ checkpointId2);
+
+ CommitRequest<DynamicCommittable> commitRequest3 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget3, deltaManifest3, jobId,
operatorId, checkpointId2));
+
+ boolean overwriteMode = false;
+ int workerPoolSize = 1;
+ String sinkId = "sinkId";
+ UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+ DynamicCommitterMetrics committerMetrics = new
DynamicCommitterMetrics(metricGroup);
+ DynamicCommitter dynamicCommitter =
+ new DynamicCommitter(
+ CATALOG_EXTENSION.catalog(),
+ Maps.newHashMap(),
+ overwriteMode,
+ workerPoolSize,
+ sinkId,
+ committerMetrics);
+
+ dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2,
commitRequest3));
+
+ table.refresh();
+ // Two committables, one for each snapshot / table / branch.
+ assertThat(table.snapshots()).hasSize(2);
+
+ Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null);
+
assertThat(snapshot1.snapshotId()).isEqualTo(table.refs().get("branch1").snapshotId());
+ assertThat(snapshot1.summary())
+ .containsAllEntriesOf(
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "2")
+ .put("added-records", "66")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId1)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "2")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "66")
+ .build());
+
+ Snapshot snapshot2 = Iterables.get(table.snapshots(), 1);
+
assertThat(snapshot2.snapshotId()).isEqualTo(table.refs().get("branch2").snapshotId());
+ assertThat(snapshot2.summary())
+ .containsAllEntriesOf(
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "24")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId2)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "24")
+ .build());
+ }
+
+ @Test
+ void testTableBranchAtomicCommitWithFailures() throws Exception {
+ Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+ assertThat(table.snapshots()).isEmpty();
+
+ DynamicWriteResultAggregator aggregator =
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ OneInputStreamOperatorTestHarness aggregatorHarness =
+ new OneInputStreamOperatorTestHarness(aggregator);
+ aggregatorHarness.open();
+
+ WriteTarget writeTarget1 = new WriteTarget(TABLE1, "branch", 42, 0, false,
Sets.newHashSet());
+ // writeTarget2 has a different schema
+ WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch", 23, 0, false,
Sets.newHashSet());
+ WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch", 23, 0, false,
Sets.newHashSet());
+
+ WriteResult writeResult1 =
WriteResult.builder().addDataFiles(DATA_FILE).build();
+ WriteResult writeResult2 =
WriteResult.builder().addDeleteFiles(DELETE_FILE).build();
+ WriteResult writeResult3 =
WriteResult.builder().addDataFiles(DATA_FILE).build();
+
+ final String jobId = JobID.generate().toHexString();
+ final String operatorId = new OperatorID().toHexString();
+ final int checkpointId1 = 1;
+ final int checkpointId2 = 2;
+
+ byte[] deltaManifest1 =
+ aggregator.writeToManifest(
+ writeTarget1,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget1,
writeResult1)),
+ checkpointId1);
+
+ CommitRequest<DynamicCommittable> commitRequest1 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget1, deltaManifest1, jobId,
operatorId, checkpointId1));
+
+ byte[] deltaManifest2 =
+ aggregator.writeToManifest(
+ writeTarget2,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget2,
writeResult2)),
+ checkpointId2);
+
+ CommitRequest<DynamicCommittable> commitRequest2 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget2, deltaManifest2, jobId,
operatorId, checkpointId2));
+
+ byte[] deltaManifest3 =
+ aggregator.writeToManifest(
+ writeTarget3,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget3,
writeResult3)),
+ checkpointId2);
+
+ CommitRequest<DynamicCommittable> commitRequest3 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget3, deltaManifest3, jobId,
operatorId, checkpointId2));
+
+ boolean overwriteMode = false;
+ int workerPoolSize = 1;
+ String sinkId = "sinkId";
+ UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+ DynamicCommitterMetrics committerMetrics = new
DynamicCommitterMetrics(metricGroup);
+
+ // Use special hook to fail during various states of the commit operation
+ CommitHook commitHook = new FailBeforeAndAfterCommit();
+ DynamicCommitter dynamicCommitter =
+ new CommitHookEnabledDynamicCommitter(
+ commitHook,
+ CATALOG_EXTENSION.catalog(),
+ Maps.newHashMap(),
+ overwriteMode,
+ workerPoolSize,
+ sinkId,
+ committerMetrics);
+
+ ThrowingCallable commitExecutable =
+ () ->
+ dynamicCommitter.commit(
+ Sets.newHashSet(commitRequest1, commitRequest2,
commitRequest3));
+
+ // First fail pre-commit
+ assertThatThrownBy(commitExecutable);
+ assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
+
+ // Second fail during commit
+ assertThatThrownBy(commitExecutable);
+ assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+
+ // Third fail after commit
+ assertThatThrownBy(commitExecutable);
+ assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
+
+ // Finally commit must go through, although it is a NOOP because the third failure is directly
+ // after the commit finished.
+ try {
+ commitExecutable.call();
+ } catch (Throwable e) {
+ fail("Should not have thrown an exception");
+ }
+
+ table.refresh();
+ // Three committables, but only two snapshots! WriteResults from different checkpoints are
+ // not combined, because writeResult2 contains a delete file.
+ assertThat(table.snapshots()).hasSize(2);
+
+ Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null);
+ assertThat(snapshot1.summary())
+ .containsAllEntriesOf(
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId1)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
+
+ Snapshot snapshot2 = Iterables.get(table.snapshots(), 1);
+ assertThat(snapshot2.summary())
+ .containsAllEntriesOf(
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId2)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "2")
+ .put("total-delete-files", "1")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "24")
+ .put("total-records", "84")
+ .build());
}
@Test
@@ -361,21 +656,109 @@ class TestDynamicCommitter {
Snapshot latestSnapshot = Iterables.getLast(table1.snapshots());
assertThat(latestSnapshot.summary())
.containsAllEntriesOf(
- (Map)
- ImmutableMap.builder()
- .put("replace-partitions", "true")
- .put("added-data-files", "1")
- .put("added-records", "42")
- .put("changed-partition-count", "1")
- .put("flink.job-id", jobId)
- .put("flink.max-committed-checkpoint-id",
String.valueOf(checkpointId + 1))
- .put("flink.operator-id", operatorId)
- .put("total-data-files", "1")
- .put("total-delete-files", "0")
- .put("total-equality-deletes", "0")
- .put("total-files-size", "0")
- .put("total-position-deletes", "0")
- .put("total-records", "42")
- .build());
+ ImmutableMap.<String, String>builder()
+ .put("replace-partitions", "true")
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id",
String.valueOf(checkpointId + 1))
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
+ }
+
+ interface CommitHook extends Serializable {
+ void beforeCommit();
+
+ void duringCommit();
+
+ void afterCommit();
+ }
+
+ static class FailBeforeAndAfterCommit implements CommitHook {
+
+ static boolean failedBeforeCommit;
+ static boolean failedDuringCommit;
+ static boolean failedAfterCommit;
+
+ FailBeforeAndAfterCommit() {
+ reset();
+ }
+
+ @Override
+ public void beforeCommit() {
+ if (!failedBeforeCommit) {
+ failedBeforeCommit = true;
+ throw new RuntimeException("Failing before commit");
+ }
+ }
+
+ @Override
+ public void duringCommit() {
+ if (!failedDuringCommit) {
+ failedDuringCommit = true;
+ throw new RuntimeException("Failing during commit");
+ }
+ }
+
+ @Override
+ public void afterCommit() {
+ if (!failedAfterCommit) {
+ failedAfterCommit = true;
+ throw new RuntimeException("Failing before commit");
+ }
+ }
+
+ static void reset() {
+ failedBeforeCommit = false;
+ failedDuringCommit = false;
+ failedAfterCommit = false;
+ }
+ }
+
+ static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
+ private final CommitHook commitHook;
+
+ CommitHookEnabledDynamicCommitter(
+ CommitHook commitHook,
+ Catalog catalog,
+ Map<String, String> snapshotProperties,
+ boolean replacePartitions,
+ int workerPoolSize,
+ String sinkId,
+ DynamicCommitterMetrics committerMetrics) {
+ super(
+ catalog, snapshotProperties, replacePartitions, workerPoolSize,
sinkId, committerMetrics);
+ this.commitHook = commitHook;
+ }
+
+ @Override
+ public void commit(Collection<CommitRequest<DynamicCommittable>>
commitRequests)
+ throws IOException, InterruptedException {
+ commitHook.beforeCommit();
+ super.commit(commitRequests);
+ commitHook.afterCommit();
+ }
+
+ @Override
+ void commitOperation(
+ Table table,
+ String branch,
+ SnapshotUpdate<?> operation,
+ CommitSummary summary,
+ String description,
+ String newFlinkJobId,
+ String operatorId,
+ long checkpointId) {
+ super.commitOperation(
+ table, branch, operation, summary, description, newFlinkJobId,
operatorId, checkpointId);
+ commitHook.duringCommit();
+ }
}
}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index b61e297cc1..20fae212b4 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -25,7 +25,6 @@ import static org.assertj.core.api.Assertions.fail;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -56,7 +55,6 @@ import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -69,8 +67,9 @@ import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestHelpers;
-import org.apache.iceberg.flink.sink.CommitSummary;
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
+import org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.CommitHook;
+import
org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.FailBeforeAndAfterCommit;
import org.apache.iceberg.inmemory.InMemoryInputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -528,7 +527,8 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
// Configure a Restart strategy to allow recovery
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
-
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
2);
+ // Allow max 3 retries to make up for the three failures we are simulating
here
+
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
3);
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
Duration.ZERO);
env.configure(configuration);
@@ -539,14 +539,15 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
new DynamicIcebergDataImpl(
SimpleDataUtil.SCHEMA, "t2", "main",
PartitionSpec.unpartitioned()));
- FailBeforeAndAfterCommit.reset();
final CommitHook commitHook = new FailBeforeAndAfterCommit();
assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse();
+ assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isFalse();
assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse();
executeDynamicSink(rows, env, true, 1, commitHook);
assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
+ assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
}
@@ -569,44 +570,6 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
executeDynamicSink(rows, env, true, 1, commitHook);
}
- interface CommitHook extends Serializable {
- void beforeCommit();
-
- void duringCommit();
-
- void afterCommit();
- }
-
- private static class FailBeforeAndAfterCommit implements CommitHook {
-
- static boolean failedBeforeCommit;
- static boolean failedAfterCommit;
-
- @Override
- public void beforeCommit() {
- if (!failedBeforeCommit) {
- failedBeforeCommit = true;
- throw new RuntimeException("Failing before commit");
- }
- }
-
- @Override
- public void duringCommit() {}
-
- @Override
- public void afterCommit() {
- if (!failedAfterCommit) {
- failedAfterCommit = true;
- throw new RuntimeException("Failing before commit");
- }
- }
-
- static void reset() {
- failedBeforeCommit = false;
- failedAfterCommit = false;
- }
- }
-
private static class AppendRightBeforeCommit implements CommitHook {
final String tableIdentifier;
@@ -734,8 +697,7 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
@Override
public Committer<DynamicCommittable> createCommitter(CommitterInitContext
context) {
- // return super.createCommitter(context);
- return new CommitHookEnabledDynamicCommitter(
+ return new TestDynamicCommitter.CommitHookEnabledDynamicCommitter(
commitHook,
CATALOG_EXTENSION.catalogLoader().loadCatalog(),
Collections.emptyMap(),
@@ -746,46 +708,6 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
}
}
- static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
- private final CommitHook commitHook;
-
- CommitHookEnabledDynamicCommitter(
- CommitHook commitHook,
- Catalog catalog,
- Map<String, String> snapshotProperties,
- boolean replacePartitions,
- int workerPoolSize,
- String sinkId,
- DynamicCommitterMetrics committerMetrics) {
- super(
- catalog, snapshotProperties, replacePartitions, workerPoolSize,
sinkId, committerMetrics);
- this.commitHook = commitHook;
- }
-
- @Override
- public void commit(Collection<CommitRequest<DynamicCommittable>>
commitRequests)
- throws IOException, InterruptedException {
- commitHook.beforeCommit();
- super.commit(commitRequests);
- commitHook.afterCommit();
- }
-
- @Override
- void commitOperation(
- Table table,
- String branch,
- SnapshotUpdate<?> operation,
- CommitSummary summary,
- String description,
- String newFlinkJobId,
- String operatorId,
- long checkpointId) {
- commitHook.duringCommit();
- super.commitOperation(
- table, branch, operation, summary, description, newFlinkJobId,
operatorId, checkpointId);
- }
- }
-
private void verifyResults(List<DynamicIcebergDataImpl> dynamicData) throws
IOException {
// Calculate the expected result
Map<Tuple2<String, String>, List<RowData>> expectedData =
Maps.newHashMap();
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index e58066aac6..54d506b663 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -274,26 +274,25 @@ class DynamicCommitter implements
Committer<DynamicCommittable> {
CommitSummary summary,
String newFlinkJobId,
String operatorId) {
- for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
- // We don't commit the merged result into a single transaction because
for the sequential
- // transaction txn1 and txn2, the equality-delete files of txn2 are
required to be applied
- // to data files from txn1. Committing the merged one will lead to the
incorrect delete
- // semantic.
- for (WriteResult result : e.getValue()) {
- ReplacePartitions dynamicOverwrite =
- table.newReplacePartitions().scanManifestsWith(workerPool);
+ // Iceberg tables are unsorted. So the order of the append data does not
matter.
+ // Hence, we commit everything in one snapshot.
+ ReplacePartitions dynamicOverwrite =
table.newReplacePartitions().scanManifestsWith(workerPool);
+
+ for (List<WriteResult> writeResults : pendingResults.values()) {
+ for (WriteResult result : writeResults) {
Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
- commitOperation(
- table,
- branch,
- dynamicOverwrite,
- summary,
- "dynamic partition overwrite",
- newFlinkJobId,
- operatorId,
- e.getKey());
}
}
+
+ commitOperation(
+ table,
+ branch,
+ dynamicOverwrite,
+ summary,
+ "dynamic partition overwrite",
+ newFlinkJobId,
+ operatorId,
+ pendingResults.lastKey());
}
private void commitDeltaTxn(
@@ -304,11 +303,11 @@ class DynamicCommitter implements
Committer<DynamicCommittable> {
String newFlinkJobId,
String operatorId) {
for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
- // We don't commit the merged result into a single transaction because
for the sequential
- // transaction txn1 and txn2, the equality-delete files of txn2 are
required to be applied
- // to data files from txn1. Committing the merged one will lead to the
incorrect delete
- // semantic.
- for (WriteResult result : e.getValue()) {
+ long checkpointId = e.getKey();
+ List<WriteResult> writeResults = e.getValue();
+
+ RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
+ for (WriteResult result : writeResults) {
// Row delta validations are not needed for streaming changes that
write equality deletes.
// Equality deletes are applied to data in all previous sequence
numbers, so retries may
// push deletes further in the future, but do not affect correctness.
Position deletes
@@ -316,13 +315,17 @@ class DynamicCommitter implements
Committer<DynamicCommittable> {
// being added in this commit. There is no way for data files added
along with the delete
// files to be concurrently removed, so there is no need to validate
the files referenced by
// the position delete files that are being committed.
- RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
-
Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
- commitOperation(
- table, branch, rowDelta, summary, "rowDelta", newFlinkJobId,
operatorId, e.getKey());
}
+
+ // Every Flink checkpoint contains a set of independent changes which
can be committed
+ // together. While it is technically feasible to combine append-only
data across checkpoints,
+      // for the sake of simplicity, we do not implement this (premature) optimization.
+      // Multiple pending checkpoints rarely occur here, i.e. only with very short
+      // checkpoint intervals or when concurrent checkpointing is enabled.
+ commitOperation(
+ table, branch, rowDelta, summary, "rowDelta", newFlinkJobId,
operatorId, checkpointId);
}
}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 99a5465362..f5387aee88 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -19,8 +19,13 @@
package org.apache.iceberg.flink.sink.dynamic;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
+import java.io.IOException;
+import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
@@ -30,19 +35,24 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.sink.CommitSummary;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -74,6 +84,39 @@ class TestDynamicCommitter {
))
.build();
+ private static final DataFile DATA_FILE_2 =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-2.parquet")
+ .withFileSizeInBytes(0)
+ .withMetrics(
+ new Metrics(
+ 24L,
+ null, // no column sizes
+ ImmutableMap.of(1, 3L), // value count
+ ImmutableMap.of(1, 0L), // null count
+ null,
+ ImmutableMap.of(1, ByteBuffer.allocate(1)), // lower bounds
+ ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds
+ ))
+ .build();
+
+ private static final DeleteFile DELETE_FILE =
+ FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/data-3.parquet")
+ .withFileSizeInBytes(0)
+ .withMetrics(
+ new Metrics(
+ 24L,
+ null, // no column sizes
+ ImmutableMap.of(1, 3L), // value count
+ ImmutableMap.of(1, 0L), // null count
+ null,
+ ImmutableMap.of(1, ByteBuffer.allocate(1)), // lower bounds
+ ImmutableMap.of(1, ByteBuffer.allocate(1)) // upper bounds
+ ))
+ .ofPositionDeletes()
+ .build();
+
@BeforeEach
void before() {
catalog = CATALOG_EXTENSION.catalog();
@@ -162,60 +205,57 @@ class TestDynamicCommitter {
Snapshot first = Iterables.getFirst(table1.snapshots(), null);
assertThat(first.summary())
.containsAllEntriesOf(
- (Map)
- ImmutableMap.builder()
- .put("added-data-files", "1")
- .put("added-records", "42")
- .put("changed-partition-count", "1")
- .put("flink.job-id", jobId)
- .put("flink.max-committed-checkpoint-id", "" +
checkpointId)
- .put("flink.operator-id", operatorId)
- .put("total-data-files", "1")
- .put("total-delete-files", "0")
- .put("total-equality-deletes", "0")
- .put("total-files-size", "0")
- .put("total-position-deletes", "0")
- .put("total-records", "42")
- .build());
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
Snapshot second = Iterables.get(table1.snapshots(), 1, null);
assertThat(second.summary())
.containsAllEntriesOf(
- (Map)
- ImmutableMap.builder()
- .put("added-data-files", "1")
- .put("added-records", "42")
- .put("changed-partition-count", "1")
- .put("flink.job-id", jobId)
- .put("flink.max-committed-checkpoint-id", "" +
checkpointId)
- .put("flink.operator-id", operatorId)
- .put("total-data-files", "1")
- .put("total-delete-files", "0")
- .put("total-equality-deletes", "0")
- .put("total-files-size", "0")
- .put("total-position-deletes", "0")
- .put("total-records", "42")
- .build());
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
table2.refresh();
assertThat(table2.snapshots()).hasSize(1);
Snapshot third = Iterables.getFirst(table2.snapshots(), null);
assertThat(third.summary())
.containsAllEntriesOf(
- (Map)
- ImmutableMap.builder()
- .put("added-data-files", "1")
- .put("added-records", "42")
- .put("changed-partition-count", "1")
- .put("flink.job-id", jobId)
- .put("flink.max-committed-checkpoint-id", "" +
checkpointId)
- .put("flink.operator-id", operatorId)
- .put("total-data-files", "1")
- .put("total-delete-files", "0")
- .put("total-equality-deletes", "0")
- .put("total-files-size", "0")
- .put("total-position-deletes", "0")
- .put("total-records", "42")
- .build());
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
}
@Test
@@ -277,21 +317,276 @@ class TestDynamicCommitter {
Snapshot first = Iterables.getFirst(table1.snapshots(), null);
assertThat(first.summary())
.containsAllEntriesOf(
- (Map)
- ImmutableMap.builder()
- .put("added-data-files", "1")
- .put("added-records", "42")
- .put("changed-partition-count", "1")
- .put("flink.job-id", jobId)
- .put("flink.max-committed-checkpoint-id", "" +
checkpointId)
- .put("flink.operator-id", operatorId)
- .put("total-data-files", "1")
- .put("total-delete-files", "0")
- .put("total-equality-deletes", "0")
- .put("total-files-size", "0")
- .put("total-position-deletes", "0")
- .put("total-records", "42")
- .build());
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
+ }
+
+ @Test
+ void testTableBranchAtomicCommitForAppendOnlyData() throws Exception {
+ Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+ assertThat(table.snapshots()).isEmpty();
+
+ DynamicWriteResultAggregator aggregator =
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ OneInputStreamOperatorTestHarness aggregatorHarness =
+ new OneInputStreamOperatorTestHarness(aggregator);
+ aggregatorHarness.open();
+
+ WriteTarget writeTarget1 =
+ new WriteTarget(TABLE1, "branch1", 42, 0, true, Sets.newHashSet(1, 2));
+ // writeTarget2 has a different schema
+ WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch1", 23, 0, true,
Sets.newHashSet());
+ // Different branch for writeTarget3
+ WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch2", 23, 0, true,
Sets.newHashSet());
+
+ WriteResult writeResult1 =
WriteResult.builder().addDataFiles(DATA_FILE).build();
+ WriteResult writeResult2 =
WriteResult.builder().addDataFiles(DATA_FILE_2).build();
+
+ final String jobId = JobID.generate().toHexString();
+ final String operatorId = new OperatorID().toHexString();
+ final int checkpointId1 = 1;
+ final int checkpointId2 = 2;
+
+ byte[] deltaManifest1 =
+ aggregator.writeToManifest(
+ writeTarget1,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget1,
writeResult1)),
+ checkpointId1);
+
+ CommitRequest<DynamicCommittable> commitRequest1 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget1, deltaManifest1, jobId,
operatorId, checkpointId1));
+
+ byte[] deltaManifest2 =
+ aggregator.writeToManifest(
+ writeTarget2,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget2,
writeResult2)),
+ checkpointId1);
+
+ CommitRequest<DynamicCommittable> commitRequest2 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget2, deltaManifest2, jobId,
operatorId, checkpointId1));
+
+ byte[] deltaManifest3 =
+ aggregator.writeToManifest(
+ writeTarget3,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget3,
writeResult2)),
+ checkpointId2);
+
+ CommitRequest<DynamicCommittable> commitRequest3 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget3, deltaManifest3, jobId,
operatorId, checkpointId2));
+
+ boolean overwriteMode = false;
+ int workerPoolSize = 1;
+ String sinkId = "sinkId";
+ UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+ DynamicCommitterMetrics committerMetrics = new
DynamicCommitterMetrics(metricGroup);
+ DynamicCommitter dynamicCommitter =
+ new DynamicCommitter(
+ CATALOG_EXTENSION.catalog(),
+ Maps.newHashMap(),
+ overwriteMode,
+ workerPoolSize,
+ sinkId,
+ committerMetrics);
+
+ dynamicCommitter.commit(Sets.newHashSet(commitRequest1, commitRequest2,
commitRequest3));
+
+ table.refresh();
+    // Three committables, but only two snapshots: one per table / branch.
+ assertThat(table.snapshots()).hasSize(2);
+
+ Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null);
+
assertThat(snapshot1.snapshotId()).isEqualTo(table.refs().get("branch1").snapshotId());
+ assertThat(snapshot1.summary())
+ .containsAllEntriesOf(
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "2")
+ .put("added-records", "66")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId1)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "2")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "66")
+ .build());
+
+ Snapshot snapshot2 = Iterables.get(table.snapshots(), 1);
+
assertThat(snapshot2.snapshotId()).isEqualTo(table.refs().get("branch2").snapshotId());
+ assertThat(snapshot2.summary())
+ .containsAllEntriesOf(
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "24")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId2)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "24")
+ .build());
+ }
+
+ @Test
+ void testTableBranchAtomicCommitWithFailures() throws Exception {
+ Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+ assertThat(table.snapshots()).isEmpty();
+
+ DynamicWriteResultAggregator aggregator =
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader());
+ OneInputStreamOperatorTestHarness aggregatorHarness =
+ new OneInputStreamOperatorTestHarness(aggregator);
+ aggregatorHarness.open();
+
+ WriteTarget writeTarget1 = new WriteTarget(TABLE1, "branch", 42, 0, false,
Sets.newHashSet());
+ // writeTarget2 has a different schema
+ WriteTarget writeTarget2 = new WriteTarget(TABLE1, "branch", 23, 0, false,
Sets.newHashSet());
+ WriteTarget writeTarget3 = new WriteTarget(TABLE1, "branch", 23, 0, false,
Sets.newHashSet());
+
+ WriteResult writeResult1 =
WriteResult.builder().addDataFiles(DATA_FILE).build();
+ WriteResult writeResult2 =
WriteResult.builder().addDeleteFiles(DELETE_FILE).build();
+ WriteResult writeResult3 =
WriteResult.builder().addDataFiles(DATA_FILE).build();
+
+ final String jobId = JobID.generate().toHexString();
+ final String operatorId = new OperatorID().toHexString();
+ final int checkpointId1 = 1;
+ final int checkpointId2 = 2;
+
+ byte[] deltaManifest1 =
+ aggregator.writeToManifest(
+ writeTarget1,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget1,
writeResult1)),
+ checkpointId1);
+
+ CommitRequest<DynamicCommittable> commitRequest1 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget1, deltaManifest1, jobId,
operatorId, checkpointId1));
+
+ byte[] deltaManifest2 =
+ aggregator.writeToManifest(
+ writeTarget2,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget2,
writeResult2)),
+ checkpointId2);
+
+ CommitRequest<DynamicCommittable> commitRequest2 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget2, deltaManifest2, jobId,
operatorId, checkpointId2));
+
+ byte[] deltaManifest3 =
+ aggregator.writeToManifest(
+ writeTarget3,
+ Sets.newHashSet(new DynamicWriteResult(writeTarget3,
writeResult3)),
+ checkpointId2);
+
+ CommitRequest<DynamicCommittable> commitRequest3 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget3, deltaManifest3, jobId,
operatorId, checkpointId2));
+
+ boolean overwriteMode = false;
+ int workerPoolSize = 1;
+ String sinkId = "sinkId";
+ UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+ DynamicCommitterMetrics committerMetrics = new
DynamicCommitterMetrics(metricGroup);
+
+    // Use a special hook that fails at various stages of the commit operation
+ CommitHook commitHook = new FailBeforeAndAfterCommit();
+ DynamicCommitter dynamicCommitter =
+ new CommitHookEnabledDynamicCommitter(
+ commitHook,
+ CATALOG_EXTENSION.catalog(),
+ Maps.newHashMap(),
+ overwriteMode,
+ workerPoolSize,
+ sinkId,
+ committerMetrics);
+
+ ThrowingCallable commitExecutable =
+ () ->
+ dynamicCommitter.commit(
+ Sets.newHashSet(commitRequest1, commitRequest2,
commitRequest3));
+
+ // First fail pre-commit
+ assertThatThrownBy(commitExecutable);
+ assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
+
+ // Second fail during commit
+ assertThatThrownBy(commitExecutable);
+ assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+
+ // Third fail after commit
+ assertThatThrownBy(commitExecutable);
+ assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
+
+    // Finally, the commit must go through, although it is a no-op because the third failure
+    // occurs directly after the commit has finished.
+ try {
+ commitExecutable.call();
+ } catch (Throwable e) {
+ fail("Should not have thrown an exception");
+ }
+
+ table.refresh();
+    // Three committables, but only two snapshots! WriteResults from different checkpoints
+    // are not combined because writeResult2 contains a delete file.
+ assertThat(table.snapshots()).hasSize(2);
+
+ Snapshot snapshot1 = Iterables.getFirst(table.snapshots(), null);
+ assertThat(snapshot1.summary())
+ .containsAllEntriesOf(
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId1)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
+
+ Snapshot snapshot2 = Iterables.get(table.snapshots(), 1);
+ assertThat(snapshot2.summary())
+ .containsAllEntriesOf(
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id", "" + checkpointId2)
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "2")
+ .put("total-delete-files", "1")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "24")
+ .put("total-records", "84")
+ .build());
}
@Test
@@ -361,21 +656,109 @@ class TestDynamicCommitter {
Snapshot latestSnapshot = Iterables.getLast(table1.snapshots());
assertThat(latestSnapshot.summary())
.containsAllEntriesOf(
- (Map)
- ImmutableMap.builder()
- .put("replace-partitions", "true")
- .put("added-data-files", "1")
- .put("added-records", "42")
- .put("changed-partition-count", "1")
- .put("flink.job-id", jobId)
- .put("flink.max-committed-checkpoint-id",
String.valueOf(checkpointId + 1))
- .put("flink.operator-id", operatorId)
- .put("total-data-files", "1")
- .put("total-delete-files", "0")
- .put("total-equality-deletes", "0")
- .put("total-files-size", "0")
- .put("total-position-deletes", "0")
- .put("total-records", "42")
- .build());
+ ImmutableMap.<String, String>builder()
+ .put("replace-partitions", "true")
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id",
String.valueOf(checkpointId + 1))
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
+ }
+
+ interface CommitHook extends Serializable {
+ void beforeCommit();
+
+ void duringCommit();
+
+ void afterCommit();
+ }
+
+ static class FailBeforeAndAfterCommit implements CommitHook {
+
+ static boolean failedBeforeCommit;
+ static boolean failedDuringCommit;
+ static boolean failedAfterCommit;
+
+ FailBeforeAndAfterCommit() {
+ reset();
+ }
+
+ @Override
+ public void beforeCommit() {
+ if (!failedBeforeCommit) {
+ failedBeforeCommit = true;
+ throw new RuntimeException("Failing before commit");
+ }
+ }
+
+ @Override
+ public void duringCommit() {
+ if (!failedDuringCommit) {
+ failedDuringCommit = true;
+ throw new RuntimeException("Failing during commit");
+ }
+ }
+
+ @Override
+ public void afterCommit() {
+ if (!failedAfterCommit) {
+ failedAfterCommit = true;
+        throw new RuntimeException("Failing after commit");
+ }
+ }
+
+ static void reset() {
+ failedBeforeCommit = false;
+ failedDuringCommit = false;
+ failedAfterCommit = false;
+ }
+ }
+
+ static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
+ private final CommitHook commitHook;
+
+ CommitHookEnabledDynamicCommitter(
+ CommitHook commitHook,
+ Catalog catalog,
+ Map<String, String> snapshotProperties,
+ boolean replacePartitions,
+ int workerPoolSize,
+ String sinkId,
+ DynamicCommitterMetrics committerMetrics) {
+ super(
+ catalog, snapshotProperties, replacePartitions, workerPoolSize,
sinkId, committerMetrics);
+ this.commitHook = commitHook;
+ }
+
+ @Override
+ public void commit(Collection<CommitRequest<DynamicCommittable>>
commitRequests)
+ throws IOException, InterruptedException {
+ commitHook.beforeCommit();
+ super.commit(commitRequests);
+ commitHook.afterCommit();
+ }
+
+ @Override
+ void commitOperation(
+ Table table,
+ String branch,
+ SnapshotUpdate<?> operation,
+ CommitSummary summary,
+ String description,
+ String newFlinkJobId,
+ String operatorId,
+ long checkpointId) {
+ super.commitOperation(
+ table, branch, operation, summary, description, newFlinkJobId,
operatorId, checkpointId);
+ commitHook.duringCommit();
+ }
}
}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index b61e297cc1..20fae212b4 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -25,7 +25,6 @@ import static org.assertj.core.api.Assertions.fail;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -56,7 +55,6 @@ import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -69,8 +67,9 @@ import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestHelpers;
-import org.apache.iceberg.flink.sink.CommitSummary;
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
+import org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.CommitHook;
+import
org.apache.iceberg.flink.sink.dynamic.TestDynamicCommitter.FailBeforeAndAfterCommit;
import org.apache.iceberg.inmemory.InMemoryInputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -528,7 +527,8 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
// Configure a Restart strategy to allow recovery
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
-
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
2);
+ // Allow max 3 retries to make up for the three failures we are simulating
here
+
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
3);
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
Duration.ZERO);
env.configure(configuration);
@@ -539,14 +539,15 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
new DynamicIcebergDataImpl(
SimpleDataUtil.SCHEMA, "t2", "main",
PartitionSpec.unpartitioned()));
- FailBeforeAndAfterCommit.reset();
final CommitHook commitHook = new FailBeforeAndAfterCommit();
assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse();
+ assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isFalse();
assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse();
executeDynamicSink(rows, env, true, 1, commitHook);
assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
+ assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
}
@@ -569,44 +570,6 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
executeDynamicSink(rows, env, true, 1, commitHook);
}
- interface CommitHook extends Serializable {
- void beforeCommit();
-
- void duringCommit();
-
- void afterCommit();
- }
-
- private static class FailBeforeAndAfterCommit implements CommitHook {
-
- static boolean failedBeforeCommit;
- static boolean failedAfterCommit;
-
- @Override
- public void beforeCommit() {
- if (!failedBeforeCommit) {
- failedBeforeCommit = true;
- throw new RuntimeException("Failing before commit");
- }
- }
-
- @Override
- public void duringCommit() {}
-
- @Override
- public void afterCommit() {
- if (!failedAfterCommit) {
- failedAfterCommit = true;
- throw new RuntimeException("Failing before commit");
- }
- }
-
- static void reset() {
- failedBeforeCommit = false;
- failedAfterCommit = false;
- }
- }
-
private static class AppendRightBeforeCommit implements CommitHook {
final String tableIdentifier;
@@ -734,8 +697,7 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
@Override
public Committer<DynamicCommittable> createCommitter(CommitterInitContext
context) {
- // return super.createCommitter(context);
- return new CommitHookEnabledDynamicCommitter(
+ return new TestDynamicCommitter.CommitHookEnabledDynamicCommitter(
commitHook,
CATALOG_EXTENSION.catalogLoader().loadCatalog(),
Collections.emptyMap(),
@@ -746,46 +708,6 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
}
}
- static class CommitHookEnabledDynamicCommitter extends DynamicCommitter {
- private final CommitHook commitHook;
-
- CommitHookEnabledDynamicCommitter(
- CommitHook commitHook,
- Catalog catalog,
- Map<String, String> snapshotProperties,
- boolean replacePartitions,
- int workerPoolSize,
- String sinkId,
- DynamicCommitterMetrics committerMetrics) {
- super(
- catalog, snapshotProperties, replacePartitions, workerPoolSize,
sinkId, committerMetrics);
- this.commitHook = commitHook;
- }
-
- @Override
- public void commit(Collection<CommitRequest<DynamicCommittable>>
commitRequests)
- throws IOException, InterruptedException {
- commitHook.beforeCommit();
- super.commit(commitRequests);
- commitHook.afterCommit();
- }
-
- @Override
- void commitOperation(
- Table table,
- String branch,
- SnapshotUpdate<?> operation,
- CommitSummary summary,
- String description,
- String newFlinkJobId,
- String operatorId,
- long checkpointId) {
- commitHook.duringCommit();
- super.commitOperation(
- table, branch, operation, summary, description, newFlinkJobId,
operatorId, checkpointId);
- }
- }
-
private void verifyResults(List<DynamicIcebergDataImpl> dynamicData) throws
IOException {
// Calculate the expected result
Map<Tuple2<String, String>, List<RowData>> expectedData =
Maps.newHashMap();