akalash commented on a change in pull request #15846:
URL: https://github.com/apache/flink/pull/15846#discussion_r630316049
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
##########
@@ -140,71 +140,20 @@ public void recover() throws Exception {
             return;
         }
-        // Try and read the state handles from storage. We try until we either successfully read
-        // all of them or when we reach a stable state, i.e. when we successfully read the same set
-        // of checkpoints in two tries. We do it like this to protect against transient outages
-        // of the checkpoint store (for example a DFS): if the DFS comes online midway through
-        // reading a set of checkpoints we would run the risk of reading only a partial set
-        // of checkpoints while we could in fact read the other checkpoints as well if we retried.
-        // Waiting until a stable state protects against this while also being resilient against
-        // checkpoints being actually unreadable.
-        //
-        // These considerations are also important in the scope of incremental checkpoints, where
-        // we use ref-counting for shared state handles and might accidentally delete shared state
-        // of checkpoints that we don't read due to transient storage outages.
-        final List<CompletedCheckpoint> lastTryRetrievedCheckpoints =
-                new ArrayList<>(numberOfInitialCheckpoints);
         final List<CompletedCheckpoint> retrievedCheckpoints =
                 new ArrayList<>(numberOfInitialCheckpoints);
-        Exception retrieveException = null;
-        do {
-            LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
-
-            lastTryRetrievedCheckpoints.clear();
-            lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints);
-
-            retrievedCheckpoints.clear();
-
-            for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle :
-                    initialCheckpoints) {
-
-                CompletedCheckpoint completedCheckpoint;
-
-                try {
-                    completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
-                    if (completedCheckpoint != null) {
-                        retrievedCheckpoints.add(completedCheckpoint);
-                    }
-                } catch (Exception e) {
-                    LOG.warn(
-                            "Could not retrieve checkpoint, not adding to list of recovered checkpoints.",
-                            e);
-                    retrieveException = e;
-                }
-            }
+        LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
-        } while (retrievedCheckpoints.size() != numberOfInitialCheckpoints
-                && !CompletedCheckpoint.checkpointsMatch(
-                        lastTryRetrievedCheckpoints, retrievedCheckpoints));
+        for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle :
+                initialCheckpoints) {
+            retrievedCheckpoints.add(
+                    checkNotNull(retrieveCompletedCheckpoint(checkpointStateHandle)));
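For reference, the retry-until-stable strategy that the removed do/while loop implemented can be sketched as below. This is a simplified, hypothetical model rather than Flink code: strings stand in for checkpoint handles and CompletedCheckpoint objects, the Retriever interface and retrieveUntilStable method are invented for illustration, and plain List.equals stands in for CompletedCheckpoint.checkpointsMatch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the removed retry-until-stable loop; not Flink code. */
public class StableRetrySketch {

    /** Hypothetical stand-in for retrieveCompletedCheckpoint: may return null or throw. */
    interface Retriever<H, C> {
        C retrieve(H handle) throws Exception;
    }

    /**
     * Reads every handle, retrying until either all handles were read or two
     * consecutive attempts produced the same result (a "stable state").
     */
    static <H, C> List<C> retrieveUntilStable(List<H> handles, Retriever<H, C> retriever) {
        List<C> lastTry = new ArrayList<>(handles.size());
        List<C> current = new ArrayList<>(handles.size());
        do {
            // Remember the previous attempt, then start a fresh one.
            lastTry.clear();
            lastTry.addAll(current);
            current.clear();
            for (H handle : handles) {
                try {
                    C checkpoint = retriever.retrieve(handle);
                    if (checkpoint != null) { // null results are silently skipped
                        current.add(checkpoint);
                    }
                } catch (Exception e) {
                    // The removed code logged a warning here; this sketch just skips the handle.
                }
            }
        } while (current.size() != handles.size() && !current.equals(lastTry));
        return current;
    }

    public static void main(String[] args) {
        List<String> handles = Arrays.asList("cp-1", "cp-2", "cp-3");
        int[] cp2Reads = {0};
        // Simulated transient outage: only the very first read of cp-2 fails.
        Retriever<String, String> flaky = handle -> {
            if (handle.equals("cp-2") && cp2Reads[0]++ == 0) {
                throw new IllegalStateException("storage temporarily unavailable");
            }
            return handle;
        };
        System.out.println(retrieveUntilStable(handles, flaky)); // [cp-1, cp-2, cp-3]
    }
}
```

With the simulated outage, the first pass reads only cp-1 and cp-3, which differs from the (empty) previous attempt, so the loop retries; the second pass reads all three handles and stops. If every read fails, two consecutive empty attempts match and the loop terminates with an empty list.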
Review comment:
I am sure this is right, but I want to point out that the logic has changed. Before your change, the code treated a null result from retrieveCompletedCheckpoint as acceptable (it was simply skipped). After your change, the code assumes that retrieveCompletedCheckpoint always returns a non-null value: checkNotNull throws a NullPointerException if it ever returns null.
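To make the behavioral difference concrete, here is a minimal, hypothetical sketch (not Flink code). recoverSkippingNulls mirrors the old loop body, which silently dropped a null result, while recoverFailFast mirrors the new loop. java.util.Objects.requireNonNull stands in for Flink's Preconditions.checkNotNull; both throw NullPointerException for a null argument. The class, method names and the retriever function are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical illustration of the null-handling change; not Flink code. */
public class NullHandlingSketch {

    /** Old behavior: a null result is treated as "nothing to recover" and skipped. */
    static <H, C> List<C> recoverSkippingNulls(List<H> handles, Function<H, C> retrieve) {
        List<C> recovered = new ArrayList<>(handles.size());
        for (H handle : handles) {
            C checkpoint = retrieve.apply(handle);
            if (checkpoint != null) {
                recovered.add(checkpoint);
            }
        }
        return recovered;
    }

    /** New behavior: a null result violates the contract and aborts the loop. */
    static <H, C> List<C> recoverFailFast(List<H> handles, Function<H, C> retrieve) {
        List<C> recovered = new ArrayList<>(handles.size());
        for (H handle : handles) {
            // Stand-in for Preconditions.checkNotNull(...): throws NullPointerException on null.
            recovered.add(Objects.requireNonNull(retrieve.apply(handle)));
        }
        return recovered;
    }

    public static void main(String[] args) {
        List<String> handles = Arrays.asList("cp-1", "cp-2");
        // Hypothetical retriever that returns null for cp-2.
        Function<String, String> retrieve = h -> h.equals("cp-2") ? null : h;

        System.out.println(recoverSkippingNulls(handles, retrieve)); // [cp-1]
        try {
            recoverFailFast(handles, retrieve);
        } catch (NullPointerException e) {
            System.out.println("recovery aborted: null checkpoint");
        }
    }
}
```

Which behavior is preferable depends on whether a null from retrieveCompletedCheckpoint can legitimately occur, which is the question the comment raises.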
--
This is an automated message from the Apache Git Service.