StefanRRichter commented on a change in pull request #7009: [FLINK-10712] Support to restore state when using RestartPipelinedRegionStrategy
URL: https://github.com/apache/flink/pull/7009#discussion_r245663755
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
 ##########
 @@ -206,13 +206,21 @@ private void restart(long globalModVersionOfFailover) {
                try {
                        if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
                                // if we have checkpointed state, reload it into the executions
-                               //TODO: checkpoint support restore part ExecutionVertex cp
-                               /**
                                if (executionGraph.getCheckpointCoordinator() != null) {
+                                       // we restart the checkpoint scheduler for
+                                       // i) enable new checkpoint could be triggered without waiting for last checkpoint expired.
+                                       // ii) ensure the EXACTLY_ONCE semantics if needed.
+                                       if (executionGraph.getCheckpointCoordinator().isPeriodicCheckpointingConfigured()) {
+                                               executionGraph.getCheckpointCoordinator().stopCheckpointScheduler();
+                                       }
+
                                        executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
-                                                       connectedExecutionVertexes, false, false);
+                                                       connectedExecutionVertexes, false);
 
 Review comment:
   I have a general, high-level question about this whole approach that I forgot to ask in my previous review: do we even need to introduce this additional restore method? It seems to me that all it does is optimize the state assignment a little for regional failover, at the cost of slightly increased complexity for the full-recovery cases.
   
   Instead, couldn't we simply use the existing `restoreLatestCheckpointedState` method without providing any indexes, and call it from here with `executionGraph.allVertices()`? State assignment is a simple metadata operation and should generally run very fast. Since it only modifies the `restoreState` variable, only the vertices that are actually restarted for the failed region see the effect; the remaining vertices are not restarted and are unaffected by the change. We might need to change `setInitialState` for this, either by removing the precondition that checks for `CREATED` or by only assigning state to instances in that state.
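A minimal sketch of the proposal above, in plain Java. Every type and method here (`RegionalRestoreSketch`, `Vertex`, `offerInitialState`, `restoreToAllVertices`) is a hypothetical, simplified stand-in rather than a Flink class: `restoreToAllVertices` plays the role of calling `restoreLatestCheckpointedState` with all vertices, and `offerInitialState` models the second option for `setInitialState` (only assign to vertices in `CREATED`). State handles are reduced to strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "assign checkpointed state to all vertices, but only
// CREATED vertices accept it". None of these types are Flink classes.
class RegionalRestoreSketch {

    enum VertexState { CREATED, RUNNING }

    static final class Vertex {
        final String id;
        VertexState state;
        String restoreState; // stand-in for the state handle used on (re)deployment

        Vertex(String id, VertexState state) {
            this.id = id;
            this.state = state;
        }

        // Models a setInitialState that only assigns to CREATED vertices;
        // vertices that are not being restarted simply ignore the offer.
        boolean offerInitialState(String handle) {
            if (state != VertexState.CREATED) {
                return false;
            }
            this.restoreState = handle;
            return true;
        }
    }

    // Metadata-only pass over every vertex, with no index bookkeeping.
    // Returns how many vertices accepted a state handle.
    static int restoreToAllVertices(List<Vertex> allVertices, Map<String, String> checkpoint) {
        int assigned = 0;
        for (Vertex v : allVertices) {
            String handle = checkpoint.get(v.id);
            if (handle != null && v.offerInitialState(handle)) {
                assigned++;
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // The failed region {a, b} was reset to CREATED; c keeps running.
        List<Vertex> all = new ArrayList<>();
        all.add(new Vertex("a", VertexState.CREATED));
        all.add(new Vertex("b", VertexState.CREATED));
        all.add(new Vertex("c", VertexState.RUNNING));

        Map<String, String> checkpoint = new HashMap<>();
        checkpoint.put("a", "chk-42/a");
        checkpoint.put("b", "chk-42/b");
        checkpoint.put("c", "chk-42/c");

        int assigned = restoreToAllVertices(all, checkpoint);
        // prints "assigned=2, c.restoreState=null"
        System.out.println("assigned=" + assigned + ", c.restoreState=" + all.get(2).restoreState);
    }
}
```

Only `a` and `b` pick up the checkpointed state; the running vertex `c` is left untouched, which is the property the comment relies on when it says the non-restarted vertices do not care about the change.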
   
   If this is just an optimization attempt for this special case, the simpler approach could reduce the amount of changes, and potential bugs, by a lot. What do you think? Did you find any other reason why this whole index handling is required?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.