akalash commented on a change in pull request #15846:
URL: https://github.com/apache/flink/pull/15846#discussion_r630316049



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
##########
@@ -140,71 +140,20 @@ public void recover() throws Exception {
             return;
         }
 
-        // Try and read the state handles from storage. We try until we either successfully read
-        // all of them or when we reach a stable state, i.e. when we successfully read the same set
-        // of checkpoints in two tries. We do it like this to protect against transient outages
-        // of the checkpoint store (for example a DFS): if the DFS comes online midway through
-        // reading a set of checkpoints we would run the risk of reading only a partial set
-        // of checkpoints while we could in fact read the other checkpoints as well if we retried.
-        // Waiting until a stable state protects against this while also being resilient against
-        // checkpoints being actually unreadable.
-        //
-        // These considerations are also important in the scope of incremental checkpoints, where
-        // we use ref-counting for shared state handles and might accidentally delete shared state
-        // of checkpoints that we don't read due to transient storage outages.
-        final List<CompletedCheckpoint> lastTryRetrievedCheckpoints =
-                new ArrayList<>(numberOfInitialCheckpoints);
         final List<CompletedCheckpoint> retrievedCheckpoints =
                 new ArrayList<>(numberOfInitialCheckpoints);
-        Exception retrieveException = null;
-        do {
-            LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
-
-            lastTryRetrievedCheckpoints.clear();
-            lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints);
-
-            retrievedCheckpoints.clear();
-
-            for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle :
-                    initialCheckpoints) {
-
-                CompletedCheckpoint completedCheckpoint;
-
-                try {
-                    completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
-                    if (completedCheckpoint != null) {
-                        retrievedCheckpoints.add(completedCheckpoint);
-                    }
-                } catch (Exception e) {
-                    LOG.warn(
-                            "Could not retrieve checkpoint, not adding to list of recovered checkpoints.",
-                            e);
-                    retrieveException = e;
-                }
-            }
+        LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
 
-        } while (retrievedCheckpoints.size() != numberOfInitialCheckpoints
-                && !CompletedCheckpoint.checkpointsMatch(
-                        lastTryRetrievedCheckpoints, retrievedCheckpoints));
+        for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle :
+                initialCheckpoints) {
+            retrievedCheckpoints.add(
+                    checkNotNull(retrieveCompletedCheckpoint(checkpointStateHandle)));

Review comment:
       I am sure this is right, but I want to point out that the logic has
changed. Before your change, the code treated a null result from
retrieveCompletedCheckpoint as harmless and simply skipped it. After your
change, the code asserts that retrieveCompletedCheckpoint never returns null,
so a null result now fails the recovery.
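
To make the difference concrete, here is a minimal, self-contained sketch of
the two behaviours. It is not the Flink code: NullHandlingSketch,
recoverLenient, recoverStrict and the retrieve function are hypothetical
stand-ins, and java.util.Objects.requireNonNull is used in place of Flink's
checkNotNull on the assumption that both throw a NullPointerException on a
null argument. The sketch covers only the null handling; the removed loop also
caught and logged exceptions and retried until the result was stable, which is
left out here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical stand-in for the recovery loop; not the actual Flink class.
class NullHandlingSketch {

    // Behaviour before the change: a null result is silently skipped.
    static <H, C> List<C> recoverLenient(List<H> handles, Function<H, C> retrieve) {
        List<C> recovered = new ArrayList<>(handles.size());
        for (H handle : handles) {
            C checkpoint = retrieve.apply(handle);
            if (checkpoint != null) {
                recovered.add(checkpoint);
            }
        }
        return recovered;
    }

    // Behaviour after the change: a null result is a contract violation
    // and aborts the whole recovery with a NullPointerException.
    static <H, C> List<C> recoverStrict(List<H> handles, Function<H, C> retrieve) {
        List<C> recovered = new ArrayList<>(handles.size());
        for (H handle : handles) {
            recovered.add(
                    Objects.requireNonNull(
                            retrieve.apply(handle), "retrieve returned null for " + handle));
        }
        return recovered;
    }

    public static void main(String[] args) {
        // Assumed retrieval function: returns null for the handle "missing".
        Function<String, String> retrieve = h -> h.equals("missing") ? null : "cp-" + h;
        List<String> handles = Arrays.asList("1", "missing", "2");

        System.out.println(recoverLenient(handles, retrieve)); // prints [cp-1, cp-2]

        try {
            recoverStrict(handles, retrieve);
        } catch (NullPointerException e) {
            System.out.println("strict recovery failed: " + e.getMessage());
        }
    }
}
```

With the lenient variant a null shrinks the recovered list without any signal
to the caller; with the strict variant the same input fails fast. Whether
failing fast is acceptable depends on whether retrieveCompletedCheckpoint can
legitimately return null, which the diff alone does not show.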




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

