zhuzhurk commented on a change in pull request #9920: [FLINK-14389][runtime]
Restore task state before restarting tasks in DefaultScheduler
URL: https://github.com/apache/flink/pull/9920#discussion_r340143812
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -277,9 +282,39 @@ private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraph
         }
     }
-    protected void resetForNewExecutionIfInTerminalState(final Collection<ExecutionVertexID> verticesToDeploy) {
-        verticesToDeploy.forEach(executionVertexId -> getExecutionVertex(executionVertexId)
-            .resetForNewExecutionIfInTerminalState());
+    protected void resetForNewExecutions(final Collection<ExecutionVertexID> vertices) {
+        vertices.forEach(executionVertexId -> getExecutionVertex(executionVertexId)
+            .resetForNewExecution());
+    }
+
+    protected void restoreState(final Set<ExecutionVertexID> vertices) throws Exception {
+        // if there is checkpointed state, reload it into the executions
+        if (executionGraph.getCheckpointCoordinator() != null) {
+            // abort pending checkpoints to
+            // i) enable new checkpoint triggering without waiting for last checkpoint expired.
+            // ii) ensure the EXACTLY_ONCE semantics if needed.
+            executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
+                new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
+
+            final Map<JobVertexID, ExecutionJobVertex> involvedExecutionJobVertices =
+                getInvolvedExecutionJobVertices(vertices);
+
+            executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
Review comment:
You are right. I made the change in commit
1d74f9981eca26f03d5daf610cb96d2e5174cb2b. All usages have also been refactored,
except for those in the legacy scheduler.