This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 11274f65dc Flink: Backport fix commit duplication in DynamicIcebergSink (#14637)
11274f65dc is described below
commit 11274f65dc5972a9d260b3c64e7026ab97fd08f0
Author: aiborodin <[email protected]>
AuthorDate: Thu Nov 20 20:00:39 2025 +1100
Flink: Backport fix commit duplication in DynamicIcebergSink (#14637)
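Summary (inferred from the diff below, not part of the original commit body): the committer previously rediscovered the max committed checkpoint id by walking parent snapshots itself, so a retried commit whose first attempt had actually succeeded could land twice. This change resolves the branch ancestry once via SnapshotUtil.ancestorsOf and attaches a SnapshotAncestryValidator to every table update, so a commit whose checkpoint id already appears in the branch ancestry fails validation atomically and is skipped with an INFO log instead of duplicating data. The v2.0 and v2.1 diffs below are identical backports of the same change.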
---
.../flink/sink/dynamic/DynamicCommitter.java | 69 +++++++++--
.../flink/sink/dynamic/TestDynamicCommitter.java | 130 ++++++++++++++++++---
.../flink/sink/dynamic/TestDynamicIcebergSink.java | 126 +++++++++++++++++---
.../flink/sink/dynamic/DynamicCommitter.java | 69 +++++++++--
.../flink/sink/dynamic/TestDynamicCommitter.java | 130 ++++++++++++++++++---
.../flink/sink/dynamic/TestDynamicIcebergSink.java | 126 +++++++++++++++++---
6 files changed, 574 insertions(+), 76 deletions(-)
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index d5774b66af..61b20cb27b 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -37,12 +37,14 @@ import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotAncestryValidator;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.flink.sink.CommitSummary;
import org.apache.iceberg.flink.sink.DeltaManifests;
import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
@@ -55,6 +57,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -141,9 +144,13 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
commitRequestMap.entrySet()) {
Table table = catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName()));
DynamicCommittable last = entry.getValue().lastEntry().getValue().get(0).getCommittable();
+ Snapshot latestSnapshot = table.snapshot(entry.getKey().branch());
+ Iterable<Snapshot> ancestors =
+ latestSnapshot != null
+ ? SnapshotUtil.ancestorsOf(latestSnapshot.snapshotId(), table::snapshot)
+ : List.of();
long maxCommittedCheckpointId =
- getMaxCommittedCheckpointId(
- table, last.jobId(), last.operatorId(), entry.getKey().branch());
+ getMaxCommittedCheckpointId(ancestors, last.jobId(), last.operatorId());
// Mark the already committed FilesCommittable(s) as finished
entry
.getValue()
@@ -160,12 +167,11 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
}
private static long getMaxCommittedCheckpointId(
- Table table, String flinkJobId, String operatorId, String branch) {
- Snapshot snapshot = table.snapshot(branch);
+ Iterable<Snapshot> ancestors, String flinkJobId, String operatorId) {
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
- while (snapshot != null) {
- Map<String, String> summary = snapshot.summary();
+ for (Snapshot ancestor : ancestors) {
+ Map<String, String> summary = ancestor.summary();
String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
String snapshotOperatorId = summary.get(OPERATOR_ID);
if (flinkJobId.equals(snapshotFlinkJobId)
@@ -176,9 +182,6 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
break;
}
}
-
- Long parentSnapshotId = snapshot.parentId();
- snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
}
return lastCommittedCheckpointId;
@@ -347,6 +350,36 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
}
}
+ private static class MaxCommittedCheckpointMismatchException extends ValidationException {
+ private MaxCommittedCheckpointMismatchException() {
+ super("Table already contains staged changes.");
+ }
+ }
+
+ private static class MaxCommittedCheckpointIdValidator implements SnapshotAncestryValidator {
+ private final long stagedCheckpointId;
+ private final String flinkJobId;
+ private final String flinkOperatorId;
+
+ private MaxCommittedCheckpointIdValidator(
+ long stagedCheckpointId, String flinkJobId, String flinkOperatorId) {
+ this.stagedCheckpointId = stagedCheckpointId;
+ this.flinkJobId = flinkJobId;
+ this.flinkOperatorId = flinkOperatorId;
+ }
+
+ @Override
+ public Boolean apply(Iterable<Snapshot> baseSnapshots) {
+ long maxCommittedCheckpointId =
+ getMaxCommittedCheckpointId(baseSnapshots, flinkJobId, flinkOperatorId);
+ if (maxCommittedCheckpointId >= stagedCheckpointId) {
+ throw new MaxCommittedCheckpointMismatchException();
+ }
+
+ return true;
+ }
+ }
+
@VisibleForTesting
void commitOperation(
Table table,
@@ -372,9 +405,25 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
operation.set(FLINK_JOB_ID, newFlinkJobId);
operation.set(OPERATOR_ID, operatorId);
operation.toBranch(branch);
+ operation.validateWith(
+ new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId));
long startNano = System.nanoTime();
- operation.commit(); // abort is automatically called if this fails.
+ try {
+ operation.commit(); // abort is automatically called if this fails.
+ } catch (MaxCommittedCheckpointMismatchException e) {
+ LOG.info(
+ "Skipping commit operation {} because the {} branch of the {} table
already contains changes for checkpoint {}."
+ + " This can occur when a failure prevents the committer from
receiving confirmation of a"
+ + " successful commit, causing the Flink job to retry committing
the same set of changes.",
+ description,
+ branch,
+ table.name(),
+ checkpointId,
+ e);
+ return;
+ }
+
long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
LOG.info(
"Committed {} to table: {}, branch: {}, checkpointId {} in {} ms",
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index d2c688e28c..1497458e60 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -59,6 +60,8 @@ import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
class TestDynamicCommitter {
@@ -669,11 +672,15 @@ class TestDynamicCommitter {
assertThatThrownBy(commitExecutable);
assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
- // Second fail during commit
+ // Second fail before table update
assertThatThrownBy(commitExecutable);
- assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+ assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue();
- // Third fail after commit
+ // Third fail after table update
+ assertThatThrownBy(commitExecutable);
+ assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue();
+
+ // Fourth fail after commit
assertThatThrownBy(commitExecutable);
assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
@@ -812,18 +819,101 @@ class TestDynamicCommitter {
.build());
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testThrowsValidationExceptionOnDuplicateCommit(boolean overwriteMode) throws Exception {
+ Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+ assertThat(table.snapshots()).isEmpty();
+
+ DynamicWriteResultAggregator aggregator =
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize);
+ OneInputStreamOperatorTestHarness aggregatorHarness =
+ new OneInputStreamOperatorTestHarness(aggregator);
+ aggregatorHarness.open();
+
+ final String jobId = JobID.generate().toHexString();
+ final String operatorId = new OperatorID().toHexString();
+ final int checkpointId = 1;
+ final String branch = SnapshotRef.MAIN_BRANCH;
+
+ WriteTarget writeTarget = new WriteTarget(TABLE1, branch, 42, 0, false, Sets.newHashSet(1, 2));
+ byte[] manifest =
+ aggregator.writeToManifest(
+ writeTarget,
+ Sets.newHashSet(
+ new DynamicWriteResult(
+ writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())),
+ checkpointId);
+
+ CommitRequest<DynamicCommittable> commitRequest1 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget, manifest, jobId, operatorId, checkpointId));
+ Collection<CommitRequest<DynamicCommittable>> commitRequests = Sets.newHashSet(commitRequest1);
+
+ int workerPoolSize = 1;
+ String sinkId = "sinkId";
+ UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+ DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup);
+
+ CommitHook commitHook =
+ new TestDynamicIcebergSink.DuplicateCommitHook(
+ () ->
+ new DynamicCommitter(
+ CATALOG_EXTENSION.catalog(),
+ Map.of(),
+ overwriteMode,
+ workerPoolSize,
+ sinkId,
+ committerMetrics));
+
+ DynamicCommitter mainCommitter =
+ new CommitHookEnabledDynamicCommitter(
+ commitHook,
+ CATALOG_EXTENSION.catalog(),
+ Maps.newHashMap(),
+ overwriteMode,
+ workerPoolSize,
+ sinkId,
+ committerMetrics);
+
+ mainCommitter.commit(commitRequests);
+
+ // Only one commit should succeed
+ table.refresh();
+ assertThat(table.snapshots()).hasSize(1);
+ assertThat(table.currentSnapshot().summary())
+ .containsAllEntriesOf(
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id",
String.valueOf(checkpointId))
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
+ }
+
interface CommitHook extends Serializable {
- void beforeCommit();
+ default void beforeCommit(Collection<CommitRequest<DynamicCommittable>> commitRequests) {}
- void duringCommit();
+ default void beforeCommitOperation() {}
- void afterCommit();
+ default void afterCommitOperation() {}
+
+ default void afterCommit() {}
}
static class FailBeforeAndAfterCommit implements CommitHook {
static boolean failedBeforeCommit;
- static boolean failedDuringCommit;
+ static boolean failedBeforeCommitOperation;
+ static boolean failedAfterCommitOperation;
static boolean failedAfterCommit;
FailBeforeAndAfterCommit() {
@@ -831,7 +921,7 @@ class TestDynamicCommitter {
}
@Override
- public void beforeCommit() {
+ public void beforeCommit(Collection<CommitRequest<DynamicCommittable>> ignored) {
if (!failedBeforeCommit) {
failedBeforeCommit = true;
throw new RuntimeException("Failing before commit");
@@ -839,10 +929,18 @@ class TestDynamicCommitter {
}
@Override
- public void duringCommit() {
- if (!failedDuringCommit) {
- failedDuringCommit = true;
- throw new RuntimeException("Failing during commit");
+ public void beforeCommitOperation() {
+ if (!failedBeforeCommitOperation) {
+ failedBeforeCommitOperation = true;
+ throw new RuntimeException("Failing before commit operation");
+ }
+ }
+
+ @Override
+ public void afterCommitOperation() {
+ if (!failedAfterCommitOperation) {
+ failedAfterCommitOperation = true;
+ throw new RuntimeException("Failing after commit operation");
}
}
@@ -856,7 +954,8 @@ class TestDynamicCommitter {
static void reset() {
failedBeforeCommit = false;
- failedDuringCommit = false;
+ failedBeforeCommitOperation = false;
+ failedAfterCommitOperation = false;
failedAfterCommit = false;
}
}
@@ -880,7 +979,7 @@ class TestDynamicCommitter {
@Override
public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
throws IOException, InterruptedException {
- commitHook.beforeCommit();
+ commitHook.beforeCommit(commitRequests);
super.commit(commitRequests);
commitHook.afterCommit();
}
@@ -895,9 +994,10 @@ class TestDynamicCommitter {
String newFlinkJobId,
String operatorId,
long checkpointId) {
+ commitHook.beforeCommitOperation();
super.commitOperation(
table, branch, operation, summary, description, newFlinkJobId, operatorId, checkpointId);
- commitHook.duringCommit();
+ commitHook.afterCommitOperation();
}
}
}
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index f0cc46df46..b660d8e285 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -25,6 +25,7 @@ import static org.assertj.core.api.Assertions.fail;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -38,6 +39,7 @@ import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -49,6 +51,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.SerializableSupplier;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DistributionMode;
@@ -81,6 +84,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
@@ -732,8 +737,8 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
// Configure a Restart strategy to allow recovery
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
- // Allow max 3 retries to make up for the three failures we are simulating here
- configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3);
+ // Allow max 4 retries to make up for the four failures we are simulating here
+ configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 4);
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ZERO);
env.configure(configuration);
@@ -746,13 +751,15 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
final CommitHook commitHook = new FailBeforeAndAfterCommit();
assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse();
- assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isFalse();
+ assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isFalse();
+ assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isFalse();
assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse();
executeDynamicSink(rows, env, true, 1, commitHook);
assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
- assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+ assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue();
+ assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue();
assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
}
@@ -775,6 +782,90 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
executeDynamicSink(rows, env, true, 1, commitHook);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testCommitsOnceWhenConcurrentDuplicateCommit(boolean overwriteMode) throws Exception {
+ TableIdentifier tableId = TableIdentifier.of(DATABASE, "t1");
+ List<DynamicIcebergDataImpl> records =
+ Lists.newArrayList(
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA, tableId.name(), "main", PartitionSpec.unpartitioned()),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA, tableId.name(), "main", PartitionSpec.unpartitioned()));
+
+ CommitHook duplicateCommit =
+ new DuplicateCommitHook(
+ () ->
+ new DynamicCommitter(
+ CATALOG_EXTENSION.catalogLoader().loadCatalog(),
+ Collections.emptyMap(),
+ overwriteMode,
+ 10,
+ "sinkId",
+ new DynamicCommitterMetrics(new UnregisteredMetricsGroup())));
+
+ executeDynamicSink(records, env, true, 2, duplicateCommit, overwriteMode);
+
+ Table table = CATALOG_EXTENSION.catalog().loadTable(tableId);
+
+ if (!overwriteMode) {
+ verifyResults(records);
+ assertThat(table.currentSnapshot().summary())
+ .containsAllEntriesOf(Map.of("total-records", String.valueOf(records.size())));
+ }
+
+ long totalAddedRecords =
+ Lists.newArrayList(table.snapshots()).stream()
+ .map(snapshot -> snapshot.summary().getOrDefault("added-records", "0"))
+ .mapToLong(Long::valueOf)
+ .sum();
+ assertThat(totalAddedRecords).isEqualTo(records.size());
+ }
+
+ /**
+ * Represents a concurrent duplicate commit during an ongoing commit operation, which can happen
+ * in production scenarios when using REST catalog.
+ */
+ static class DuplicateCommitHook implements CommitHook {
+ // Static to maintain state after Flink restarts
+ private static boolean hasTriggered = false;
+
+ private final SerializableSupplier<DynamicCommitter> duplicateCommitterSupplier;
+ private final List<Committer.CommitRequest<DynamicCommittable>> commitRequests;
+
+ DuplicateCommitHook(SerializableSupplier<DynamicCommitter> duplicateCommitterSupplier) {
+ this.duplicateCommitterSupplier = duplicateCommitterSupplier;
+ this.commitRequests = Lists.newArrayList();
+
+ resetState();
+ }
+
+ private static void resetState() {
+ hasTriggered = false;
+ }
+
+ @Override
+ public void beforeCommit(Collection<Committer.CommitRequest<DynamicCommittable>> requests) {
+ if (!hasTriggered) {
+ this.commitRequests.addAll(requests);
+ }
+ }
+
+ @Override
+ public void beforeCommitOperation() {
+ if (!hasTriggered) {
+ try {
+ duplicateCommitterSupplier.get().commit(commitRequests);
+ } catch (final IOException | InterruptedException e) {
+ throw new RuntimeException("Duplicate committer failed", e);
+ }
+
+ commitRequests.clear();
+ hasTriggered = true;
+ }
+ }
+ }
+
private static class AppendRightBeforeCommit implements CommitHook {
final String tableIdentifier;
@@ -784,10 +875,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
}
@Override
- public void beforeCommit() {}
-
- @Override
- public void duringCommit() {
+ public void beforeCommitOperation() {
// Create a conflict
Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.parse(tableIdentifier));
DataFile dataFile =
@@ -798,9 +886,6 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
.build();
table.newAppend().appendFile(dataFile).commit();
}
-
- @Override
- public void afterCommit() {}
}
private void runTest(List<DynamicIcebergDataImpl> dynamicData) throws Exception {
@@ -831,8 +916,19 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
int parallelism,
@Nullable CommitHook commitHook)
throws Exception {
+ executeDynamicSink(dynamicData, env, immediateUpdate, parallelism, commitHook, false);
+ }
+
+ private void executeDynamicSink(
+ List<DynamicIcebergDataImpl> dynamicData,
+ StreamExecutionEnvironment env,
+ boolean immediateUpdate,
+ int parallelism,
+ @Nullable CommitHook commitHook,
+ boolean overwrite)
+ throws Exception {
DataStream<DynamicIcebergDataImpl> dataStream =
- env.addSource(createBoundedSource(dynamicData), TypeInformation.of(new TypeHint<>() {}));
+ env.fromData(dynamicData, TypeInformation.of(new TypeHint<>() {}));
env.setParallelism(parallelism);
if (commitHook != null) {
@@ -843,6 +939,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
.writeParallelism(parallelism)
.immediateTableUpdate(immediateUpdate)
.setSnapshotProperty("commit.retry.num-retries", "0")
+ .overwrite(overwrite)
.append();
} else {
DynamicIcebergSink.forInput(dataStream)
@@ -850,6 +947,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
.catalogLoader(CATALOG_EXTENSION.catalogLoader())
.writeParallelism(parallelism)
.immediateTableUpdate(immediateUpdate)
+ .overwrite(overwrite)
.append();
}
@@ -881,6 +979,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
static class CommitHookDynamicIcebergSink extends DynamicIcebergSink {
private final CommitHook commitHook;
+ private final boolean overwriteMode;
CommitHookDynamicIcebergSink(
CommitHook commitHook,
@@ -898,6 +997,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
flinkWriteConf,
cacheMaximumSize);
this.commitHook = commitHook;
+ this.overwriteMode = flinkWriteConf.overwriteMode();
}
@Override
@@ -906,7 +1006,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
commitHook,
CATALOG_EXTENSION.catalogLoader().loadCatalog(),
Collections.emptyMap(),
- false,
+ overwriteMode,
10,
"sinkId",
new DynamicCommitterMetrics(context.metricGroup()));
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
index d5774b66af..61b20cb27b 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
@@ -37,12 +37,14 @@ import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotAncestryValidator;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.flink.sink.CommitSummary;
import org.apache.iceberg.flink.sink.DeltaManifests;
import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
@@ -55,6 +57,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -141,9 +144,13 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
commitRequestMap.entrySet()) {
Table table = catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName()));
DynamicCommittable last = entry.getValue().lastEntry().getValue().get(0).getCommittable();
+ Snapshot latestSnapshot = table.snapshot(entry.getKey().branch());
+ Iterable<Snapshot> ancestors =
+ latestSnapshot != null
+ ? SnapshotUtil.ancestorsOf(latestSnapshot.snapshotId(), table::snapshot)
+ : List.of();
long maxCommittedCheckpointId =
- getMaxCommittedCheckpointId(
- table, last.jobId(), last.operatorId(), entry.getKey().branch());
+ getMaxCommittedCheckpointId(ancestors, last.jobId(), last.operatorId());
// Mark the already committed FilesCommittable(s) as finished
entry
.getValue()
@@ -160,12 +167,11 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
}
private static long getMaxCommittedCheckpointId(
- Table table, String flinkJobId, String operatorId, String branch) {
- Snapshot snapshot = table.snapshot(branch);
+ Iterable<Snapshot> ancestors, String flinkJobId, String operatorId) {
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
- while (snapshot != null) {
- Map<String, String> summary = snapshot.summary();
+ for (Snapshot ancestor : ancestors) {
+ Map<String, String> summary = ancestor.summary();
String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
String snapshotOperatorId = summary.get(OPERATOR_ID);
if (flinkJobId.equals(snapshotFlinkJobId)
@@ -176,9 +182,6 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
break;
}
}
-
- Long parentSnapshotId = snapshot.parentId();
- snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
}
return lastCommittedCheckpointId;
@@ -347,6 +350,36 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
}
}
+ private static class MaxCommittedCheckpointMismatchException extends ValidationException {
+ private MaxCommittedCheckpointMismatchException() {
+ super("Table already contains staged changes.");
+ }
+ }
+
+ private static class MaxCommittedCheckpointIdValidator implements SnapshotAncestryValidator {
+ private final long stagedCheckpointId;
+ private final String flinkJobId;
+ private final String flinkOperatorId;
+
+ private MaxCommittedCheckpointIdValidator(
+ long stagedCheckpointId, String flinkJobId, String flinkOperatorId) {
+ this.stagedCheckpointId = stagedCheckpointId;
+ this.flinkJobId = flinkJobId;
+ this.flinkOperatorId = flinkOperatorId;
+ }
+
+ @Override
+ public Boolean apply(Iterable<Snapshot> baseSnapshots) {
+ long maxCommittedCheckpointId =
+ getMaxCommittedCheckpointId(baseSnapshots, flinkJobId, flinkOperatorId);
+ if (maxCommittedCheckpointId >= stagedCheckpointId) {
+ throw new MaxCommittedCheckpointMismatchException();
+ }
+
+ return true;
+ }
+ }
+
@VisibleForTesting
void commitOperation(
Table table,
@@ -372,9 +405,25 @@ class DynamicCommitter implements Committer<DynamicCommittable> {
operation.set(FLINK_JOB_ID, newFlinkJobId);
operation.set(OPERATOR_ID, operatorId);
operation.toBranch(branch);
+ operation.validateWith(
+ new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId));
long startNano = System.nanoTime();
- operation.commit(); // abort is automatically called if this fails.
+ try {
+ operation.commit(); // abort is automatically called if this fails.
+ } catch (MaxCommittedCheckpointMismatchException e) {
+ LOG.info(
+ "Skipping commit operation {} because the {} branch of the {} table
already contains changes for checkpoint {}."
+ + " This can occur when a failure prevents the committer from
receiving confirmation of a"
+ + " successful commit, causing the Flink job to retry committing
the same set of changes.",
+ description,
+ branch,
+ table.name(),
+ checkpointId,
+ e);
+ return;
+ }
+
long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
LOG.info(
"Committed {} to table: {}, branch: {}, checkpointId {} in {} ms",
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
index 16c0cb8e6c..5f938d4e88 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicCommitter.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -59,6 +60,8 @@ import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
class TestDynamicCommitter {
@@ -669,11 +672,15 @@ class TestDynamicCommitter {
assertThatThrownBy(commitExecutable);
assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
- // Second fail during commit
+ // Second fail before table update
assertThatThrownBy(commitExecutable);
- assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+ assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue();
- // Third fail after commit
+ // Third fail after table update
+ assertThatThrownBy(commitExecutable);
+ assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue();
+
+ // Fourth fail after commit
assertThatThrownBy(commitExecutable);
assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
@@ -877,18 +884,101 @@ class TestDynamicCommitter {
.build());
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testThrowsValidationExceptionOnDuplicateCommit(boolean overwriteMode) throws Exception {
+ Table table = catalog.loadTable(TableIdentifier.of(TABLE1));
+ assertThat(table.snapshots()).isEmpty();
+
+ DynamicWriteResultAggregator aggregator =
+ new DynamicWriteResultAggregator(CATALOG_EXTENSION.catalogLoader(), cacheMaximumSize);
+ OneInputStreamOperatorTestHarness aggregatorHarness =
+ new OneInputStreamOperatorTestHarness(aggregator);
+ aggregatorHarness.open();
+
+ final String jobId = JobID.generate().toHexString();
+ final String operatorId = new OperatorID().toHexString();
+ final int checkpointId = 1;
+ final String branch = SnapshotRef.MAIN_BRANCH;
+
+ WriteTarget writeTarget = new WriteTarget(TABLE1, branch, 42, 0, false, Sets.newHashSet(1, 2));
+ byte[] manifest =
+ aggregator.writeToManifest(
+ writeTarget,
+ Sets.newHashSet(
+ new DynamicWriteResult(
+ writeTarget, WriteResult.builder().addDataFiles(DATA_FILE).build())),
+ checkpointId);
+
+ CommitRequest<DynamicCommittable> commitRequest1 =
+ new MockCommitRequest<>(
+ new DynamicCommittable(writeTarget, manifest, jobId, operatorId, checkpointId));
+ Collection<CommitRequest<DynamicCommittable>> commitRequests = Sets.newHashSet(commitRequest1);
+
+ int workerPoolSize = 1;
+ String sinkId = "sinkId";
+ UnregisteredMetricsGroup metricGroup = new UnregisteredMetricsGroup();
+ DynamicCommitterMetrics committerMetrics = new DynamicCommitterMetrics(metricGroup);
+
+ CommitHook commitHook =
+ new TestDynamicIcebergSink.DuplicateCommitHook(
+ () ->
+ new DynamicCommitter(
+ CATALOG_EXTENSION.catalog(),
+ Map.of(),
+ overwriteMode,
+ workerPoolSize,
+ sinkId,
+ committerMetrics));
+
+ DynamicCommitter mainCommitter =
+ new CommitHookEnabledDynamicCommitter(
+ commitHook,
+ CATALOG_EXTENSION.catalog(),
+ Maps.newHashMap(),
+ overwriteMode,
+ workerPoolSize,
+ sinkId,
+ committerMetrics);
+
+ mainCommitter.commit(commitRequests);
+
+ // Only one commit should succeed
+ table.refresh();
+ assertThat(table.snapshots()).hasSize(1);
+ assertThat(table.currentSnapshot().summary())
+ .containsAllEntriesOf(
+ ImmutableMap.<String, String>builder()
+ .put("added-data-files", "1")
+ .put("added-records", "42")
+ .put("changed-partition-count", "1")
+ .put("flink.job-id", jobId)
+ .put("flink.max-committed-checkpoint-id",
String.valueOf(checkpointId))
+ .put("flink.operator-id", operatorId)
+ .put("total-data-files", "1")
+ .put("total-delete-files", "0")
+ .put("total-equality-deletes", "0")
+ .put("total-files-size", "0")
+ .put("total-position-deletes", "0")
+ .put("total-records", "42")
+ .build());
+ }
+
interface CommitHook extends Serializable {
- void beforeCommit();
+ default void beforeCommit(Collection<CommitRequest<DynamicCommittable>> commitRequests) {}
- void duringCommit();
+ default void beforeCommitOperation() {}
- void afterCommit();
+ default void afterCommitOperation() {}
+
+ default void afterCommit() {}
}
static class FailBeforeAndAfterCommit implements CommitHook {
static boolean failedBeforeCommit;
- static boolean failedDuringCommit;
+ static boolean failedBeforeCommitOperation;
+ static boolean failedAfterCommitOperation;
static boolean failedAfterCommit;
FailBeforeAndAfterCommit() {
@@ -896,7 +986,7 @@ class TestDynamicCommitter {
}
@Override
- public void beforeCommit() {
+ public void beforeCommit(Collection<CommitRequest<DynamicCommittable>> ignored) {
if (!failedBeforeCommit) {
failedBeforeCommit = true;
throw new RuntimeException("Failing before commit");
@@ -904,10 +994,18 @@ class TestDynamicCommitter {
}
@Override
- public void duringCommit() {
- if (!failedDuringCommit) {
- failedDuringCommit = true;
- throw new RuntimeException("Failing during commit");
+ public void beforeCommitOperation() {
+ if (!failedBeforeCommitOperation) {
+ failedBeforeCommitOperation = true;
+ throw new RuntimeException("Failing before commit operation");
+ }
+ }
+
+ @Override
+ public void afterCommitOperation() {
+ if (!failedAfterCommitOperation) {
+ failedAfterCommitOperation = true;
+ throw new RuntimeException("Failing after commit operation");
}
}
@@ -921,7 +1019,8 @@ class TestDynamicCommitter {
static void reset() {
failedBeforeCommit = false;
- failedDuringCommit = false;
+ failedBeforeCommitOperation = false;
+ failedAfterCommitOperation = false;
failedAfterCommit = false;
}
}
@@ -945,7 +1044,7 @@ class TestDynamicCommitter {
@Override
public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
throws IOException, InterruptedException {
- commitHook.beforeCommit();
+ commitHook.beforeCommit(commitRequests);
super.commit(commitRequests);
commitHook.afterCommit();
}
@@ -960,9 +1059,10 @@ class TestDynamicCommitter {
String newFlinkJobId,
String operatorId,
long checkpointId) {
+ commitHook.beforeCommitOperation();
super.commitOperation(
table, branch, operation, summary, description, newFlinkJobId, operatorId, checkpointId);
- commitHook.duringCommit();
+ commitHook.afterCommitOperation();
}
}
}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index f0cc46df46..b660d8e285 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -25,6 +25,7 @@ import static org.assertj.core.api.Assertions.fail;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -38,6 +39,7 @@ import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -49,6 +51,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.SerializableSupplier;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DistributionMode;
@@ -81,6 +84,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
@@ -732,8 +737,8 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
// Configure a Restart strategy to allow recovery
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
- // Allow max 3 retries to make up for the three failures we are simulating here
- configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 3);
+ // Allow max 4 retries to make up for the four failures we are simulating here
+ configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 4);
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ZERO);
env.configure(configuration);
@@ -746,13 +751,15 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
final CommitHook commitHook = new FailBeforeAndAfterCommit();
assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isFalse();
- assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isFalse();
+ assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isFalse();
+ assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isFalse();
assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isFalse();
executeDynamicSink(rows, env, true, 1, commitHook);
assertThat(FailBeforeAndAfterCommit.failedBeforeCommit).isTrue();
- assertThat(FailBeforeAndAfterCommit.failedDuringCommit).isTrue();
+ assertThat(FailBeforeAndAfterCommit.failedBeforeCommitOperation).isTrue();
+ assertThat(FailBeforeAndAfterCommit.failedAfterCommitOperation).isTrue();
assertThat(FailBeforeAndAfterCommit.failedAfterCommit).isTrue();
}
@@ -775,6 +782,90 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
executeDynamicSink(rows, env, true, 1, commitHook);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testCommitsOnceWhenConcurrentDuplicateCommit(boolean overwriteMode) throws Exception {
+ TableIdentifier tableId = TableIdentifier.of(DATABASE, "t1");
+ List<DynamicIcebergDataImpl> records =
+ Lists.newArrayList(
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA, tableId.name(), "main", PartitionSpec.unpartitioned()),
+ new DynamicIcebergDataImpl(
+ SimpleDataUtil.SCHEMA, tableId.name(), "main", PartitionSpec.unpartitioned()));
+
+ CommitHook duplicateCommit =
+ new DuplicateCommitHook(
+ () ->
+ new DynamicCommitter(
+ CATALOG_EXTENSION.catalogLoader().loadCatalog(),
+ Collections.emptyMap(),
+ overwriteMode,
+ 10,
+ "sinkId",
+ new DynamicCommitterMetrics(new UnregisteredMetricsGroup())));
+
+ executeDynamicSink(records, env, true, 2, duplicateCommit, overwriteMode);
+
+ Table table = CATALOG_EXTENSION.catalog().loadTable(tableId);
+
+ if (!overwriteMode) {
+ verifyResults(records);
+ assertThat(table.currentSnapshot().summary())
+ .containsAllEntriesOf(Map.of("total-records", String.valueOf(records.size())));
+ }
+
+ long totalAddedRecords =
+ Lists.newArrayList(table.snapshots()).stream()
+ .map(snapshot -> snapshot.summary().getOrDefault("added-records", "0"))
+ .mapToLong(Long::valueOf)
+ .sum();
+ assertThat(totalAddedRecords).isEqualTo(records.size());
+ }
+
+ /**
+ * Represents a concurrent duplicate commit during an ongoing commit operation, which can happen
+ * in production scenarios when using REST catalog.
+ */
+ static class DuplicateCommitHook implements CommitHook {
+ // Static to maintain state after Flink restarts
+ private static boolean hasTriggered = false;
+
+ private final SerializableSupplier<DynamicCommitter> duplicateCommitterSupplier;
+ private final List<Committer.CommitRequest<DynamicCommittable>> commitRequests;
+
+ DuplicateCommitHook(SerializableSupplier<DynamicCommitter> duplicateCommitterSupplier) {
+ this.duplicateCommitterSupplier = duplicateCommitterSupplier;
+ this.commitRequests = Lists.newArrayList();
+
+ resetState();
+ }
+
+ private static void resetState() {
+ hasTriggered = false;
+ }
+
+ @Override
+ public void beforeCommit(Collection<Committer.CommitRequest<DynamicCommittable>> requests) {
+ if (!hasTriggered) {
+ this.commitRequests.addAll(requests);
+ }
+ }
+
+ @Override
+ public void beforeCommitOperation() {
+ if (!hasTriggered) {
+ try {
+ duplicateCommitterSupplier.get().commit(commitRequests);
+ } catch (final IOException | InterruptedException e) {
+ throw new RuntimeException("Duplicate committer failed", e);
+ }
+
+ commitRequests.clear();
+ hasTriggered = true;
+ }
+ }
+ }
+
private static class AppendRightBeforeCommit implements CommitHook {
final String tableIdentifier;
@@ -784,10 +875,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
}
@Override
- public void beforeCommit() {}
-
- @Override
- public void duringCommit() {
+ public void beforeCommitOperation() {
// Create a conflict
Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.parse(tableIdentifier));
DataFile dataFile =
@@ -798,9 +886,6 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
.build();
table.newAppend().appendFile(dataFile).commit();
}
-
- @Override
- public void afterCommit() {}
}
private void runTest(List<DynamicIcebergDataImpl> dynamicData) throws Exception {
@@ -831,8 +916,19 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
int parallelism,
@Nullable CommitHook commitHook)
throws Exception {
+ executeDynamicSink(dynamicData, env, immediateUpdate, parallelism, commitHook, false);
+ }
+
+ private void executeDynamicSink(
+ List<DynamicIcebergDataImpl> dynamicData,
+ StreamExecutionEnvironment env,
+ boolean immediateUpdate,
+ int parallelism,
+ @Nullable CommitHook commitHook,
+ boolean overwrite)
+ throws Exception {
DataStream<DynamicIcebergDataImpl> dataStream =
- env.addSource(createBoundedSource(dynamicData), TypeInformation.of(new TypeHint<>() {}));
+ env.fromData(dynamicData, TypeInformation.of(new TypeHint<>() {}));
env.setParallelism(parallelism);
if (commitHook != null) {
@@ -843,6 +939,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
.writeParallelism(parallelism)
.immediateTableUpdate(immediateUpdate)
.setSnapshotProperty("commit.retry.num-retries", "0")
+ .overwrite(overwrite)
.append();
} else {
DynamicIcebergSink.forInput(dataStream)
@@ -850,6 +947,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
.catalogLoader(CATALOG_EXTENSION.catalogLoader())
.writeParallelism(parallelism)
.immediateTableUpdate(immediateUpdate)
+ .overwrite(overwrite)
.append();
}
@@ -881,6 +979,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
static class CommitHookDynamicIcebergSink extends DynamicIcebergSink {
private final CommitHook commitHook;
+ private final boolean overwriteMode;
CommitHookDynamicIcebergSink(
CommitHook commitHook,
@@ -898,6 +997,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
flinkWriteConf,
cacheMaximumSize);
this.commitHook = commitHook;
+ this.overwriteMode = flinkWriteConf.overwriteMode();
}
@Override
@@ -906,7 +1006,7 @@ class TestDynamicIcebergSink extends TestFlinkIcebergSinkBase {
commitHook,
CATALOG_EXTENSION.catalogLoader().loadCatalog(),
Collections.emptyMap(),
- false,
+ overwriteMode,
10,
"sinkId",
new DynamicCommitterMetrics(context.metricGroup()));