Zakelly commented on code in PR #27157:
URL: https://github.com/apache/flink/pull/27157#discussion_r2476657951


##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStSnapshotStrategyBase.java:
##########
@@ -269,6 +269,26 @@ protected PreviousSnapshot(@Nullable 
Collection<HandleAndLocalPath> confirmedSst
                             : Collections.emptyMap();
         }
 
+        /**
+         * Remove the sst files which have been re-uploaded in the following 
checkpoint from the
+         * confirmed sst files.
+         *
+         * @param followingUploadedSstFiles the sst files uploaded in the 
following checkpoint.
+         */
+        protected void removeReUploadedConfirmedSstFiles(
+                @Nonnull Collection<HandleAndLocalPath> 
followingUploadedSstFiles) {
+            if (!confirmedSstFiles.isEmpty()) {
+                followingUploadedSstFiles.forEach(
+                        e -> {
+                            if (!(e.getHandle() instanceof 
PlaceholderStreamStateHandle)) {
+                                // If it's not a placeholder handle, it means 
the sst file has been
+                                // re-uploaded in the following checkpoint.
+                                confirmedSstFiles.remove(e.getLocalPath());

Review Comment:
   Sure. I wrapped all the prune logic into `PreviousSnapshot` to make it more 
clear.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to