zhuzhurk commented on a change in pull request #9920: [FLINK-14389][runtime] 
Restore task state before restarting tasks in DefaultScheduler
URL: https://github.com/apache/flink/pull/9920#discussion_r340143812
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 ##########
 @@ -277,9 +282,39 @@ private void 
tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraph
                }
        }
 
-       protected void resetForNewExecutionIfInTerminalState(final 
Collection<ExecutionVertexID> verticesToDeploy) {
-               verticesToDeploy.forEach(executionVertexId -> 
getExecutionVertex(executionVertexId)
-                       .resetForNewExecutionIfInTerminalState());
+       protected void resetForNewExecutions(final 
Collection<ExecutionVertexID> vertices) {
+               vertices.forEach(executionVertexId -> 
getExecutionVertex(executionVertexId)
+                       .resetForNewExecution());
+       }
+
+       protected void restoreState(final Set<ExecutionVertexID> vertices) 
throws Exception {
+               // if there is checkpointed state, reload it into the executions
+               if (executionGraph.getCheckpointCoordinator() != null) {
+                       // abort pending checkpoints to
+                       // i) enable new checkpoint triggering without waiting 
for last checkpoint expired.
+                       // ii) ensure the EXACTLY_ONCE semantics if needed.
+                       
executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
+                               new 
CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
+
+                       final Map<JobVertexID, ExecutionJobVertex> 
involvedExecutionJobVertices =
+                               getInvolvedExecutionJobVertices(vertices);
+
+                       
executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
 
 Review comment:
   You are right. I made the change in 
1d74f9981eca26f03d5daf610cb96d2e5174cb2b. All usages are also refactored except 
for those for the legacy scheduler.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to