tzulitai opened a new pull request #13761:
URL: https://github.com/apache/flink/pull/13761
## What is the purpose of the change
This PR provides a temporary solution for the following issue:
- On restore, the timer services **_always_** attempt to read timers from
raw keyed state streams, regardless of whether or not timers were actually
written to raw keyed state. This is done to allow users to swap from heap-based
timers to RocksDB timers across savepoint restores. Since savepoints carry no
information about the previous timer configuration, the only way
to support the timer swapping is to always check whether something was written.
- The issue is that this implementation **assumes that there are no other
writers to the raw keyed streams**.
- If an `AbstractStreamOperator` implementation uses the raw keyed streams,
on restore the timer services assume that they wrote the checkpointed
data and try to read it as timers, and the restore then fails with read
errors.
## Solution
This PR solves this by adding a flag `isUsingCustomRawKeyedState` to the
`AbstractStreamOperator`, which by default is `false`:
```
protected boolean isUsingCustomRawKeyedState() {
    return false;
}
```
If an `AbstractStreamOperator` implementation writes to raw keyed streams, it
should also override this to return `true`.
On restore, the timer services respect this flag and skip reading from
the raw keyed streams.
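The effect of the flag can be sketched in isolation. The following is a minimal, self-contained model and not Flink code: `SketchOperator`, `CustomRawStateOperator`, and `restoreTimers` are hypothetical names used only for illustration, and only the method name `isUsingCustomRawKeyedState` comes from this PR.

```java
public class RawKeyedStateFlagSketch {

    /** Hypothetical stand-in for AbstractStreamOperator; only the flag is modeled. */
    static class SketchOperator {
        protected boolean isUsingCustomRawKeyedState() {
            return false; // default: raw keyed state is assumed to hold timers only
        }
    }

    /** Operator that writes its own data to raw keyed state and therefore opts out. */
    static class CustomRawStateOperator extends SketchOperator {
        @Override
        protected boolean isUsingCustomRawKeyedState() {
            return true;
        }
    }

    /** Hypothetical restore decision; the real logic lives in Flink's state context setup. */
    static String restoreTimers(SketchOperator operator, byte[] rawKeyedState) {
        if (operator.isUsingCustomRawKeyedState()) {
            // The operator owns the raw keyed state, so the timer service must not parse it.
            return "skipped: raw keyed state belongs to the operator";
        }
        return "read " + rawKeyedState.length + " bytes as legacy timers";
    }

    public static void main(String[] args) {
        byte[] raw = {1, 2, 3};
        System.out.println(restoreTimers(new SketchOperator(), raw));
        System.out.println(restoreTimers(new CustomRawStateOperator(), raw));
    }
}
```

In this model, an operator that leaves the default in place still gets the legacy timer read, so existing heap-to-RocksDB timer migrations are unaffected.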
Note that this works because there can only ever be one
writer to raw keyed streams.
If multiple writers attempt to write to the raw keyed stream
(e.g. legacy heap-based timers + some custom writing in user code),
checkpointing would already have failed consistently, since the raw keyed
stream API (`KeyedStateCheckpointOutputStream`) only allows calling
`startNewKeyGroup` once for the same stream.
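The single-writer argument can be illustrated with a small model. This is a sketch under assumptions, not the Flink implementation: `SketchKeyGroupStream` is a hypothetical stand-in for `KeyedStateCheckpointOutputStream`, and it assumes the "once" check is applied per key group and surfaces as an exception.

```java
import java.util.HashSet;
import java.util.Set;

public class SingleWriterSketch {

    /** Hypothetical stand-in for KeyedStateCheckpointOutputStream; not the Flink class. */
    static class SketchKeyGroupStream {
        private final Set<Integer> startedKeyGroups = new HashSet<>();

        /** Assumption: starting a key group that was already started is rejected. */
        void startNewKeyGroup(int keyGroupId) {
            if (!startedKeyGroups.add(keyGroupId)) {
                throw new IllegalStateException("Key group " + keyGroupId + " already started");
            }
        }
    }

    /** Returns true if a second writer is rejected for the given key group. */
    static boolean secondWriterRejected(int keyGroupId) {
        SketchKeyGroupStream stream = new SketchKeyGroupStream();
        stream.startNewKeyGroup(keyGroupId); // first writer, e.g. heap-based timers
        try {
            stream.startNewKeyGroup(keyGroupId); // second writer, e.g. custom operator code
            return false;
        } catch (IllegalStateException e) {
            return true; // the snapshot fails instead of producing mixed data
        }
    }

    public static void main(String[] args) {
        System.out.println("second writer rejected: " + secondWriterRejected(0));
    }
}
```

Because a second writer cannot produce a successful checkpoint in this model, any checkpoint that exists was written by exactly one party, which is why a single boolean flag is enough to tell the two cases apart on restore.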
## Brief change log
- 10bfda6 introduces the `isUsingCustomRawKeyedState` flag to
`AbstractStreamOperator`
- 2eab38b allows passing in the flag when creating the
`StreamOperatorStateContext`
- c8b5f04 wires in the flag to be respected by the
`StreamOperatorStateContext` instantiation, so that timer services do not read
from raw keyed state if they weren't the ones who wrote to it.
- 1ca873f adds a test that uses an `AbstractStreamOperator` implementation
that writes to raw keyed state and verifies that snapshotting and restoring
work and that the new flag is respected. If you change the flag to `false`
in the test, the test fails.
- c415739 adds an extra notice in the docs mentioning that checkpointing fails
if you're using RocksDB + heap timers + writing to custom raw keyed state in
some operators.
## Verifying this change
The new test
`AbstractStreamOperatorTest#testCustomRawKeyedStateSnapshotAndRestore` should
cover this fix.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **NO**)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (**YES** / no)
- The serializers: (yes / **NO** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **NO**
/ don't know)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (**YES** / no /
don't know)
- The S3 file system connector: (yes / **NO** / don't know)
## Documentation
While technically this PR introduces a new feature, we deliberately do not
mention it in the docs, as raw keyed state is intended as an internal feature
that users should not be using (except in rare edge cases).