StephanEwen commented on a change in pull request #14186:
URL: https://github.com/apache/flink/pull/14186#discussion_r530324662



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1274,7 +1278,11 @@ public boolean restoreLatestCheckpointedStateToSubtasks(final Set<ExecutionJobVe
                //     of the restarted region), meaning there will be unmatched state by design.
                //   - because what we might end up restoring from an original savepoint with unmatched
                //     state, if there is was no checkpoint yet.
-               return restoreLatestCheckpointedStateInternal(tasks, false, false, true);
+               return restoreLatestCheckpointedStateInternal(
+                               tasks,
+                               CoordinatorRestore.NOT_RESTORED, // local/regional recovery does not reset coordinators

Review comment:
       Coordinators handle individual subtask failures through notifications 
(`OperatorCoordinator.subtaskFailed()`). For sources, for example, we want the 
assigned (but not yet checkpointed) splits to be added back to the backlog. We 
don't want the state of the enumerator as a whole to be reset to a checkpoint, 
because that would also affect all the subtasks that are not part of the failover.
   
   For global failover, we initially thought about just sending notifications 
for all subtasks as well. But then we decided to do a reset-to-checkpoint on a 
global failover, so that the global failover retains its "safety net" property: 
if anything is suspected to be inconsistent, a global failover makes things 
consistent again.
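
   To make the distinction concrete, here is a minimal toy sketch of the two 
recovery paths. It is not Flink's actual `OperatorCoordinator` or enumerator 
code: `ToyEnumerator` and all of its methods are hypothetical names, and the 
checkpoint model (snapshot only the backlog, treat assigned splits as owned by 
the subtasks afterwards) is a simplifying assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only (hypothetical names); not Flink's coordinator or enumerator API.
class ToyEnumerator {
    // Splits that have not been handed to any subtask yet.
    private final Deque<String> backlog = new ArrayDeque<>();
    // Splits assigned since the last checkpoint, keyed by subtask index.
    private final Map<Integer, List<String>> assigned = new HashMap<>();
    // Backlog as it looked at the last checkpoint.
    private List<String> checkpointedBacklog = new ArrayList<>();

    ToyEnumerator(List<String> splits) {
        backlog.addAll(splits);
    }

    // Hands the next split to a subtask, or returns null if the backlog is empty.
    String assignNext(int subtask) {
        String split = backlog.poll();
        if (split != null) {
            assigned.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
        }
        return split;
    }

    // Assumption: after a checkpoint, assigned splits live in the subtasks' own state.
    void checkpoint() {
        checkpointedBacklog = new ArrayList<>(backlog);
        assigned.clear();
    }

    // Local/regional failover: only the failed subtask's uncheckpointed splits return.
    void subtaskFailed(int subtask) {
        List<String> lost = assigned.remove(subtask);
        if (lost != null) {
            backlog.addAll(lost);
        }
    }

    // Global failover: the whole enumerator state goes back to the checkpoint.
    void resetToCheckpoint() {
        backlog.clear();
        backlog.addAll(checkpointedBacklog);
        assigned.clear();
    }

    List<String> backlog() {
        return new ArrayList<>(backlog);
    }

    List<String> assignedTo(int subtask) {
        return new ArrayList<>(assigned.getOrDefault(subtask, Collections.emptyList()));
    }
}
```

   In this sketch, calling `subtaskFailed(0)` leaves the splits tracked for 
subtask 1 untouched, whereas `resetToCheckpoint()` discards the assignments of 
every subtask, which is only acceptable when all subtasks restart anyway.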
   





