Zakelly commented on code in PR #27157:
URL: https://github.com/apache/flink/pull/27157#discussion_r2477269053
##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java:
##########
@@ -250,23 +251,75 @@ public void release() {
}
}
- protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
- new PreviousSnapshot(Collections.emptyList());
+ protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT = new PreviousSnapshot(null);
/** Previous snapshot with uploaded sst files. */
protected static class PreviousSnapshot {
@Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles;
- protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> confirmedSstFiles) {
+ /**
+ * Constructor of PreviousSnapshot. Given a map of uploaded sst files in previous
+ * checkpoints, prune the sst files which have been re-uploaded in the following
+ * checkpoints. The prune logic is used to resolve the mismatch between TM and JM due to
+ * notification delay. For example, consider the following steps:
+ *
+ * <ul>
+ * <li>1) checkpoint 1 uses file 00001.SST uploaded as xxx.sst.
+ * <li>2) checkpoint 2 uses the same file 00001.SST but re-uploads it as yyy.sst because
+ * CP 1 wasn't yet confirmed.
+ * <li>3) TM gets a confirmation of checkpoint 1.
+ * <li>4) JM completes checkpoint 2 and subsumes checkpoint 1 - removing xxx.sst.
+ * <li>5) checkpoint 3 tries to re-use file 00001.SST uploaded as xxx.sst in checkpoint 1,
+ * but it was deleted in (4) by JM.
+ * </ul>
+ *
+ * @param currentUploadedSstFiles the sst files uploaded in previous checkpoints.
+ */
+ protected PreviousSnapshot(
+ @Nullable SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles) {
this.confirmedSstFiles =
- confirmedSstFiles != null
- ? confirmedSstFiles.stream()
+ currentUploadedSstFiles != null
+ ? pruneFirstCheckpointSstFiles(currentUploadedSstFiles)
+ : Collections.emptyMap();
+ }
+
+ /**
+ * The first checkpoint's uploaded sst files are all included, then for each following
+ * checkpoint, if an sst file has been re-uploaded, remove it from the first checkpoint's sst
+ * files.
+ *
+ * @param currentUploadedSstFiles the sst files uploaded in each checkpoint, keyed by checkpoint id.
+ */
+ private Map<String, StreamStateHandle> pruneFirstCheckpointSstFiles(
+ @Nonnull SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles) {
+ Map<String, StreamStateHandle> prunedSstFiles = null;
+ for (Map.Entry<Long, Collection<HandleAndLocalPath>> entry :
+ currentUploadedSstFiles.entrySet()) {
+ // Iterate checkpoints in ascending order of checkpoint id.
+ if (prunedSstFiles == null) {
+ // The first checkpoint's uploaded sst files are all included.
+ prunedSstFiles =
+ entry.getValue().stream()
.collect(
Collectors.toMap(
HandleAndLocalPath::getLocalPath,
- HandleAndLocalPath::getHandle))
- : Collections.emptyMap();
+ HandleAndLocalPath::getHandle));
+ } else if (!prunedSstFiles.isEmpty()) {
+ // Prune sst files which have been re-uploaded in the following checkpoints.
+ for (HandleAndLocalPath handleAndLocalPath : entry.getValue()) {
+ if (!(handleAndLocalPath.getHandle() instanceof PlaceholderStreamStateHandle)) {
+ // If it's not a placeholder handle, it means the sst file has been
+ // re-uploaded in the following checkpoint.
+ prunedSstFiles.remove(handleAndLocalPath.getLocalPath());
Review Comment:
I added a trace-level log of the number of removed files, since the outer
`snapshotMetaData` is also logged at trace level.
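The pruning described in the Javadoc can be sketched outside Flink as follows. This is a simplified, hypothetical model, not the ForSt backend's code. `Uploaded` and `PruneResult` are illustrative stand-ins for `HandleAndLocalPath` and the returned map. Remote handles are plain strings, and a boolean flag replaces the `instanceof PlaceholderStreamStateHandle` check. The sketch replays steps 1 and 2 of the Javadoc scenario. It also returns the number of pruned entries, which is the kind of count the review comment mentions logging at trace level. Records require Java 16 or later.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Hypothetical, simplified sketch of the pruning done by PreviousSnapshot. Remote handles are
 * plain strings and a boolean flag stands in for PlaceholderStreamStateHandle.
 */
public class PruneSketch {

    /** Hypothetical stand-in for HandleAndLocalPath. */
    public record Uploaded(String localPath, String remoteHandle, boolean isPlaceholder) {}

    /** Surviving localPath -> remoteHandle map, plus the number of pruned entries. */
    public record PruneResult(Map<String, String> confirmed, int removedCount) {}

    /** Iterates checkpoints in ascending id order, which the SortedMap guarantees. */
    public static PruneResult prune(SortedMap<Long, List<Uploaded>> uploadedByCheckpoint) {
        Map<String, String> pruned = null;
        int removed = 0;
        for (Map.Entry<Long, List<Uploaded>> entry : uploadedByCheckpoint.entrySet()) {
            if (pruned == null) {
                // First (oldest) checkpoint: keep every file it uploaded or referenced.
                pruned = new HashMap<>();
                for (Uploaded u : entry.getValue()) {
                    pruned.put(u.localPath(), u.remoteHandle());
                }
            } else if (!pruned.isEmpty()) {
                // Later checkpoints: a non-placeholder handle means the file was re-uploaded,
                // so the first checkpoint's copy may be deleted by the JM. Drop it.
                for (Uploaded u : entry.getValue()) {
                    if (!u.isPlaceholder() && pruned.remove(u.localPath()) != null) {
                        removed++;
                    }
                }
            }
        }
        return new PruneResult(pruned == null ? Map.<String, String>of() : pruned, removed);
    }

    public static void main(String[] args) {
        SortedMap<Long, List<Uploaded>> uploads = new TreeMap<>();
        // Checkpoint 2 re-uploads 00001.sst as yyy.sst and only references 00002.sst.
        uploads.put(2L, List.of(
                new Uploaded("00001.sst", "yyy.sst", false),
                new Uploaded("00002.sst", "placeholder", true)));
        // Checkpoint 1 uploaded both files. It is inserted second to show that TreeMap
        // still iterates it first.
        uploads.put(1L, List.of(
                new Uploaded("00001.sst", "xxx.sst", false),
                new Uploaded("00002.sst", "aaa.sst", false)));
        PruneResult result = prune(uploads);
        // removedCount is the analogue of the removed-file count logged at trace level.
        System.out.println("removed=" + result.removedCount() + " confirmed=" + result.confirmed());
        // prints: removed=1 confirmed={00002.sst=aaa.sst}
    }
}
```

Checkpoint 2 re-uploaded 00001.sst, so that file's checkpoint 1 handle (xxx.sst) is dropped. A later checkpoint therefore cannot reference a file that the JM may already have deleted when subsuming checkpoint 1. Checkpoint 2 only references 00002.sst through a placeholder, so that file keeps its original handle.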
--
This is an automated message from the Apache Git Service.