rkhachatryan commented on a change in pull request #15846:
URL: https://github.com/apache/flink/pull/15846#discussion_r630894328
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
##########
@@ -140,71 +140,20 @@ public void recover() throws Exception {
return;
}
- // Try and read the state handles from storage. We try until we either successfully read
- // all of them or when we reach a stable state, i.e. when we successfully read the same set
- // of checkpoints in two tries. We do it like this to protect against transient outages
- // of the checkpoint store (for example a DFS): if the DFS comes online midway through
- // reading a set of checkpoints we would run the risk of reading only a partial set
- // of checkpoints while we could in fact read the other checkpoints as well if we retried.
- // Waiting until a stable state protects against this while also being resilient against
- // checkpoints being actually unreadable.
- //
- // These considerations are also important in the scope of incremental checkpoints, where
- // we use ref-counting for shared state handles and might accidentally delete shared state
- // of checkpoints that we don't read due to transient storage outages.
- final List<CompletedCheckpoint> lastTryRetrievedCheckpoints =
- new ArrayList<>(numberOfInitialCheckpoints);
final List<CompletedCheckpoint> retrievedCheckpoints =
new ArrayList<>(numberOfInitialCheckpoints);
- Exception retrieveException = null;
- do {
- LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
-
- lastTryRetrievedCheckpoints.clear();
- lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints);
-
- retrievedCheckpoints.clear();
-
- for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle :
- initialCheckpoints) {
-
- CompletedCheckpoint completedCheckpoint;
-
- try {
- completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
- if (completedCheckpoint != null) {
- retrievedCheckpoints.add(completedCheckpoint);
- }
- } catch (Exception e) {
- LOG.warn(
- "Could not retrieve checkpoint, not adding to list of recovered checkpoints.",
- e);
- retrieveException = e;
- }
- }
+ LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
- } while (retrievedCheckpoints.size() != numberOfInitialCheckpoints
- && !CompletedCheckpoint.checkpointsMatch(
- lastTryRetrievedCheckpoints, retrievedCheckpoints));
+ for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle :
+ initialCheckpoints) {
+ retrievedCheckpoints.add(
+ checkNotNull(retrieveCompletedCheckpoint(checkpointStateHandle)));
Review comment:
Good point. The null check was added a long time ago, though not because
`retrieveCompletedCheckpoint` can return null, but because it was in a separate
[try-catch block](https://github.com/apache/flink/blob/3d119e1155aa8930cc7b18a085d6790cb2c63b70/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L171).
Later the assignment was
[moved](https://github.com/apache/flink/commit/0162543ac13f048ef67a6586d8a6e8021ec9dcd4#diff-adad2a7903a652b5ff42cd5d443f253b900ec9ea3bd78cee731898e70961ed20R170)
to the same try-catch, but the now-unnecessary null check remained.
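
To illustrate the history, here is a minimal, self-contained sketch of the two
shapes. It is not the Flink code: `NullCheckSketch`, `retrieve`, and both
`recover...` methods are hypothetical stand-ins, and `retrieve` is assumed to
either return a non-null value or throw.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the store; none of these names exist in Flink.
final class NullCheckSketch {

    // Assumed contract: returns a non-null value or throws, never returns null.
    static Integer retrieve(String handle) {
        return Integer.valueOf(handle); // throws NumberFormatException on bad input
    }

    // Older shape: only the assignment is inside the try block, so the
    // variable stays null after a failure and the null check is required.
    static List<Integer> recoverWithSeparateTryCatch(List<String> handles) {
        List<Integer> retrieved = new ArrayList<>();
        for (String handle : handles) {
            Integer checkpoint = null;
            try {
                checkpoint = retrieve(handle);
            } catch (RuntimeException e) {
                // failure is swallowed; checkpoint remains null
            }
            if (checkpoint != null) {
                retrieved.add(checkpoint);
            }
        }
        return retrieved;
    }

    // Later shape: retrieval and add share one try block. A failure skips
    // the add, so a null check inside the block would be redundant.
    static List<Integer> recoverWithSingleTryCatch(List<String> handles) {
        List<Integer> retrieved = new ArrayList<>();
        for (String handle : handles) {
            try {
                retrieved.add(retrieve(handle));
            } catch (RuntimeException e) {
                // failure is swallowed; nothing is added for this handle
            }
        }
        return retrieved;
    }
}
```

Both methods skip the unreadable handle and return the same list, which is why
the check could have been dropped once the assignment was moved.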