zhuzhurk commented on a change in pull request #12137:
URL: https://github.com/apache/flink/pull/12137#discussion_r425754123
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##########
@@ -122,7 +135,20 @@ public boolean canRestart() {
* @return result of a set of tasks to restart to recover from the
failure
*/
public static FailureHandlingResult restartable(Set<ExecutionVertexID>
verticesToRestart, long restartDelayMS) {
Review comment:
Minor: If we make `globalFailure` a parameter instead of adding new factory
methods, it seems the changes in both `FailureHandlingResult` and
`ExecutionFailureHandler` can be much simpler.
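For illustration, here is a minimal sketch of what a single factory method
with a `globalFailure` parameter might look like. The class below is a
simplified, hypothetical stand-in and not the actual `FailureHandlingResult`:
the name `FailureHandlingResultSketch`, the getters, and the use of `String`
in place of `ExecutionVertexID` are all assumptions made to keep the example
self-contained.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for FailureHandlingResult (not the Flink class).
final class FailureHandlingResultSketch {
    // String stands in for ExecutionVertexID to keep the sketch self-contained.
    private final Set<String> verticesToRestart;
    private final long restartDelayMS;
    private final boolean globalFailure;

    private FailureHandlingResultSketch(
            Set<String> verticesToRestart, long restartDelayMS, boolean globalFailure) {
        this.verticesToRestart = Collections.unmodifiableSet(new HashSet<>(verticesToRestart));
        this.restartDelayMS = restartDelayMS;
        this.globalFailure = globalFailure;
    }

    // One factory method covers both the regional and the global case;
    // the caller states which one applies through the globalFailure flag.
    static FailureHandlingResultSketch restartable(
            Set<String> verticesToRestart, long restartDelayMS, boolean globalFailure) {
        if (restartDelayMS < 0) {
            throw new IllegalArgumentException("restartDelayMS must not be negative");
        }
        return new FailureHandlingResultSketch(verticesToRestart, restartDelayMS, globalFailure);
    }

    Set<String> getVerticesToRestart() {
        return verticesToRestart;
    }

    long getRestartDelayMS() {
        return restartDelayMS;
    }

    boolean isGlobalFailure() {
        return globalFailure;
    }
}
```

With this shape, the caller would pass the flag through rather than choosing
between separate factory methods for the global and regional cases.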
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1127,16 +1127,44 @@ public boolean restoreLatestCheckpointedState(
boolean errorIfNoCheckpoint,
boolean allowNonRestoredState) throws Exception {
- return restoreLatestCheckpointedState(new
HashSet<>(tasks.values()), errorIfNoCheckpoint, allowNonRestoredState);
+ return restoreLatestCheckpointedStateInternal(new
HashSet<>(tasks.values()), true, errorIfNoCheckpoint, allowNonRestoredState);
}
/**
- * Restores the latest checkpointed state.
+ * Restores the latest checkpointed state to a set of subtasks. This
method represents a "local"
+ * or "regional" failover and does restore states to coordinators. Note
that a regional failover
+ * might still include all tasks.
+ *
+ * @param tasks Set of job vertices to restore. State for these
vertices is
+ * restored via {@link
Execution#setInitialState(JobManagerTaskRestore)}.
+
+ * @return <code>true</code> if state was restored, <code>false</code>
otherwise.
+ * @throws IllegalStateException If the CheckpointCoordinator is shut
down.
+ * @throws IllegalStateException If no completed checkpoint is
available and
+ * the <code>failIfNoCheckpoint</code>
flag has been set.
+ * @throws IllegalStateException If the checkpoint contains state that
cannot be
+ * mapped to any job vertex in
<code>tasks</code> and the
+ * <code>allowNonRestoredState</code>
flag has not been set.
+ * @throws IllegalStateException If the max parallelism changed for an
operator
+ * that restores state from this
checkpoint.
+ * @throws IllegalStateException If the parallelism changed for an
operator
+ * that restores <i>non-partitioned</i>
state from this
+ * checkpoint.
+ */
+ public boolean restoreLatestCheckpointedStateToSubtasks(final
Set<ExecutionJobVertex> tasks) throws Exception {
+ // when restoring subtasks only we accept potentially unmatched
state because what we
Review comment:
I think another reason why `allowNonRestoredState` must be true is that
not every `JobVertex` is checked here, so the state of an absent `JobVertex`
cannot be matched.
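A small sketch of the reasoning, under simplifying assumptions: vertex IDs
are plain strings, and the class `NonRestoredStateSketch` with its two helper
methods is hypothetical, not part of `CheckpointCoordinator`. It only shows
that a strict check over a subset of vertices would report the state of every
vertex outside that subset as unmatched.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration; this is not the CheckpointCoordinator logic.
final class NonRestoredStateSketch {

    // Returns the IDs of checkpointed state that no vertex in the restored set can claim.
    static Set<String> unmatchedState(
            Set<String> checkpointedVertexIds, Set<String> verticesToRestore) {
        Set<String> unmatched = new HashSet<>(checkpointedVertexIds);
        unmatched.removeAll(verticesToRestore);
        return unmatched;
    }

    // Strict mode fails on any unmatched state; lenient mode tolerates it.
    static void checkAllStateMatched(
            Set<String> checkpointedVertexIds,
            Set<String> verticesToRestore,
            boolean allowNonRestoredState) {
        Set<String> unmatched = unmatchedState(checkpointedVertexIds, verticesToRestore);
        if (!unmatched.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException("State cannot be mapped to any vertex: " + unmatched);
        }
    }
}
```

If the checkpoint holds state for `source`, `map`, and `sink` but only `map`
is being restored, the strict check fails even though nothing is actually
wrong, which is why the flag has to be true on this path.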