StephanEwen opened a new pull request #14256:
URL: https://github.com/apache/flink/pull/14256
## This is based upon PR #14255, which represents the first of the three
commits here.
# What is the purpose of the change
This extends the `OperatorCoordinator` with a `subtaskReset()` method.
- The `subtaskReset()` method is the notification of a local failover
(pipelined region) where the `OperatorCoordinator` is not reset as a whole, but
only resets the subtask-dependent state.
- It complements `resetToCheckpoint()`, which is called on a global failover
and resets the entire state of the `OperatorCoordinator`.
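The difference between the two reset paths can be sketched as follows. This is a minimal illustration, not Flink's actual `OperatorCoordinator` interface: the class `CoordinatorSketch`, its fields, the helper methods, and the exact parameter lists are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical coordinator, for illustration only; not Flink's OperatorCoordinator. */
class CoordinatorSketch {
    // State shared across all subtasks (assumed for the example).
    private long globalCounter;
    // State kept per subtask index (assumed for the example).
    private final Map<Integer, String> perSubtaskState = new HashMap<>();

    /** Records some progress for a subtask; touches both global and per-subtask state. */
    void recordProgress(int subtask, String state) {
        globalCounter++;
        perSubtaskState.put(subtask, state);
    }

    /** Local failover: only the state tied to the restarted subtask is dropped. */
    void subtaskReset(int subtask, long checkpointId) {
        perSubtaskState.remove(subtask);
    }

    /**
     * Global failover: the entire coordinator state is reset. For simplicity the
     * restored state is assumed to be empty here; a real coordinator would
     * restore the state that belongs to the given checkpoint.
     */
    void resetToCheckpoint(long checkpointId) {
        globalCounter = 0;
        perSubtaskState.clear();
    }

    long globalCounter() {
        return globalCounter;
    }

    boolean hasStateFor(int subtask) {
        return perSubtaskState.containsKey(subtask);
    }
}
```

After `subtaskReset(0, ...)`, the state of subtask 1 and the global counter are untouched, whereas `resetToCheckpoint(...)` clears everything.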
The behavior is uniform across batch and streaming execution:
- All notifications are sent on every global and local failover, in both batch
and streaming.
- For streaming execution, the methods are invoked with the ID of the
restored checkpoint.
- For streaming execution, if there is not yet any completed checkpoint,
then the methods are invoked with a checkpoint ID of _-1_. That represents a
"virtual empty checkpoint" that marks the "beginning of the execution". _-1_ is
lower than any other checkpoint ID, so it fits transparently into the contract of
monotonically increasing checkpoint IDs and "later-checkpoints-subsume-earlier-checkpoints".
- Batch recovery behaves exactly like a streaming recovery before the
first checkpoint: There is only ever the virtual empty checkpoint that
represents the beginning of the execution, no additional checkpoint is taken.
All recoveries refer to that empty checkpoint with no state. That way batch is
strictly a special case of streaming.
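The rule for choosing the checkpoint ID passed to the reset methods can be sketched as below. The class `RestoreIdSketch`, the constant name, and the method are hypothetical and serve only to illustrate the contract described above; they are not part of Flink.

```java
import java.util.OptionalLong;

/** Hypothetical helper, for illustration only; not part of Flink. */
final class RestoreIdSketch {
    /** ID of the virtual empty checkpoint that marks the beginning of the execution. */
    static final long VIRTUAL_EMPTY_CHECKPOINT = -1L;

    private RestoreIdSketch() {}

    /**
     * Returns the ID to report on a restore: the latest completed checkpoint if
     * there is one, otherwise the virtual empty checkpoint. A batch job never
     * completes a checkpoint, so it always takes the second branch.
     */
    static long restoredCheckpointId(OptionalLong latestCompletedCheckpoint) {
        return latestCompletedCheckpoint.orElse(VIRTUAL_EMPTY_CHECKPOINT);
    }
}
```

Under this sketch, batch recovery and streaming recovery before the first checkpoint both correspond to `restoredCheckpointId(OptionalLong.empty())`.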
This logic will make it easier to keep the `SplitEnumerator` for sources
consistent (future PR).
Instead of reacting to failure notifications, we react to restore
notifications.
- Restore notifications are well ordered with respect to checkpoints, which eliminates
the races of [FLINK-20290](https://issues.apache.org/jira/browse/FLINK-20290).
- Restore notifications generalize well across local and global failovers.
- All restores are relative to checkpoints (including the virtual empty
checkpoint at the beginning). That makes split assignment tracking transparent
in the sources: When a task is restored, all splits that came after the
restored checkpoint have to be re-assigned. In batch, the restored checkpoint
is the initial empty checkpoint, so all splits ever assigned to the task have
to be re-assigned.
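The split tracking described in the last point can be sketched as follows. This is not the implementation from the planned follow-up PR: the class `SplitAssignmentTrackerSketch`, its methods, and the use of plain strings as splits are assumptions for illustration, and the sketch tracks a single subtask only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical per-subtask tracker, for illustration only; not part of Flink. */
class SplitAssignmentTrackerSketch {
    /** ID of the virtual empty checkpoint that marks the beginning of the execution. */
    static final long NO_CHECKPOINT = -1L;

    // Splits grouped by the latest checkpoint that had completed when they were assigned.
    private final TreeMap<Long, List<String>> splitsByCheckpoint = new TreeMap<>();
    private long latestCheckpoint = NO_CHECKPOINT;

    /** Records a split as assigned after the latest completed checkpoint. */
    void assign(String split) {
        splitsByCheckpoint.computeIfAbsent(latestCheckpoint, k -> new ArrayList<>()).add(split);
    }

    /** Notes that a checkpoint completed; later assignments are grouped under it. */
    void checkpointCompleted(long checkpointId) {
        latestCheckpoint = checkpointId;
    }

    /** Returns all splits assigned after the restored checkpoint, in assignment order. */
    List<String> splitsToReassign(long restoredCheckpointId) {
        List<String> result = new ArrayList<>();
        // Keys >= restoredCheckpointId hold the splits assigned after that checkpoint.
        for (List<String> splits : splitsByCheckpoint.tailMap(restoredCheckpointId, true).values()) {
            result.addAll(splits);
        }
        return result;
    }
}
```

Because _-1_ is lower than every real checkpoint ID, restoring to the virtual empty checkpoint returns every split ever assigned, which is the batch case.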
## Brief change log
- Commit 9f760fa improves the `resetToCheckpoint()` method by passing in
the ID of the restored checkpoint.
- Commit cf30b0e adds the `subtaskReset()` method, the scheduler code to
call that method, and the tests.
## Verifying this change
This PR adds a series of test scenarios. The class
`OperatorCoordinatorSchedulerTest` covers the full contract of the
`OperatorCoordinator` calls and notifications from the scheduler.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **yes**
- The S3 file system connector: **no**
## Documentation
- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **not applicable**