Myasuka commented on a change in pull request #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009#discussion_r246074413
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
 ##########
 @@ -206,13 +206,21 @@ private void restart(long globalModVersionOfFailover) {
                try {
                        if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
                                // if we have checkpointed state, reload it into the executions
-                               //TODO: checkpoint support restore part ExecutionVertex cp
-                               /**
                                if (executionGraph.getCheckpointCoordinator() != null) {
+                                       // we restart the checkpoint scheduler for
+                                       // i) enable new checkpoint could be triggered without waiting for last checkpoint expired.
+                                       // ii) ensure the EXACTLY_ONCE semantics if needed.
+                                       if (executionGraph.getCheckpointCoordinator().isPeriodicCheckpointingConfigured()) {
+                                               executionGraph.getCheckpointCoordinator().stopCheckpointScheduler();
+                                       }
+
                                        executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
-                                                       connectedExecutionVertexes, false, false);
+                                                       connectedExecutionVertexes, false);
 
 Review comment:
   From my point of view, we should not depend on an execution's current status to determine whether to assign state.
   With the given indices, we can figure out the logical plan containing exactly the executions that need state assigned. Trying to assign state to all executions and using each execution's status to decide whether to assign initial state could indeed cause bugs.
   Moreover, the current implementation already uses a single base `restoreLatestCheckpointedState` method to handle both situations, which reduces the chance of bugs. In addition, I think [FLINK-10713](https://issues.apache.org/jira/browse/FLINK-10713) could also reuse the new `restoreLatestCheckpointedState(List<ExecutionVertex>, boolean)` method.
   I'm not sure whether I have expressed my thoughts clearly enough to convince you, but please leave any thoughts or concerns if further discussion is still needed.
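
   To make the argument concrete, here is a minimal Java sketch under a heavily simplified, hypothetical model: `RegionRestoreSketch`, its nested `Vertex` class, and the string-keyed map standing in for checkpointed state are illustrative only and are not Flink's `ExecutionVertex` or `CheckpointCoordinator` APIs. The boolean parameter is assumed here to mean "fail if no completed checkpoint exists"; the actual meaning of the remaining flag in the PR is not shown in this hunk. The point it shows is that one base method assigns state to exactly the vertices it is given, without consulting any execution status, and a global restore is simply that same method applied to every vertex.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model for illustration only; these are NOT Flink classes.
class RegionRestoreSketch {

    /** Stand-in for one parallel subtask (hypothetical); deliberately has no status field. */
    static final class Vertex {
        final String jobVertexId;
        final int subtaskIndex;
        String assignedState; // stays null until state is assigned

        Vertex(String jobVertexId, int subtaskIndex) {
            this.jobVertexId = jobVertexId;
            this.subtaskIndex = subtaskIndex;
        }

        /** Key used to look up this subtask's entry in the checkpoint map. */
        String key() {
            return jobVertexId + "#" + subtaskIndex;
        }
    }

    private final List<Vertex> allVertices;
    // Latest completed checkpoint, modeled as subtask key -> state handle (hypothetical).
    private final Map<String, String> latestCheckpoint;

    RegionRestoreSketch(List<Vertex> allVertices, Map<String, String> latestCheckpoint) {
        this.allVertices = allVertices;
        this.latestCheckpoint = latestCheckpoint;
    }

    /**
     * Single base method: assigns state to exactly the given vertices and never
     * inspects their current status. The meaning of the flag is an assumption.
     *
     * @return true if a checkpoint was found and state was assigned
     */
    boolean restoreLatestCheckpointedState(List<Vertex> toRestore, boolean errorIfNoCheckpoint) {
        if (latestCheckpoint.isEmpty()) {
            if (errorIfNoCheckpoint) {
                throw new IllegalStateException("No completed checkpoint found");
            }
            return false;
        }
        for (Vertex vertex : toRestore) {
            vertex.assignedState = latestCheckpoint.get(vertex.key());
        }
        return true;
    }

    /** Global restore reuses the same base method with every vertex. */
    boolean restoreAll(boolean errorIfNoCheckpoint) {
        return restoreLatestCheckpointedState(allVertices, errorIfNoCheckpoint);
    }
}
```

   Under this sketch, a regional failover would pass only the region's vertices (the role `connectedExecutionVertexes` plays in the diff), while a global restart passes all of them, so both paths share one state-assignment code path and neither needs to guess from execution status which vertices to touch.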

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
