mxm commented on code in PR #15913:
URL: https://github.com/apache/iceberg/pull/15913#discussion_r3066078886
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -423,21 +423,53 @@ public void endInput() throws IOException {
   }

   private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException {
-    if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+
+    if (!dataFilesPerCheckpoint.containsKey(checkpointId)) {
       dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
     }
-    for (Map.Entry<Long, List<WriteResult>> writeResultsOfCheckpoint :
-        writeResultsSinceLastSnapshot.entrySet()) {
-      dataFilesPerCheckpoint.put(
-          writeResultsOfCheckpoint.getKey(),
-          writeToManifest(writeResultsOfCheckpoint.getKey(), writeResultsOfCheckpoint.getValue()));
+    Map<Long, List<WriteResult>> pendingWriteResults = Maps.newHashMap();
+    for (Map.Entry<Long, List<WriteResult>> entry : writeResultsSinceLastSnapshot.entrySet()) {
+      long assignedCheckpointId = computeCheckpointId(checkpointId, entry);
+      pendingWriteResults
+          .computeIfAbsent(assignedCheckpointId, k -> Lists.newArrayList())
+          .addAll(entry.getValue());
+    }
+
Review Comment:
When post-barrier `WriteResult`s include equality deletes (V2 tables), they
get merged with the next checkpoint's data and committed in a single
`RowDelta`. This changes the sequence number of those deletes compared to
aligned mode. The behavior should still be correct (deletes end up with a
higher sequence and still apply to earlier data), but it would be good to have
an explicit test for this scenario, e.g. `testPostBarrierEqualityDeleteFiles`
with format v2.
Also: since `commitDeltaTxn` commits each checkpoint's `RowDelta` separately
(#10526), post-barrier equality deletes that get redirected here end up in the
same `RowDelta` as the current checkpoint's data+deletes. Worth confirming this
doesn't cause duplication issues similar to the ones #10526 fixed.
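   To make the sequence-number reasoning concrete, here is a toy standalone model (hypothetical names, not the actual Iceberg metadata code) of the v2 spec rule that an equality delete applies only to data files with a strictly lower data sequence number:
   ```java
   // Toy model of the Iceberg v2 applicability rule for equality deletes.
   // Hypothetical names; this is NOT the real Iceberg implementation.
   public class SequenceNumberModel {

     record FileEntry(String name, long dataSequenceNumber) {}

     // Spec rule (simplified): an equality delete applies to data files whose
     // data sequence number is strictly lower than the delete's.
     static boolean applies(FileEntry eqDelete, FileEntry data) {
       return data.dataSequenceNumber() < eqDelete.dataSequenceNumber();
     }

     public static void main(String[] args) {
       FileEntry dataCp1 = new FileEntry("data-cp1", 1);

       // Aligned mode: delete commits with checkpoint 2's RowDelta, sequence 2.
       FileEntry alignedDelete = new FileEntry("eq-del-aligned", 2);

       // Unaligned mode: the post-barrier delete is redirected and commits one
       // snapshot later, at sequence 3.
       FileEntry redirectedDelete = new FileEntry("eq-del-redirected", 3);

       // Both still cover the earlier data file, so correctness is preserved;
       // only the sequence number assigned to the delete changes.
       if (!applies(alignedDelete, dataCp1)) throw new AssertionError();
       if (!applies(redirectedDelete, dataCp1)) throw new AssertionError();
       System.out.println("ok");
     }
   }
   ```
   The suggested test would pin this behavior down against the actual sequence numbers the committer produces.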
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -423,21 +423,53 @@ public void endInput() throws IOException {
   }

   private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException {
-    if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+
Review Comment:
Nit: remove the blank line after the opening brace (inconsistent with the
rest of the file).
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -423,21 +423,53 @@ public void endInput() throws IOException {
   }

   private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException {
-    if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+
+    if (!dataFilesPerCheckpoint.containsKey(checkpointId)) {
       dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
     }
-    for (Map.Entry<Long, List<WriteResult>> writeResultsOfCheckpoint :
-        writeResultsSinceLastSnapshot.entrySet()) {
-      dataFilesPerCheckpoint.put(
-          writeResultsOfCheckpoint.getKey(),
-          writeToManifest(writeResultsOfCheckpoint.getKey(), writeResultsOfCheckpoint.getValue()));
+    Map<Long, List<WriteResult>> pendingWriteResults = Maps.newHashMap();
+    for (Map.Entry<Long, List<WriteResult>> entry : writeResultsSinceLastSnapshot.entrySet()) {
+      long assignedCheckpointId = computeCheckpointId(checkpointId, entry);
+      pendingWriteResults
+          .computeIfAbsent(assignedCheckpointId, k -> Lists.newArrayList())
+          .addAll(entry.getValue());
+    }
+
+    for (Map.Entry<Long, List<WriteResult>> entry : pendingWriteResults.entrySet()) {
+      dataFilesPerCheckpoint.put(entry.getKey(), writeToManifest(entry.getKey(), entry.getValue()));
     }

     // Clear the local buffer for current checkpoint.
     writeResultsSinceLastSnapshot.clear();
   }

+  /**
+   * in case of unaligned checkpoints, data files that were part of checkpoint N in the writer may
+   * have to become part of a later checkpoint in the committer if:
+   *
+   * <ul>
+   *   <li>previous files were already committed for checkpoint N. We have to keep the manifests for
+   *       new files under a later key, otherwise they are discarded during recovery after a crash
+   *   <li>we already have a manifest of files to be committed for checkpoint N, even though it
+   *       might not have been committed yet. In this case, we must not overwrite the manifests we
+   *       already have, and we must keep them consistent with our checkpoint
+   * </ul>
+   */
+  private long computeCheckpointId(long checkpointId, Map.Entry<Long, List<WriteResult>> entry) {
+    long sourceCheckpointId = entry.getKey();
+
+    boolean sourceCheckpointIdAlreadyCommitted = sourceCheckpointId <= maxCommittedCheckpointId;
+    boolean sourceCheckpointIdHasDataInSnapshot =
+        dataFilesPerCheckpoint.containsKey(sourceCheckpointId);
+    // for aligned checkpoints, both conditions will be false and the upstream operator's checkpoint
+    // ID
+    // will be chosen.
+    return sourceCheckpointIdAlreadyCommitted || sourceCheckpointIdHasDataInSnapshot
Review Comment:
Nit: reflow the comment to avoid the orphaned `// ID` line:
```suggestion
    // For aligned checkpoints, both conditions will be false and the upstream operator's
    // checkpoint ID will be chosen.
```
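   For reviewers following along, the redirection rule can also be modeled standalone (a simplified sketch with plain `java.util` maps and hypothetical names; not the actual committer code or its field types):
   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Simplified standalone model of the checkpoint-ID redirection performed by
   // computeCheckpointId in the diff above. Names/types are illustrative only.
   public class CheckpointRedirect {

     // Write results buffered under sourceCheckpointId move to the current
     // checkpoint when their own checkpoint was already committed, or when a
     // manifest for it already exists in the committer's snapshot state.
     static long assignCheckpointId(
         long currentCheckpointId,
         long sourceCheckpointId,
         long maxCommittedCheckpointId,
         Map<Long, byte[]> dataFilesPerCheckpoint) {
       boolean alreadyCommitted = sourceCheckpointId <= maxCommittedCheckpointId;
       boolean manifestExists = dataFilesPerCheckpoint.containsKey(sourceCheckpointId);
       return (alreadyCommitted || manifestExists) ? currentCheckpointId : sourceCheckpointId;
     }

     public static void main(String[] args) {
       Map<Long, byte[]> manifests = new HashMap<>();

       // Aligned case: checkpoint 3 not yet committed, no manifest -> keep id 3.
       if (assignCheckpointId(5, 3, 2, manifests) != 3) throw new AssertionError();

       // Post-barrier data for already-committed checkpoint 2 -> moves to 5.
       if (assignCheckpointId(5, 2, 2, manifests) != 5) throw new AssertionError();

       // snapshotState(3) already wrote a manifest for 3; late results for 3
       // must not overwrite it -> move to 5.
       manifests.put(3L, new byte[0]);
       if (assignCheckpointId(5, 3, 2, manifests) != 5) throw new AssertionError();

       System.out.println("ok");
     }
   }
   ```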
##########
flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -1076,6 +1076,220 @@ public void testSpecEvolution() throws Exception {
     }
   }

+  /**
+   * (unaligned checkpoints) With unaligned checkpoints a writer subtask may deliver its write
+   * result after {@code snapshotState(N)} has already fired. ("post-barrier data"). This test
+   * verifies that post-barrier data for a checkpoint that never completes is not lost, it must be
+   * committed together with the next successful checkpoint. Also covers the case where the
+   * successful checkpoint itself has post-barrier data, which must then wait for the checkpoint
+   * after that.
+   *
+   * <pre>
+   * processElement(dataA, checkpointId=1)
+   * snapshotState(1)
+   * processElement(dataB, checkpointId=1)
+   * // checkpoint 1 never completes
+   * processElement(dataC, checkpointId=2)
+   * snapshotState(2)
+   * processElement(dataD, checkpointId=2)
+   * notifyCheckpointComplete(2) // commits dataA, dataB, dataC
+   * snapshotState(3)
+   * notifyCheckpointComplete(3) // commits dataD
+   * </pre>
+   */
+  @TestTemplate
+  public void testPostBarrierDataSurvivesFailedCheckpoint() throws Exception {
+    long timestamp = 0;
+    JobID jobId = new JobID();
+    OperatorID operatorId;
+    try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
+        createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
+
+      RowData rowA = SimpleDataUtil.createRowData(1, "early-checkpoint1");
+      DataFile dataFileA = writeDataFile("data-A", ImmutableList.of(rowA));
+      RowData rowB = SimpleDataUtil.createRowData(2, "post-barrier-checkpoint1");
+      DataFile dataFileB = writeDataFile("data-B", ImmutableList.of(rowB));
+      RowData rowC = SimpleDataUtil.createRowData(3, "early-checkpoint2");
+      DataFile dataFileC = writeDataFile("data-C", ImmutableList.of(rowC));
+      RowData rowD = SimpleDataUtil.createRowData(4, "post-barrier-checkpoint2");
+      DataFile dataFileD = writeDataFile("data-D", ImmutableList.of(rowD));
+
+      long checkpoint1 = 1;
+      long checkpoint2 = 2;
+      long checkpoint3 = 3;
+
+      harness.processElement(of(checkpoint1, dataFileA), ++timestamp);
+      harness.snapshot(checkpoint1, ++timestamp);
+      assertFlinkManifests(1);
+
+      // post-barrier, arrives after snapshotState(1); checkpoint1 then FAILS (no notify)
+      harness.processElement(of(checkpoint1, dataFileB), ++timestamp);
+
+      harness.processElement(of(checkpoint2, dataFileC), ++timestamp);
+      harness.snapshot(checkpoint2, ++timestamp);
Review Comment:
Nit: existing tests consistently verify manifest counts after `snapshot()`
and `notifyOfCompletedCheckpoint()`. Consider adding `assertFlinkManifests(2)`
here (one manifest from checkpoint 1's snapshot, one new manifest for
checkpoint 2) for consistency.
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -423,21 +423,53 @@ public void endInput() throws IOException {
   }

   private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException {
-    if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+
+    if (!dataFilesPerCheckpoint.containsKey(checkpointId)) {
       dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
     }
-    for (Map.Entry<Long, List<WriteResult>> writeResultsOfCheckpoint :
-        writeResultsSinceLastSnapshot.entrySet()) {
-      dataFilesPerCheckpoint.put(
-          writeResultsOfCheckpoint.getKey(),
-          writeToManifest(writeResultsOfCheckpoint.getKey(), writeResultsOfCheckpoint.getValue()));
+    Map<Long, List<WriteResult>> pendingWriteResults = Maps.newHashMap();
+    for (Map.Entry<Long, List<WriteResult>> entry : writeResultsSinceLastSnapshot.entrySet()) {
+      long assignedCheckpointId = computeCheckpointId(checkpointId, entry);
+      pendingWriteResults
+          .computeIfAbsent(assignedCheckpointId, k -> Lists.newArrayList())
+          .addAll(entry.getValue());
+    }
+
+    for (Map.Entry<Long, List<WriteResult>> entry : pendingWriteResults.entrySet()) {
+      dataFilesPerCheckpoint.put(entry.getKey(), writeToManifest(entry.getKey(), entry.getValue()));
     }

     // Clear the local buffer for current checkpoint.
     writeResultsSinceLastSnapshot.clear();
   }

+  /**
+   * in case of unaligned checkpoints, data files that were part of checkpoint N in the writer may
Review Comment:
Nit: Javadoc should start with a capital letter.
```suggestion
   * In case of unaligned checkpoints, data files that were part of checkpoint N in the writer may
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]