tillrohrmann commented on a change in pull request #14186:
URL: https://github.com/apache/flink/pull/14186#discussion_r529563356
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1274,7 +1278,11 @@ public boolean restoreLatestCheckpointedStateToSubtasks(final Set<ExecutionJobVe
// of the restarted region), meaning there will be unmatched state by design.
// - because what we might end up restoring from an original savepoint with unmatched
// state, if there is was no checkpoint yet.
- return restoreLatestCheckpointedStateInternal(tasks, false, false, true);
+ return restoreLatestCheckpointedStateInternal(
+ tasks,
+ CoordinatorRestore.NOT_RESTORED, // local/regional recovery does not reset coordinators
Review comment:
This is a bit off-topic, but why is it OK not to restore an operator
coordinator whose subtasks might be contained in `tasks`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1304,12 +1312,34 @@ public boolean restoreLatestCheckpointedStateToAll(
final Set<ExecutionJobVertex> tasks,
final boolean allowNonRestoredState) throws Exception {
- return restoreLatestCheckpointedStateInternal(tasks, true, false, allowNonRestoredState);
+ return restoreLatestCheckpointedStateInternal(
+ tasks,
+ CoordinatorRestore.ALWAYS, // global recovery restores coordinators, or resets them to empty
+ false, // recovery might come before first successful checkpoint
+ allowNonRestoredState);
+ }
+
+ /**
+ * Restores the latest checkpointed at the beginning of the job execution.
+ * If there is a checkpoint, this method acts like a "global restore"-style
+ * operation where all stateful tasks and coordinators from the given
+ * set of Job Vertices are restored.
+ *
+ * @param tasks Set of job vertices to restore. State for these vertices is
+ * restored via {@link Execution#setInitialState(JobManagerTaskRestore)}.
+ * @return True, if a checkpoint was found and its state was restored, false otherwise.
+ */
+ public boolean restoreInitialCheckpointIfPresent(final Set<ExecutionJobVertex> tasks) throws Exception {
+ return restoreLatestCheckpointedStateInternal(
+ tasks,
+ CoordinatorRestore.ONLY_FOR_EXISTING_CHECKPOINT,
Review comment:
Is it important that we only restore the coordinator's state if there is
a checkpoint, or is it some kind of optimization?
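
As a reading aid for the question above, the three `CoordinatorRestore` modes that appear in these hunks can be sketched as a small decision table. This is a minimal illustration and not code from the PR: the class name, the `Action` enum and the `decide` method are hypothetical. The behaviour per mode is inferred only from the constant names and the inline comments in the diff ("global recovery restores coordinators, or resets them to empty", "local/regional recovery does not reset coordinators"). In particular, leaving the coordinator untouched when no checkpoint exists is an assumption drawn from the name `ONLY_FOR_EXISTING_CHECKPOINT`.

```java
public class CoordinatorRestoreSketch {

    /** The three constants visible in the diff; their semantics below are assumed. */
    enum CoordinatorRestore {
        ALWAYS,
        ONLY_FOR_EXISTING_CHECKPOINT,
        NOT_RESTORED
    }

    /** Hypothetical outcome type, introduced only for this illustration. */
    enum Action {
        RESTORE_FROM_CHECKPOINT,
        RESET_TO_EMPTY,
        LEAVE_UNTOUCHED
    }

    /** Maps a restore mode and the presence of a checkpoint to a coordinator action. */
    static Action decide(CoordinatorRestore mode, boolean checkpointExists) {
        switch (mode) {
            case ALWAYS:
                // Global recovery: restore from the checkpoint, or reset to empty if there is none.
                return checkpointExists ? Action.RESTORE_FROM_CHECKPOINT : Action.RESET_TO_EMPTY;
            case ONLY_FOR_EXISTING_CHECKPOINT:
                // Initial restore: assumed to touch the coordinator only if a checkpoint was found.
                return checkpointExists ? Action.RESTORE_FROM_CHECKPOINT : Action.LEAVE_UNTOUCHED;
            case NOT_RESTORED:
                // Local/regional recovery: the coordinator is not reset.
                return Action.LEAVE_UNTOUCHED;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // Print the full decision table for both checkpoint states.
        for (CoordinatorRestore mode : CoordinatorRestore.values()) {
            for (boolean checkpointExists : new boolean[] {true, false}) {
                System.out.println(mode + ", checkpointExists=" + checkpointExists
                        + " -> " + decide(mode, checkpointExists));
            }
        }
    }
}
```

Under these assumptions, `ALWAYS` and `ONLY_FOR_EXISTING_CHECKPOINT` differ only when no checkpoint exists, which is the case the question is about.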
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1304,12 +1312,34 @@ public boolean restoreLatestCheckpointedStateToAll(
final Set<ExecutionJobVertex> tasks,
final boolean allowNonRestoredState) throws Exception {
- return restoreLatestCheckpointedStateInternal(tasks, true, false, allowNonRestoredState);
+ return restoreLatestCheckpointedStateInternal(
+ tasks,
+ CoordinatorRestore.ALWAYS, // global recovery restores coordinators, or resets them to empty
+ false, // recovery might come before first successful checkpoint
+ allowNonRestoredState);
+ }
+
+ /**
+ * Restores the latest checkpointed at the beginning of the job execution.
Review comment:
```suggestion
* Restores the latest checkpointed state at the beginning of the job execution.
```