StephanEwen opened a new pull request #12137:
URL: https://github.com/apache/flink/pull/12137
## What is the purpose of the change
This Pull Request contains a series of changes that ultimately serve two
goals:
1. Complete checkpointing/restoring state for OperatorCoordinators
(previously restores were not implemented)
2. Only restore operator coordinators after "global failures" or
"global restores".
The distinction between global failures/restores and task failures/restores
means the following:
- When a task fails, that task and others (in the same failover
region) are restarted with the latest checkpoint state. In that case, the
Operator Coordinators only get notifications; the coordinator is not reset to
the previous checkpoint. That is because some subtasks (outside the failover
region) may continue to run.
- When a global failure happens (for example as part of a master failover,
a change in JobManager leader status, or through the safety net in the
scheduler), all tasks are reset to the previous state, and likewise the
coordinators are reset to the previous checkpoint. In those situations, there
would be a danger of inconsistencies if the coordinator were not reset to the
checkpoint.
- A restore from a savepoint is likewise a global restore (it is a special
case of a change in leader status).
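The distinction above can be illustrated with a small model. This is a minimal sketch, not Flink's `OperatorCoordinator` API: `FailureScope`, `SketchCoordinator`, `FailureRouter`, and the split-counter state are all hypothetical names chosen for illustration. A task failure only notifies the coordinator and keeps its live state, while a global failure (or a savepoint restore, treated here as `GLOBAL`) resets the coordinator to the checkpointed state.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical failure scopes; a savepoint restore is modeled as GLOBAL.
enum FailureScope { TASK, GLOBAL }

// Hypothetical coordinator whose state is the number of splits handed out.
class SketchCoordinator {
    long assignedSplits;                                  // live state
    final Set<Integer> failedSubtasks = new HashSet<>();  // notifications received

    void assignSplit() { assignedSplits++; }

    // Task (regional) failure: notification only. The state is kept because
    // subtasks outside the failover region may still be running against it.
    void subtaskFailed(int subtask) { failedSubtasks.add(subtask); }

    // Global failure or savepoint restore: roll back to the checkpointed state.
    void resetTo(long checkpointedAssignedSplits) {
        assignedSplits = checkpointedAssignedSplits;
        failedSubtasks.clear();
    }
}

// Hypothetical dispatcher deciding between "notify" and "reset".
class FailureRouter {
    static void onFailure(SketchCoordinator c, FailureScope scope, int subtask, long checkpointed) {
        if (scope == FailureScope.TASK) {
            c.subtaskFailed(subtask);   // coordinator state untouched
        } else {
            c.resetTo(checkpointed);    // coordinator rolled back with all tasks
        }
    }
}
```

The key design point is that only the `GLOBAL` branch ever touches the coordinator's state; the `TASK` branch deliberately ignores the checkpointed value.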
## Brief change log
Changes 42f3576, ad78321, 16c1524, 2a3fabc, 5e8dfd8, e6063a8 are simple
refactorings and fixes of minor bugs. These commits should be self-contained and
not too complex.
11bc22f improves the failure handling by sending failure notifications to
the `OperatorCoordinator` as soon as a task fails, and not only when it gets
recovered. That makes failure handling in the coordinator faster.
6452bd9 simply forwards the exception that caused the failure originally to
the `OperatorCoordinator.subtaskFailed()` method.
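The two changes above (notify at failure time, and pass along the original cause) can be sketched as follows. This is a hedged illustration, not the scheduler code from the PR: `FailureAwareCoordinator`, `EagerFailureNotifier`, and the `timeline` list are hypothetical, and the `(int, Throwable)` parameter shape is only assumed to resemble `subtaskFailed()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical coordinator hook receiving the original failure cause.
interface FailureAwareCoordinator {
    void subtaskFailed(int subtask, Throwable reason);
}

// Hypothetical scheduler fragment: notifies the coordinator when the task
// fails, before (not after) the task is restarted.
class EagerFailureNotifier {
    private final FailureAwareCoordinator coordinator;
    final List<String> timeline = new ArrayList<>();  // records event order

    EagerFailureNotifier(FailureAwareCoordinator coordinator) {
        this.coordinator = coordinator;
    }

    void onTaskFailed(int subtask, Throwable cause) {
        // Forward the exception that caused the failure, unchanged.
        coordinator.subtaskFailed(subtask, cause);
        timeline.add("notified:" + subtask);
    }

    void onTaskRestarted(int subtask) {
        timeline.add("restarted:" + subtask);
    }
}
```

Because the notification happens in `onTaskFailed` rather than `onTaskRestarted`, the coordinator can react (for example, stop assigning work to that subtask) during the whole restart delay.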
40f9067 completes the remaining parts of the checkpointing integration. It fixes
various previous issues and makes sure that `restore()` is always called on the
`OperatorCoordinator` whenever a checkpoint is restored.
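A checkpoint/restore round trip for a coordinator might look roughly like this. It is a minimal sketch under stated assumptions: `CountingCoordinator`, `snapshotState`, and `restoreState` are hypothetical names (not the exact Flink method names), the state is assumed to be a single counter serialized as 8 bytes, and the snapshot is delivered asynchronously through a `CompletableFuture<byte[]>`.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

// Hypothetical coordinator whose entire state is the next split id.
class CountingCoordinator {
    private long nextSplitId;

    long assignNext() { return nextSplitId++; }

    // Snapshot: serialize the state and complete the future with the bytes.
    // A real coordinator could complete the future later, from another thread.
    void snapshotState(long checkpointId, CompletableFuture<byte[]> result) {
        result.complete(ByteBuffer.allocate(Long.BYTES).putLong(nextSplitId).array());
    }

    // Restore: invoked whenever a checkpoint containing this coordinator is restored.
    void restoreState(byte[] checkpointData) {
        nextSplitId = ByteBuffer.wrap(checkpointData).getLong();
    }
}
```

After restoring, the coordinator hands out ids again from the checkpointed value, so ids assigned after the checkpoint are reissued, which is consistent with the tasks also being rolled back to that checkpoint.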
a7e2a16 introduces two different methods in the `CheckpointCoordinator`:
- `restoreLatestCheckpointedStateToAll(...)` for restores after global
failures. This is the same as the previous behavior. This method also restores
`OperatorCoordinators`.
- `restoreLatestCheckpointedStateToSubtasks` for restores of (a subset of)
subtasks only. The method is initially unused.
The commit also removes the `failIfNoCheckpoint` flag, which was only used
during savepoint restores. Because savepoint restores have their own dedicated
method in the `CheckpointCoordinator`, this flag was never used in the
production code and was not carried over to the two new methods.
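The split between the two restore methods can be sketched with a heavily simplified stand-in. Only the two method names come from this PR; `MiniCheckpointCoordinator`, its string-based state, and the boolean return value (telling the caller whether a checkpoint existed, in place of a `failIfNoCheckpoint` flag) are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in; not Flink's CheckpointCoordinator.
class MiniCheckpointCoordinator {
    private Map<Integer, String> latestTaskState;   // null means no checkpoint yet
    private String latestCoordinatorState;

    final Map<Integer, String> restoredTasks = new HashMap<>();
    String restoredCoordinator;                     // null until restored

    void completeCheckpoint(Map<Integer, String> taskState, String coordinatorState) {
        latestTaskState = new HashMap<>(taskState);
        latestCoordinatorState = coordinatorState;
    }

    // Global restore: all subtasks AND the operator coordinators.
    boolean restoreLatestCheckpointedStateToAll() {
        if (latestTaskState == null) {
            return false;                           // nothing to restore; caller decides
        }
        restoredTasks.putAll(latestTaskState);
        restoredCoordinator = latestCoordinatorState;
        return true;
    }

    // Regional restore: only the given subtasks; coordinators are left untouched.
    boolean restoreLatestCheckpointedStateToSubtasks(Set<Integer> subtasks) {
        if (latestTaskState == null) {
            return false;
        }
        for (int s : subtasks) {
            restoredTasks.put(s, latestTaskState.get(s));
        }
        return true;
    }
}
```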
2c5bd3b changes the scheduler to call the two different methods introduced
in the previous commit in the respective situations (task failure versus
global failure).
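The scheduler-side dispatch could look roughly like the following. This is a hedged sketch: the two method names are from this PR, but `StateRestorer`, `RestartDispatcher`, `RecordingRestorer`, and all signatures are hypothetical and much simpler than Flink's actual scheduler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// The two restore entry points, named after the PR; signatures are hypothetical.
interface StateRestorer {
    void restoreLatestCheckpointedStateToAll();
    void restoreLatestCheckpointedStateToSubtasks(Set<Integer> subtasks);
}

// Hypothetical scheduler fragment choosing the restore method per failure kind.
class RestartDispatcher {
    private final StateRestorer restorer;

    RestartDispatcher(StateRestorer restorer) { this.restorer = restorer; }

    // Task failure: restore only the subtasks of the failover region.
    void restartAfterTaskFailure(Set<Integer> subtasksInRegion) {
        restorer.restoreLatestCheckpointedStateToSubtasks(subtasksInRegion);
    }

    // Global failure: restore everything, including operator coordinators.
    void restartAfterGlobalFailure() {
        restorer.restoreLatestCheckpointedStateToAll();
    }
}

// Test double that records which entry point was used.
class RecordingRestorer implements StateRestorer {
    final List<String> calls = new ArrayList<>();

    public void restoreLatestCheckpointedStateToAll() { calls.add("all"); }

    public void restoreLatestCheckpointedStateToSubtasks(Set<Integer> subtasks) {
        calls.add("subtasks:" + subtasks.size());
    }
}
```

Keeping two explicit entry points, rather than one method with a mode flag, makes it visible at each call site whether coordinators will be reset.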
## Verifying this change
The changes here are covered by various additional unit tests, specifically
in the class `OperatorCoordinatorSchedulerTest`.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **yes**
- The S3 file system connector: **no**
## Documentation
- Does this pull request introduce a new feature? **no** (not a
user-facing feature)
- If yes, how is the feature documented? **not applicable**