This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 0c09e9fee1 Flink: Backport: Enable multiple flink sinks for the same
table in the same job (#6536)
0c09e9fee1 is described below
commit 0c09e9fee15b5621086fc03e60a4deaa29d83c01
Author: pvary <[email protected]>
AuthorDate: Fri Jan 6 21:20:14 2023 +0100
Flink: Backport: Enable multiple flink sinks for the same table in the same
job (#6536)
---
.../iceberg/flink/sink/IcebergFilesCommitter.java | 47 +++--
.../flink/sink/TestIcebergFilesCommitter.java | 225 ++++++++++++++++-----
.../iceberg/flink/sink/IcebergFilesCommitter.java | 47 +++--
.../flink/sink/TestIcebergFilesCommitter.java | 225 ++++++++++++++++-----
4 files changed, 422 insertions(+), 122 deletions(-)
diff --git
a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 9c5bbf89e3..b686a76c98 100644
---
a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -69,6 +69,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
private static final Logger LOG =
LoggerFactory.getLogger(IcebergFilesCommitter.class);
private static final String FLINK_JOB_ID = "flink.job-id";
+ private static final String OPERATOR_ID = "flink.operator-id";
// The max checkpoint id we've committed to iceberg table. As the flink's
checkpoint is always
// increasing, so we could correctly commit all the data files whose
checkpoint id is greater than
@@ -97,6 +98,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
// It will have an unique identifier for one job.
private transient String flinkJobId;
+ private transient String operatorUniqueId;
private transient Table table;
private transient IcebergFilesCommitterMetrics committerMetrics;
private transient ManifestOutputFileFactory manifestOutputFileFactory;
@@ -134,6 +136,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
public void initializeState(StateInitializationContext context) throws
Exception {
super.initializeState(context);
this.flinkJobId =
getContainingTask().getEnvironment().getJobID().toString();
+ this.operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
// Open the table loader and load the table.
this.tableLoader.open();
@@ -147,7 +150,6 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
int attemptId = getRuntimeContext().getAttemptNumber();
- String operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
this.manifestOutputFileFactory =
FlinkManifestUtil.createOutputFileFactory(
table, flinkJobId, operatorUniqueId, subTaskId, attemptId);
@@ -176,7 +178,8 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
// flink job even if it's restored from a snapshot created by another
different flink job, so
// it's safe to assign the max committed checkpoint id from restored
flink job to the current
// flink job.
- this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table,
restoredFlinkJobId);
+ this.maxCommittedCheckpointId =
+ getMaxCommittedCheckpointId(table, restoredFlinkJobId,
operatorUniqueId);
NavigableMap<Long, byte[]> uncommittedDataFiles =
Maps.newTreeMap(checkpointsState.get().iterator().next())
@@ -184,7 +187,8 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
if (!uncommittedDataFiles.isEmpty()) {
// Committed all uncommitted data files from the old flink job to
iceberg table.
long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
- commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId,
maxUncommittedCheckpointId);
+ commitUpToCheckpoint(
+ uncommittedDataFiles, restoredFlinkJobId, operatorUniqueId,
maxUncommittedCheckpointId);
}
}
}
@@ -226,7 +230,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
// the files,
// Besides, we need to maintain the max-committed-checkpoint-id to be
increasing.
if (checkpointId > maxCommittedCheckpointId) {
- commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
+ commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId,
operatorUniqueId, checkpointId);
this.maxCommittedCheckpointId = checkpointId;
} else {
LOG.info(
@@ -237,7 +241,10 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
}
private void commitUpToCheckpoint(
- NavigableMap<Long, byte[]> deltaManifestsMap, String newFlinkJobId, long
checkpointId)
+ NavigableMap<Long, byte[]> deltaManifestsMap,
+ String newFlinkJobId,
+ String operatorId,
+ long checkpointId)
throws IOException {
NavigableMap<Long, byte[]> pendingMap =
deltaManifestsMap.headMap(checkpointId, true);
List<ManifestFile> manifests = Lists.newArrayList();
@@ -257,7 +264,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
}
CommitSummary summary = new CommitSummary(pendingResults);
- commitPendingResult(pendingResults, summary, newFlinkJobId, checkpointId);
+ commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId,
checkpointId);
committerMetrics.updateCommitSummary(summary);
pendingMap.clear();
deleteCommittedManifests(manifests, newFlinkJobId, checkpointId);
@@ -267,14 +274,15 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
+ String operatorId,
long checkpointId) {
long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints
+ 1 : 0;
if (totalFiles != 0 || continuousEmptyCheckpoints %
maxContinuousEmptyCommits == 0) {
if (replacePartitions) {
- replacePartitions(pendingResults, summary, newFlinkJobId,
checkpointId);
+ replacePartitions(pendingResults, summary, newFlinkJobId, operatorId,
checkpointId);
} else {
- commitDeltaTxn(pendingResults, summary, newFlinkJobId, checkpointId);
+ commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId,
checkpointId);
}
continuousEmptyCheckpoints = 0;
}
@@ -305,6 +313,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
+ String operatorId,
long checkpointId) {
Preconditions.checkState(
summary.deleteFilesCount() == 0, "Cannot overwrite partitions with
delete files.");
@@ -317,13 +326,19 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
}
commitOperation(
- dynamicOverwrite, summary, "dynamic partition overwrite",
newFlinkJobId, checkpointId);
+ dynamicOverwrite,
+ summary,
+ "dynamic partition overwrite",
+ newFlinkJobId,
+ operatorId,
+ checkpointId);
}
private void commitDeltaTxn(
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
+ String operatorId,
long checkpointId) {
if (summary.deleteFilesCount() == 0) {
// To be compatible with iceberg format V1.
@@ -334,7 +349,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
"Should have no referenced data files for append.");
Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
}
- commitOperation(appendFiles, summary, "append", newFlinkJobId,
checkpointId);
+ commitOperation(appendFiles, summary, "append", newFlinkJobId,
operatorId, checkpointId);
} else {
// To be compatible with iceberg format V2.
for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
@@ -355,7 +370,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
- commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId,
e.getKey());
+ commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId,
operatorId, e.getKey());
}
}
}
@@ -365,6 +380,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
CommitSummary summary,
String description,
String newFlinkJobId,
+ String operatorId,
long checkpointId) {
LOG.info(
"Committing {} for checkpoint {} to table {} with summary: {}",
@@ -377,6 +393,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
// used by the sink.
operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
operation.set(FLINK_JOB_ID, newFlinkJobId);
+ operation.set(OPERATOR_ID, operatorId);
long startNano = System.nanoTime();
operation.commit(); // abort is automatically called if this fails.
@@ -402,7 +419,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
dataFilesPerCheckpoint.put(currentCheckpointId,
writeToManifest(currentCheckpointId));
writeResultsOfCurrentCkpt.clear();
- commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId,
currentCheckpointId);
+ commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId,
currentCheckpointId);
}
/**
@@ -454,14 +471,16 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
return new ListStateDescriptor<>("iceberg-files-committer-state",
sortedMapTypeInfo);
}
- static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
+ static long getMaxCommittedCheckpointId(Table table, String flinkJobId,
String operatorId) {
Snapshot snapshot = table.currentSnapshot();
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
while (snapshot != null) {
Map<String, String> summary = snapshot.summary();
String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
- if (flinkJobId.equals(snapshotFlinkJobId)) {
+ String snapshotOperatorId = summary.get(OPERATOR_ID);
+ if (flinkJobId.equals(snapshotFlinkJobId)
+ && (snapshotOperatorId == null ||
snapshotOperatorId.equals(operatorId))) {
String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
if (value != null) {
lastCommittedCheckpointId = Long.parseLong(value);
diff --git
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index ca643ad532..c4f93f0ec2 100644
---
a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++
b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -121,13 +122,15 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
long checkpointId = 0;
long timestamp = 0;
JobID jobId = new JobID();
+ OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
SimpleDataUtil.assertTableRows(table, Lists.newArrayList());
assertSnapshotSize(0);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
// It's better to advance the max-committed-checkpoint-id in iceberg
snapshot, so that the
// future flink job
@@ -140,7 +143,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
assertFlinkManifests(0);
assertSnapshotSize(i);
- assertMaxCommittedCheckpointId(jobId, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
}
}
}
@@ -183,9 +186,12 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
long timestamp = 0;
JobID jobID = new JobID();
+ OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobID)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
+
assertSnapshotSize(0);
List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
@@ -203,7 +209,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows));
assertSnapshotSize(i);
- assertMaxCommittedCheckpointId(jobID, i);
+ assertMaxCommittedCheckpointId(jobID, operatorId, i);
Assert.assertEquals(
TestIcebergFilesCommitter.class.getName(),
table.currentSnapshot().summary().get("flink.test"));
@@ -221,17 +227,19 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
long timestamp = 0;
JobID jobId = new JobID();
+ OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
RowData row1 = SimpleDataUtil.createRowData(1, "hello");
DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
harness.processElement(of(dataFile1), ++timestamp);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
// 1. snapshotState for checkpoint#1
long firstCheckpointId = 1;
@@ -241,7 +249,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
RowData row2 = SimpleDataUtil.createRowData(2, "world");
DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
harness.processElement(of(dataFile2), ++timestamp);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
// 2. snapshotState for checkpoint#2
long secondCheckpointId = 2;
@@ -251,13 +259,13 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
// 3. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(firstCheckpointId);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
- assertMaxCommittedCheckpointId(jobId, firstCheckpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, firstCheckpointId);
assertFlinkManifests(1);
// 4. notifyCheckpointComplete for checkpoint#2
harness.notifyOfCompletedCheckpoint(secondCheckpointId);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
- assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
assertFlinkManifests(0);
}
}
@@ -272,17 +280,19 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
long timestamp = 0;
JobID jobId = new JobID();
+ OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
RowData row1 = SimpleDataUtil.createRowData(1, "hello");
DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
harness.processElement(of(dataFile1), ++timestamp);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
// 1. snapshotState for checkpoint#1
long firstCheckpointId = 1;
@@ -292,7 +302,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
RowData row2 = SimpleDataUtil.createRowData(2, "world");
DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
harness.processElement(of(dataFile2), ++timestamp);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
// 2. snapshotState for checkpoint#2
long secondCheckpointId = 2;
@@ -302,13 +312,13 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
// 3. notifyCheckpointComplete for checkpoint#2
harness.notifyOfCompletedCheckpoint(secondCheckpointId);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
- assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
assertFlinkManifests(0);
// 4. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(firstCheckpointId);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
- assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
assertFlinkManifests(0);
}
}
@@ -321,12 +331,14 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
OperatorSubtaskState snapshot;
JobID jobId = new JobID();
+ OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
assertSnapshotSize(0);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
RowData row = SimpleDataUtil.createRowData(1, "hello");
expectedRows.add(row);
@@ -341,18 +353,19 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
SimpleDataUtil.assertTableRows(table, ImmutableList.of(row));
assertSnapshotSize(1);
- assertMaxCommittedCheckpointId(jobId, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
}
// Restore from the given snapshot
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
+ harness.getStreamConfig().setOperatorID(operatorId);
harness.setup();
harness.initializeState(snapshot);
harness.open();
SimpleDataUtil.assertTableRows(table, expectedRows);
assertSnapshotSize(1);
- assertMaxCommittedCheckpointId(jobId, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
RowData row = SimpleDataUtil.createRowData(2, "world");
expectedRows.add(row);
@@ -367,7 +380,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
SimpleDataUtil.assertTableRows(table, expectedRows);
assertSnapshotSize(2);
- assertMaxCommittedCheckpointId(jobId, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
}
}
@@ -381,12 +394,14 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
OperatorSubtaskState snapshot;
List<RowData> expectedRows = Lists.newArrayList();
JobID jobId = new JobID();
+ OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
assertSnapshotSize(0);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
RowData row = SimpleDataUtil.createRowData(1, "hello");
expectedRows.add(row);
@@ -395,11 +410,12 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
snapshot = harness.snapshot(++checkpointId, ++timestamp);
SimpleDataUtil.assertTableRows(table, ImmutableList.of());
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
assertFlinkManifests(1);
}
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
+ harness.getStreamConfig().setOperatorID(operatorId);
harness.setup();
harness.initializeState(snapshot);
harness.open();
@@ -409,7 +425,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
assertFlinkManifests(0);
SimpleDataUtil.assertTableRows(table, expectedRows);
- assertMaxCommittedCheckpointId(jobId, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
harness.snapshot(++checkpointId, ++timestamp);
// Did not write any new record, so it won't generate new manifest.
@@ -420,7 +436,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
SimpleDataUtil.assertTableRows(table, expectedRows);
assertSnapshotSize(2);
- assertMaxCommittedCheckpointId(jobId, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
RowData row = SimpleDataUtil.createRowData(2, "world");
expectedRows.add(row);
@@ -438,13 +454,14 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
harness.setup();
harness.initializeState(snapshot);
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
// All flink manifests should be cleaned because it has committed the
unfinished iceberg
// transaction.
assertFlinkManifests(0);
- assertMaxCommittedCheckpointId(newJobId, -1);
- assertMaxCommittedCheckpointId(jobId, checkpointId);
+ assertMaxCommittedCheckpointId(newJobId, operatorId, -1);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
SimpleDataUtil.assertTableRows(table, expectedRows);
assertSnapshotSize(3);
@@ -461,7 +478,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
SimpleDataUtil.assertTableRows(table, expectedRows);
assertSnapshotSize(4);
- assertMaxCommittedCheckpointId(newJobId, checkpointId);
+ assertMaxCommittedCheckpointId(newJobId, operatorId, checkpointId);
}
}
@@ -473,13 +490,15 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
List<RowData> tableRows = Lists.newArrayList();
JobID oldJobId = new JobID();
+ OperatorID oldOperatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(oldJobId)) {
harness.setup();
harness.open();
+ oldOperatorId = harness.getOperator().getOperatorID();
assertSnapshotSize(0);
- assertMaxCommittedCheckpointId(oldJobId, -1L);
+ assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, -1L);
for (int i = 1; i <= 3; i++) {
rows.add(SimpleDataUtil.createRowData(i, "hello" + i));
@@ -495,7 +514,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
SimpleDataUtil.assertTableRows(table, tableRows);
assertSnapshotSize(i);
- assertMaxCommittedCheckpointId(oldJobId, checkpointId);
+ assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, checkpointId);
}
}
@@ -503,14 +522,16 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
checkpointId = 0;
timestamp = 0;
JobID newJobId = new JobID();
+ OperatorID newOperatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(newJobId)) {
harness.setup();
harness.open();
+ newOperatorId = harness.getOperator().getOperatorID();
assertSnapshotSize(3);
- assertMaxCommittedCheckpointId(oldJobId, 3);
- assertMaxCommittedCheckpointId(newJobId, -1);
+ assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, 3);
+ assertMaxCommittedCheckpointId(newJobId, newOperatorId, -1);
rows.add(SimpleDataUtil.createRowData(2, "world"));
tableRows.addAll(rows);
@@ -524,7 +545,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
assertFlinkManifests(0);
SimpleDataUtil.assertTableRows(table, tableRows);
assertSnapshotSize(4);
- assertMaxCommittedCheckpointId(newJobId, checkpointId);
+ assertMaxCommittedCheckpointId(newJobId, newOperatorId, checkpointId);
}
}
@@ -534,16 +555,20 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
List<RowData> tableRows = Lists.newArrayList();
JobID[] jobs = new JobID[] {new JobID(), new JobID(), new JobID()};
+ OperatorID[] operatorIds =
+ new OperatorID[] {new OperatorID(), new OperatorID(), new
OperatorID()};
for (int i = 0; i < 20; i++) {
int jobIndex = i % 3;
int checkpointId = i / 3;
JobID jobId = jobs[jobIndex];
+ OperatorID operatorId = operatorIds[jobIndex];
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
+ harness.getStreamConfig().setOperatorID(operatorId);
harness.setup();
harness.open();
assertSnapshotSize(i);
- assertMaxCommittedCheckpointId(jobId, checkpointId == 0 ? -1 :
checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId == 0 ?
-1 : checkpointId);
List<RowData> rows =
Lists.newArrayList(SimpleDataUtil.createRowData(i, "word-" + i));
tableRows.addAll(rows);
@@ -557,21 +582,121 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
assertFlinkManifests(0);
SimpleDataUtil.assertTableRows(table, tableRows);
assertSnapshotSize(i + 1);
- assertMaxCommittedCheckpointId(jobId, checkpointId + 1);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId + 1);
}
}
}
+ @Test
+ public void testMultipleSinksRecoveryFromValidSnapshot() throws Exception {
+ long checkpointId = 0;
+ long timestamp = 0;
+ List<RowData> expectedRows = Lists.newArrayList();
+ OperatorSubtaskState snapshot1;
+ OperatorSubtaskState snapshot2;
+
+ JobID jobId = new JobID();
+ OperatorID operatorId1 = new OperatorID();
+ OperatorID operatorId2 = new OperatorID();
+ try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness1 =
createStreamSink(jobId);
+ OneInputStreamOperatorTestHarness<WriteResult, Void> harness2 =
createStreamSink(jobId)) {
+ harness1.getStreamConfig().setOperatorID(operatorId1);
+ harness1.setup();
+ harness1.open();
+ harness2.getStreamConfig().setOperatorID(operatorId2);
+ harness2.setup();
+ harness2.open();
+
+ assertSnapshotSize(0);
+ assertMaxCommittedCheckpointId(jobId, operatorId1, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId2, -1L);
+
+ RowData row1 = SimpleDataUtil.createRowData(1, "hello1");
+ expectedRows.add(row1);
+ DataFile dataFile1 = writeDataFile("data-1-1", ImmutableList.of(row1));
+
+ harness1.processElement(of(dataFile1), ++timestamp);
+ snapshot1 = harness1.snapshot(++checkpointId, ++timestamp);
+
+ RowData row2 = SimpleDataUtil.createRowData(1, "hello2");
+ expectedRows.add(row2);
+ DataFile dataFile2 = writeDataFile("data-1-2", ImmutableList.of(row2));
+
+ harness2.processElement(of(dataFile2), ++timestamp);
+ snapshot2 = harness2.snapshot(checkpointId, ++timestamp);
+ assertFlinkManifests(2);
+
+ // Only notify one of the committers
+ harness1.notifyOfCompletedCheckpoint(checkpointId);
+ assertFlinkManifests(1);
+
+ // Only the first row is committed at this point
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+ assertSnapshotSize(1);
+ assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId2, -1);
+ }
+
+ // Restore from the given snapshot
+ try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness1 =
createStreamSink(jobId);
+ OneInputStreamOperatorTestHarness<WriteResult, Void> harness2 =
createStreamSink(jobId)) {
+ harness1.getStreamConfig().setOperatorID(operatorId1);
+ harness1.setup();
+ harness1.initializeState(snapshot1);
+ harness1.open();
+
+ harness2.getStreamConfig().setOperatorID(operatorId2);
+ harness2.setup();
+ harness2.initializeState(snapshot2);
+ harness2.open();
+
+ // All flink manifests should be cleaned because it has committed the
unfinished iceberg
+ // transaction.
+ assertFlinkManifests(0);
+
+ SimpleDataUtil.assertTableRows(table, expectedRows);
+ assertSnapshotSize(2);
+ assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
+
+ RowData row1 = SimpleDataUtil.createRowData(2, "world1");
+ expectedRows.add(row1);
+ DataFile dataFile1 = writeDataFile("data-2-1", ImmutableList.of(row1));
+
+ harness1.processElement(of(dataFile1), ++timestamp);
+ harness1.snapshot(++checkpointId, ++timestamp);
+
+ RowData row2 = SimpleDataUtil.createRowData(2, "world2");
+ expectedRows.add(row2);
+ DataFile dataFile2 = writeDataFile("data-2-2", ImmutableList.of(row2));
+ harness2.processElement(of(dataFile2), ++timestamp);
+ harness2.snapshot(checkpointId, ++timestamp);
+
+ assertFlinkManifests(2);
+
+ harness1.notifyOfCompletedCheckpoint(checkpointId);
+ harness2.notifyOfCompletedCheckpoint(checkpointId);
+ assertFlinkManifests(0);
+
+ SimpleDataUtil.assertTableRows(table, expectedRows);
+ assertSnapshotSize(4);
+ assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
+ }
+ }
+
@Test
public void testBoundedStream() throws Exception {
JobID jobId = new JobID();
+ OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
assertFlinkManifests(0);
assertSnapshotSize(0);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
List<RowData> tableRows =
Lists.newArrayList(SimpleDataUtil.createRowData(1, "word-1"));
@@ -582,7 +707,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
assertFlinkManifests(0);
SimpleDataUtil.assertTableRows(table, tableRows);
assertSnapshotSize(1);
- assertMaxCommittedCheckpointId(jobId, Long.MAX_VALUE);
+ assertMaxCommittedCheckpointId(jobId, operatorId, Long.MAX_VALUE);
Assert.assertEquals(
TestIcebergFilesCommitter.class.getName(),
table.currentSnapshot().summary().get("flink.test"));
@@ -595,23 +720,24 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
final long checkpoint = 10;
JobID jobId = new JobID();
+ OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
RowData row1 = SimpleDataUtil.createRowData(1, "hello");
DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
harness.processElement(of(dataFile1), ++timestamp);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
// 1. snapshotState for checkpoint#1
harness.snapshot(checkpoint, ++timestamp);
List<Path> manifestPaths = assertFlinkManifests(1);
Path manifestPath = manifestPaths.get(0);
- String operatorId =
harness.getOneInputOperator().getOperatorID().toString();
Assert.assertEquals(
"File name should have the expected pattern.",
String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0,
checkpoint, 1),
@@ -626,7 +752,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
// 3. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(checkpoint);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
- assertMaxCommittedCheckpointId(jobId, checkpoint);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
assertFlinkManifests(0);
}
}
@@ -639,24 +765,25 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
long checkpoint = 10;
JobID jobId = new JobID();
+ OperatorID operatorId;
FileAppenderFactory<RowData> appenderFactory =
createDeletableAppenderFactory();
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
RowData row1 = SimpleDataUtil.createInsert(1, "aaa");
DataFile dataFile1 = writeDataFile("data-file-1",
ImmutableList.of(row1));
harness.processElement(of(dataFile1), ++timestamp);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
// 1. snapshotState for checkpoint#1
harness.snapshot(checkpoint, ++timestamp);
List<Path> manifestPaths = assertFlinkManifests(1);
Path manifestPath = manifestPaths.get(0);
- String operatorId =
harness.getOneInputOperator().getOperatorID().toString();
Assert.assertEquals(
"File name should have the expected pattern.",
String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0,
checkpoint, 1),
@@ -671,7 +798,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
// 3. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(checkpoint);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
- assertMaxCommittedCheckpointId(jobId, checkpoint);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
assertFlinkManifests(0);
// 4. process both data files and delete files.
@@ -684,7 +811,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
harness.processElement(
WriteResult.builder().addDataFiles(dataFile2).addDeleteFiles(deleteFile1).build(),
++timestamp);
- assertMaxCommittedCheckpointId(jobId, checkpoint);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
// 5. snapshotState for checkpoint#2
harness.snapshot(++checkpoint, ++timestamp);
@@ -693,7 +820,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
// 6. notifyCheckpointComplete for checkpoint#2
harness.notifyOfCompletedCheckpoint(checkpoint);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(row2));
- assertMaxCommittedCheckpointId(jobId, checkpoint);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
assertFlinkManifests(0);
}
}
@@ -706,13 +833,15 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
long checkpoint = 10;
JobID jobId = new JobID();
+ OperatorID operatorId;
FileAppenderFactory<RowData> appenderFactory =
createDeletableAppenderFactory();
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
@@ -742,7 +871,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
// Notify the 2nd snapshot to complete.
harness.notifyOfCompletedCheckpoint(checkpoint);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1,
insert4));
- assertMaxCommittedCheckpointId(jobId, checkpoint);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
assertFlinkManifests(0);
Assert.assertEquals(
"Should have committed 2 txn.", 2,
ImmutableList.copyOf(table.snapshots()).size());
@@ -817,9 +946,11 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
table, table.schema(), table.spec(), CONF, tablePath,
format.addExtension(filename), rows);
}
- private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
+ private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID
operatorID, long expectedId) {
table.refresh();
- long actualId = IcebergFilesCommitter.getMaxCommittedCheckpointId(table,
jobID.toString());
+ long actualId =
+ IcebergFilesCommitter.getMaxCommittedCheckpointId(
+ table, jobID.toString(), operatorID.toHexString());
Assert.assertEquals(expectedId, actualId);
}
diff --git
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 93dc0049d1..d84e2cb706 100644
---
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -69,6 +69,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
private static final Logger LOG =
LoggerFactory.getLogger(IcebergFilesCommitter.class);
private static final String FLINK_JOB_ID = "flink.job-id";
+ private static final String OPERATOR_ID = "flink.operator-id";
// The max checkpoint id we've committed to iceberg table. As the flink's
checkpoint is always
// increasing, so we could correctly commit all the data files whose
checkpoint id is greater than
@@ -97,6 +98,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
// It will have an unique identifier for one job.
private transient String flinkJobId;
+ private transient String operatorUniqueId;
private transient Table table;
private transient IcebergFilesCommitterMetrics committerMetrics;
private transient ManifestOutputFileFactory manifestOutputFileFactory;
@@ -134,6 +136,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
public void initializeState(StateInitializationContext context) throws
Exception {
super.initializeState(context);
this.flinkJobId =
getContainingTask().getEnvironment().getJobID().toString();
+ this.operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
// Open the table loader and load the table.
this.tableLoader.open();
@@ -147,7 +150,6 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
int attemptId = getRuntimeContext().getAttemptNumber();
- String operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
this.manifestOutputFileFactory =
FlinkManifestUtil.createOutputFileFactory(
table, flinkJobId, operatorUniqueId, subTaskId, attemptId);
@@ -176,7 +178,8 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
// flink job even if it's restored from a snapshot created by another
different flink job, so
// it's safe to assign the max committed checkpoint id from restored
flink job to the current
// flink job.
- this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table,
restoredFlinkJobId);
+ this.maxCommittedCheckpointId =
+ getMaxCommittedCheckpointId(table, restoredFlinkJobId,
operatorUniqueId);
NavigableMap<Long, byte[]> uncommittedDataFiles =
Maps.newTreeMap(checkpointsState.get().iterator().next())
@@ -184,7 +187,8 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
if (!uncommittedDataFiles.isEmpty()) {
// Committed all uncommitted data files from the old flink job to
iceberg table.
long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
- commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId,
maxUncommittedCheckpointId);
+ commitUpToCheckpoint(
+ uncommittedDataFiles, restoredFlinkJobId, operatorUniqueId,
maxUncommittedCheckpointId);
}
}
}
@@ -226,7 +230,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
// the files,
// Besides, we need to maintain the max-committed-checkpoint-id to be
increasing.
if (checkpointId > maxCommittedCheckpointId) {
- commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
+ commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId,
operatorUniqueId, checkpointId);
this.maxCommittedCheckpointId = checkpointId;
} else {
LOG.info(
@@ -237,7 +241,10 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
}
private void commitUpToCheckpoint(
- NavigableMap<Long, byte[]> deltaManifestsMap, String newFlinkJobId, long
checkpointId)
+ NavigableMap<Long, byte[]> deltaManifestsMap,
+ String newFlinkJobId,
+ String operatorId,
+ long checkpointId)
throws IOException {
NavigableMap<Long, byte[]> pendingMap =
deltaManifestsMap.headMap(checkpointId, true);
List<ManifestFile> manifests = Lists.newArrayList();
@@ -257,7 +264,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
}
CommitSummary summary = new CommitSummary(pendingResults);
- commitPendingResult(pendingResults, summary, newFlinkJobId, checkpointId);
+ commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId,
checkpointId);
committerMetrics.updateCommitSummary(summary);
pendingMap.clear();
deleteCommittedManifests(manifests, newFlinkJobId, checkpointId);
@@ -267,14 +274,15 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
+ String operatorId,
long checkpointId) {
long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
continuousEmptyCheckpoints = totalFiles == 0 ? continuousEmptyCheckpoints
+ 1 : 0;
if (totalFiles != 0 || continuousEmptyCheckpoints %
maxContinuousEmptyCommits == 0) {
if (replacePartitions) {
- replacePartitions(pendingResults, summary, newFlinkJobId,
checkpointId);
+ replacePartitions(pendingResults, summary, newFlinkJobId, operatorId,
checkpointId);
} else {
- commitDeltaTxn(pendingResults, summary, newFlinkJobId, checkpointId);
+ commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId,
checkpointId);
}
continuousEmptyCheckpoints = 0;
} else {
@@ -307,6 +315,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
+ String operatorId,
long checkpointId) {
Preconditions.checkState(
summary.deleteFilesCount() == 0, "Cannot overwrite partitions with
delete files.");
@@ -319,13 +328,19 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
}
commitOperation(
- dynamicOverwrite, summary, "dynamic partition overwrite",
newFlinkJobId, checkpointId);
+ dynamicOverwrite,
+ summary,
+ "dynamic partition overwrite",
+ newFlinkJobId,
+ operatorId,
+ checkpointId);
}
private void commitDeltaTxn(
NavigableMap<Long, WriteResult> pendingResults,
CommitSummary summary,
String newFlinkJobId,
+ String operatorId,
long checkpointId) {
if (summary.deleteFilesCount() == 0) {
// To be compatible with iceberg format V1.
@@ -336,7 +351,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
"Should have no referenced data files for append.");
Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
}
- commitOperation(appendFiles, summary, "append", newFlinkJobId,
checkpointId);
+ commitOperation(appendFiles, summary, "append", newFlinkJobId,
operatorId, checkpointId);
} else {
// To be compatible with iceberg format V2.
for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
@@ -357,7 +372,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
- commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId,
e.getKey());
+ commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId,
operatorId, e.getKey());
}
}
}
@@ -367,6 +382,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
CommitSummary summary,
String description,
String newFlinkJobId,
+ String operatorId,
long checkpointId) {
LOG.info(
"Committing {} for checkpoint {} to table {} with summary: {}",
@@ -379,6 +395,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
// used by the sink.
operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
operation.set(FLINK_JOB_ID, newFlinkJobId);
+ operation.set(OPERATOR_ID, operatorId);
long startNano = System.nanoTime();
operation.commit(); // abort is automatically called if this fails.
@@ -404,7 +421,7 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
dataFilesPerCheckpoint.put(currentCheckpointId,
writeToManifest(currentCheckpointId));
writeResultsOfCurrentCkpt.clear();
- commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId,
currentCheckpointId);
+ commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId,
currentCheckpointId);
}
/**
@@ -456,14 +473,16 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
return new ListStateDescriptor<>("iceberg-files-committer-state",
sortedMapTypeInfo);
}
- static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
+ static long getMaxCommittedCheckpointId(Table table, String flinkJobId,
String operatorId) {
Snapshot snapshot = table.currentSnapshot();
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
while (snapshot != null) {
Map<String, String> summary = snapshot.summary();
String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
- if (flinkJobId.equals(snapshotFlinkJobId)) {
+ String snapshotOperatorId = summary.get(OPERATOR_ID);
+ if (flinkJobId.equals(snapshotFlinkJobId)
+ && (snapshotOperatorId == null ||
snapshotOperatorId.equals(operatorId))) {
String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
if (value != null) {
lastCommittedCheckpointId = Long.parseLong(value);
diff --git
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 5a4cf6223c..66baaeb0e9 100644
---
a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++
b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
@@ -118,13 +119,15 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
long checkpointId = 0;
long timestamp = 0;
JobID jobId = new JobID();
+ OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
SimpleDataUtil.assertTableRows(table, Lists.newArrayList());
assertSnapshotSize(0);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
// It's better to advance the max-committed-checkpoint-id in iceberg
snapshot, so that the
// future flink job
@@ -137,7 +140,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
assertFlinkManifests(0);
assertSnapshotSize(i);
- assertMaxCommittedCheckpointId(jobId, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
}
}
}
@@ -180,9 +183,12 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
long timestamp = 0;
JobID jobID = new JobID();
+ OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobID)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
+
assertSnapshotSize(0);
List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
@@ -200,7 +206,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows));
assertSnapshotSize(i);
- assertMaxCommittedCheckpointId(jobID, i);
+ assertMaxCommittedCheckpointId(jobID, operatorId, i);
Assert.assertEquals(
TestIcebergFilesCommitter.class.getName(),
table.currentSnapshot().summary().get("flink.test"));
@@ -218,17 +224,19 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
long timestamp = 0;
JobID jobId = new JobID();
+ OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
RowData row1 = SimpleDataUtil.createRowData(1, "hello");
DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
harness.processElement(of(dataFile1), ++timestamp);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
// 1. snapshotState for checkpoint#1
long firstCheckpointId = 1;
@@ -238,7 +246,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
RowData row2 = SimpleDataUtil.createRowData(2, "world");
DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
harness.processElement(of(dataFile2), ++timestamp);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
// 2. snapshotState for checkpoint#2
long secondCheckpointId = 2;
@@ -248,13 +256,13 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
// 3. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(firstCheckpointId);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
- assertMaxCommittedCheckpointId(jobId, firstCheckpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, firstCheckpointId);
assertFlinkManifests(1);
// 4. notifyCheckpointComplete for checkpoint#2
harness.notifyOfCompletedCheckpoint(secondCheckpointId);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
- assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
assertFlinkManifests(0);
}
}
@@ -269,17 +277,19 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
long timestamp = 0;
JobID jobId = new JobID();
+ OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
RowData row1 = SimpleDataUtil.createRowData(1, "hello");
DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
harness.processElement(of(dataFile1), ++timestamp);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
// 1. snapshotState for checkpoint#1
long firstCheckpointId = 1;
@@ -289,7 +299,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
RowData row2 = SimpleDataUtil.createRowData(2, "world");
DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
harness.processElement(of(dataFile2), ++timestamp);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
// 2. snapshotState for checkpoint#2
long secondCheckpointId = 2;
@@ -299,13 +309,13 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
// 3. notifyCheckpointComplete for checkpoint#2
harness.notifyOfCompletedCheckpoint(secondCheckpointId);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
- assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
assertFlinkManifests(0);
// 4. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(firstCheckpointId);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
- assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
assertFlinkManifests(0);
}
}
@@ -318,12 +328,14 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
OperatorSubtaskState snapshot;
JobID jobId = new JobID();
+ OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
assertSnapshotSize(0);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
RowData row = SimpleDataUtil.createRowData(1, "hello");
expectedRows.add(row);
@@ -338,18 +350,19 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
SimpleDataUtil.assertTableRows(table, ImmutableList.of(row));
assertSnapshotSize(1);
- assertMaxCommittedCheckpointId(jobId, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
}
// Restore from the given snapshot
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
+ harness.getStreamConfig().setOperatorID(operatorId);
harness.setup();
harness.initializeState(snapshot);
harness.open();
SimpleDataUtil.assertTableRows(table, expectedRows);
assertSnapshotSize(1);
- assertMaxCommittedCheckpointId(jobId, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
RowData row = SimpleDataUtil.createRowData(2, "world");
expectedRows.add(row);
@@ -364,7 +377,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
SimpleDataUtil.assertTableRows(table, expectedRows);
assertSnapshotSize(2);
- assertMaxCommittedCheckpointId(jobId, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
}
}
@@ -378,12 +391,14 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
OperatorSubtaskState snapshot;
List<RowData> expectedRows = Lists.newArrayList();
JobID jobId = new JobID();
+ OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
assertSnapshotSize(0);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
RowData row = SimpleDataUtil.createRowData(1, "hello");
expectedRows.add(row);
@@ -392,11 +407,12 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
snapshot = harness.snapshot(++checkpointId, ++timestamp);
SimpleDataUtil.assertTableRows(table, ImmutableList.of());
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
assertFlinkManifests(1);
}
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
+ harness.getStreamConfig().setOperatorID(operatorId);
harness.setup();
harness.initializeState(snapshot);
harness.open();
@@ -406,7 +422,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
assertFlinkManifests(0);
SimpleDataUtil.assertTableRows(table, expectedRows);
- assertMaxCommittedCheckpointId(jobId, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
harness.snapshot(++checkpointId, ++timestamp);
// Did not write any new record, so it won't generate new manifest.
@@ -417,7 +433,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
SimpleDataUtil.assertTableRows(table, expectedRows);
assertSnapshotSize(2);
- assertMaxCommittedCheckpointId(jobId, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
RowData row = SimpleDataUtil.createRowData(2, "world");
expectedRows.add(row);
@@ -435,13 +451,14 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
harness.setup();
harness.initializeState(snapshot);
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
// All flink manifests should be cleaned because it has committed the
unfinished iceberg
// transaction.
assertFlinkManifests(0);
- assertMaxCommittedCheckpointId(newJobId, -1);
- assertMaxCommittedCheckpointId(jobId, checkpointId);
+ assertMaxCommittedCheckpointId(newJobId, operatorId, -1);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
SimpleDataUtil.assertTableRows(table, expectedRows);
assertSnapshotSize(3);
@@ -458,7 +475,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
SimpleDataUtil.assertTableRows(table, expectedRows);
assertSnapshotSize(4);
- assertMaxCommittedCheckpointId(newJobId, checkpointId);
+ assertMaxCommittedCheckpointId(newJobId, operatorId, checkpointId);
}
}
@@ -470,13 +487,15 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
List<RowData> tableRows = Lists.newArrayList();
JobID oldJobId = new JobID();
+ OperatorID oldOperatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(oldJobId)) {
harness.setup();
harness.open();
+ oldOperatorId = harness.getOperator().getOperatorID();
assertSnapshotSize(0);
- assertMaxCommittedCheckpointId(oldJobId, -1L);
+ assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, -1L);
for (int i = 1; i <= 3; i++) {
rows.add(SimpleDataUtil.createRowData(i, "hello" + i));
@@ -492,7 +511,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
SimpleDataUtil.assertTableRows(table, tableRows);
assertSnapshotSize(i);
- assertMaxCommittedCheckpointId(oldJobId, checkpointId);
+ assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, checkpointId);
}
}
@@ -500,14 +519,16 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
checkpointId = 0;
timestamp = 0;
JobID newJobId = new JobID();
+ OperatorID newOperatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(newJobId)) {
harness.setup();
harness.open();
+ newOperatorId = harness.getOperator().getOperatorID();
assertSnapshotSize(3);
- assertMaxCommittedCheckpointId(oldJobId, 3);
- assertMaxCommittedCheckpointId(newJobId, -1);
+ assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, 3);
+ assertMaxCommittedCheckpointId(newJobId, newOperatorId, -1);
rows.add(SimpleDataUtil.createRowData(2, "world"));
tableRows.addAll(rows);
@@ -521,7 +542,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
assertFlinkManifests(0);
SimpleDataUtil.assertTableRows(table, tableRows);
assertSnapshotSize(4);
- assertMaxCommittedCheckpointId(newJobId, checkpointId);
+ assertMaxCommittedCheckpointId(newJobId, newOperatorId, checkpointId);
}
}
@@ -531,16 +552,20 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
List<RowData> tableRows = Lists.newArrayList();
JobID[] jobs = new JobID[] {new JobID(), new JobID(), new JobID()};
+ OperatorID[] operatorIds =
+ new OperatorID[] {new OperatorID(), new OperatorID(), new
OperatorID()};
for (int i = 0; i < 20; i++) {
int jobIndex = i % 3;
int checkpointId = i / 3;
JobID jobId = jobs[jobIndex];
+ OperatorID operatorId = operatorIds[jobIndex];
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
+ harness.getStreamConfig().setOperatorID(operatorId);
harness.setup();
harness.open();
assertSnapshotSize(i);
- assertMaxCommittedCheckpointId(jobId, checkpointId == 0 ? -1 :
checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId == 0 ?
-1 : checkpointId);
List<RowData> rows =
Lists.newArrayList(SimpleDataUtil.createRowData(i, "word-" + i));
tableRows.addAll(rows);
@@ -554,21 +579,121 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
assertFlinkManifests(0);
SimpleDataUtil.assertTableRows(table, tableRows);
assertSnapshotSize(i + 1);
- assertMaxCommittedCheckpointId(jobId, checkpointId + 1);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId + 1);
}
}
}
+ @Test
+ public void testMultipleSinksRecoveryFromValidSnapshot() throws Exception {
+ long checkpointId = 0;
+ long timestamp = 0;
+ List<RowData> expectedRows = Lists.newArrayList();
+ OperatorSubtaskState snapshot1;
+ OperatorSubtaskState snapshot2;
+
+ JobID jobId = new JobID();
+ OperatorID operatorId1 = new OperatorID();
+ OperatorID operatorId2 = new OperatorID();
+ try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness1 =
createStreamSink(jobId);
+ OneInputStreamOperatorTestHarness<WriteResult, Void> harness2 =
createStreamSink(jobId)) {
+ harness1.getStreamConfig().setOperatorID(operatorId1);
+ harness1.setup();
+ harness1.open();
+ harness2.getStreamConfig().setOperatorID(operatorId2);
+ harness2.setup();
+ harness2.open();
+
+ assertSnapshotSize(0);
+ assertMaxCommittedCheckpointId(jobId, operatorId1, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId2, -1L);
+
+ RowData row1 = SimpleDataUtil.createRowData(1, "hello1");
+ expectedRows.add(row1);
+ DataFile dataFile1 = writeDataFile("data-1-1", ImmutableList.of(row1));
+
+ harness1.processElement(of(dataFile1), ++timestamp);
+ snapshot1 = harness1.snapshot(++checkpointId, ++timestamp);
+
+ RowData row2 = SimpleDataUtil.createRowData(1, "hello2");
+ expectedRows.add(row2);
+ DataFile dataFile2 = writeDataFile("data-1-2", ImmutableList.of(row2));
+
+ harness2.processElement(of(dataFile2), ++timestamp);
+ snapshot2 = harness2.snapshot(checkpointId, ++timestamp);
+ assertFlinkManifests(2);
+
+ // Only notify one of the committers
+ harness1.notifyOfCompletedCheckpoint(checkpointId);
+ assertFlinkManifests(1);
+
+ // Only the first row is committed at this point
+ SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+ assertSnapshotSize(1);
+ assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId2, -1);
+ }
+
+ // Restore from the given snapshot
+ try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness1 =
createStreamSink(jobId);
+ OneInputStreamOperatorTestHarness<WriteResult, Void> harness2 =
createStreamSink(jobId)) {
+ harness1.getStreamConfig().setOperatorID(operatorId1);
+ harness1.setup();
+ harness1.initializeState(snapshot1);
+ harness1.open();
+
+ harness2.getStreamConfig().setOperatorID(operatorId2);
+ harness2.setup();
+ harness2.initializeState(snapshot2);
+ harness2.open();
+
+ // All flink manifests should be cleaned because it has committed the
unfinished iceberg
+ // transaction.
+ assertFlinkManifests(0);
+
+ SimpleDataUtil.assertTableRows(table, expectedRows);
+ assertSnapshotSize(2);
+ assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
+
+ RowData row1 = SimpleDataUtil.createRowData(2, "world1");
+ expectedRows.add(row1);
+ DataFile dataFile1 = writeDataFile("data-2-1", ImmutableList.of(row1));
+
+ harness1.processElement(of(dataFile1), ++timestamp);
+ harness1.snapshot(++checkpointId, ++timestamp);
+
+ RowData row2 = SimpleDataUtil.createRowData(2, "world2");
+ expectedRows.add(row2);
+ DataFile dataFile2 = writeDataFile("data-2-2", ImmutableList.of(row2));
+ harness2.processElement(of(dataFile2), ++timestamp);
+ harness2.snapshot(checkpointId, ++timestamp);
+
+ assertFlinkManifests(2);
+
+ harness1.notifyOfCompletedCheckpoint(checkpointId);
+ harness2.notifyOfCompletedCheckpoint(checkpointId);
+ assertFlinkManifests(0);
+
+ SimpleDataUtil.assertTableRows(table, expectedRows);
+ assertSnapshotSize(4);
+ assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
+ }
+ }
+
@Test
public void testBoundedStream() throws Exception {
JobID jobId = new JobID();
+ OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
assertFlinkManifests(0);
assertSnapshotSize(0);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
List<RowData> tableRows =
Lists.newArrayList(SimpleDataUtil.createRowData(1, "word-1"));
@@ -579,7 +704,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
assertFlinkManifests(0);
SimpleDataUtil.assertTableRows(table, tableRows);
assertSnapshotSize(1);
- assertMaxCommittedCheckpointId(jobId, Long.MAX_VALUE);
+ assertMaxCommittedCheckpointId(jobId, operatorId, Long.MAX_VALUE);
Assert.assertEquals(
TestIcebergFilesCommitter.class.getName(),
table.currentSnapshot().summary().get("flink.test"));
@@ -592,23 +717,24 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
final long checkpoint = 10;
JobID jobId = new JobID();
+ OperatorID operatorId;
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
RowData row1 = SimpleDataUtil.createRowData(1, "hello");
DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
harness.processElement(of(dataFile1), ++timestamp);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
// 1. snapshotState for checkpoint#1
harness.snapshot(checkpoint, ++timestamp);
List<Path> manifestPaths = assertFlinkManifests(1);
Path manifestPath = manifestPaths.get(0);
- String operatorId =
harness.getOneInputOperator().getOperatorID().toString();
Assert.assertEquals(
"File name should have the expected pattern.",
String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0,
checkpoint, 1),
@@ -623,7 +749,7 @@ public class TestIcebergFilesCommitter extends
TableTestBase {
// 3. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(checkpoint);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
- assertMaxCommittedCheckpointId(jobId, checkpoint);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
assertFlinkManifests(0);
}
}
@@ -636,24 +762,25 @@ public class TestIcebergFilesCommitter extends TableTestBase {
long checkpoint = 10;
JobID jobId = new JobID();
+ OperatorID operatorId;
FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
RowData row1 = SimpleDataUtil.createInsert(1, "aaa");
DataFile dataFile1 = writeDataFile("data-file-1", ImmutableList.of(row1));
harness.processElement(of(dataFile1), ++timestamp);
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
// 1. snapshotState for checkpoint#1
harness.snapshot(checkpoint, ++timestamp);
List<Path> manifestPaths = assertFlinkManifests(1);
Path manifestPath = manifestPaths.get(0);
- String operatorId = harness.getOneInputOperator().getOperatorID().toString();
Assert.assertEquals(
"File name should have the expected pattern.",
String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
@@ -668,7 +795,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// 3. notifyCheckpointComplete for checkpoint#1
harness.notifyOfCompletedCheckpoint(checkpoint);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
- assertMaxCommittedCheckpointId(jobId, checkpoint);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
assertFlinkManifests(0);
// 4. process both data files and delete files.
@@ -681,7 +808,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
harness.processElement(
WriteResult.builder().addDataFiles(dataFile2).addDeleteFiles(deleteFile1).build(),
++timestamp);
- assertMaxCommittedCheckpointId(jobId, checkpoint);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
// 5. snapshotState for checkpoint#2
harness.snapshot(++checkpoint, ++timestamp);
@@ -690,7 +817,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// 6. notifyCheckpointComplete for checkpoint#2
harness.notifyOfCompletedCheckpoint(checkpoint);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(row2));
- assertMaxCommittedCheckpointId(jobId, checkpoint);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
assertFlinkManifests(0);
}
}
@@ -703,13 +830,15 @@ public class TestIcebergFilesCommitter extends TableTestBase {
long checkpoint = 10;
JobID jobId = new JobID();
+ OperatorID operatorId;
FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();
try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness =
createStreamSink(jobId)) {
harness.setup();
harness.open();
+ operatorId = harness.getOperator().getOperatorID();
- assertMaxCommittedCheckpointId(jobId, -1L);
+ assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
@@ -739,7 +868,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// Notify the 2nd snapshot to complete.
harness.notifyOfCompletedCheckpoint(checkpoint);
SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4));
- assertMaxCommittedCheckpointId(jobId, checkpoint);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
assertFlinkManifests(0);
Assert.assertEquals(
"Should have committed 2 txn.", 2, ImmutableList.copyOf(table.snapshots()).size());
@@ -818,9 +947,11 @@ public class TestIcebergFilesCommitter extends TableTestBase {
rows);
}
- private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
+ private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID operatorID, long expectedId) {
table.refresh();
- long actualId = IcebergFilesCommitter.getMaxCommittedCheckpointId(table, jobID.toString());
+ long actualId =
+ IcebergFilesCommitter.getMaxCommittedCheckpointId(
+ table, jobID.toString(), operatorID.toHexString());
Assert.assertEquals(expectedId, actualId);
}