tzulitai opened a new pull request #13761:
URL: https://github.com/apache/flink/pull/13761


   ## What is the purpose of the change
   
   This PR provides a temporary solution for the following issue:
   - On restore, the timer services **_always_** attempt to read timers from 
raw keyed state streams, regardless of whether or not timers were actually 
written to raw keyed state. This is done to allow users to swap from heap-based 
timers to RocksDB timers across savepoint restores. Since savepoints carry no 
information about the previous timer configuration, the only way to support the 
timer swap is to always check whether something was written.
   - The issue is that this implementation **assumes that there are no other 
writers to the raw keyed streams**.
   - If an `AbstractStreamOperator` implementation uses the raw keyed streams, 
then on restore the timer services assume that they themselves wrote the 
checkpointed data and try to read it, and the restore inevitably fails with 
read errors.
   
   ## Solution
   
   This PR solves this by adding a flag `isUsingCustomRawKeyedState` to 
`AbstractStreamOperator`, which by default returns `false`:
   ```
   protected boolean isUsingCustomRawKeyedState() {
       return false;
   }
   ```
   
   If an `AbstractStreamOperator` implementation writes to the raw keyed streams, 
it should also override this method to return `true`.
   On restore, the timer services respect this flag and skip reading from 
the raw keyed streams.
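
   For illustration, a rough sketch (not part of this PR) of how a custom operator that writes to the raw keyed streams would use the new flag. The operator name and the per-key-group payload are made up for the example; the raw keyed state write APIs (`getRawKeyedOperatorStateOutput`, `startNewKeyGroup`) are the existing ones:
   ```
   import org.apache.flink.core.memory.DataOutputView;
   import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
   import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
   import org.apache.flink.runtime.state.StateSnapshotContext;
   import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
   import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
   import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

   /** Hypothetical operator that writes its own data to the raw keyed streams. */
   public class MyRawKeyedStateOperator extends AbstractStreamOperator<String>
           implements OneInputStreamOperator<String, String> {

       @Override
       protected boolean isUsingCustomRawKeyedState() {
           // tell the timer services not to attempt reading the raw keyed streams on restore
           return true;
       }

       @Override
       public void snapshotState(StateSnapshotContext context) throws Exception {
           super.snapshotState(context);
           KeyedStateCheckpointOutputStream rawOut = context.getRawKeyedOperatorStateOutput();
           DataOutputView out = new DataOutputViewStreamWrapper(rawOut);
           for (int keyGroup : rawOut.getKeyGroupList()) {
               rawOut.startNewKeyGroup(keyGroup);
               out.writeInt(keyGroup); // made-up per-key-group payload for the sketch
           }
       }

       @Override
       public void processElement(StreamRecord<String> element) throws Exception {
           output.collect(element);
       }
   }
   ```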
   
   Note that this works because there can only ever be one writer to the raw 
keyed streams.
   If multiple writers attempt to write to the raw keyed stream (e.g. the legacy 
heap-based timers plus some custom writes in user code), checkpointing would 
already have failed consistently, since the raw keyed stream API 
(`KeyedStateCheckpointOutputStream`) only allows calling `startNewKeyGroup` once 
per key group for the same stream.
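
   For completeness, the read side on restore would look roughly as follows (a sketch continuing the hypothetical operator above; `initializeState` and `getRawKeyedStateInputs` are the existing raw keyed state read APIs, and the payload format mirrors the write sketch):
   ```
   // additional imports for the read side:
   //   org.apache.flink.core.memory.DataInputView
   //   org.apache.flink.core.memory.DataInputViewStreamWrapper
   //   org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider
   //   org.apache.flink.runtime.state.StateInitializationContext

   @Override
   public void initializeState(StateInitializationContext context) throws Exception {
       super.initializeState(context);
       if (context.isRestored()) {
           // with isUsingCustomRawKeyedState() == true, the timer services leave these
           // streams untouched, so the operator can consume them itself
           for (KeyGroupStatePartitionStreamProvider provider : context.getRawKeyedStateInputs()) {
               DataInputView in = new DataInputViewStreamWrapper(provider.getStream());
               int keyGroup = in.readInt(); // matches the made-up payload written in snapshotState
               // ... restore the operator's data for this key group ...
           }
       }
   }
   ```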
   
   ## Brief change log
   
   - 10bfda6 introduces the `isUsingCustomRawKeyedState` flag in 
`AbstractStreamOperator`
   - 2eab38b allows passing in the flag when creating the 
`StreamOperatorStateContext`
   - c8b5f04 wires the flag into the `StreamOperatorStateContext` instantiation, 
so that timer services do not read from raw keyed state if they were not the 
ones that wrote to it
   - 1ca873f adds a test that uses an `AbstractStreamOperator` implementation 
that writes to raw keyed state, and verifies that snapshotting and restoring 
work and that the new flag is respected. If the flag is changed to `false` 
in the test, the test fails.
   - c415739 adds an extra notice in the docs mentioning that checkpointing 
would fail when combining the RocksDB state backend with heap-based timers and 
custom raw keyed state writes in some operators.
   
   ## Verifying this change
   
   The new test 
`AbstractStreamOperatorTest#testCustomRawKeyedStateSnapshotAndRestore` should 
cover this fix.
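
   For reference, such a snapshot/restore round trip can be exercised with the keyed operator test harness along these lines (a hedged sketch only, reusing the hypothetical `MyRawKeyedStateOperator` from above; the test class and method names here are illustrative, and the actual test in this PR may be structured differently):
   ```
   import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
   import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
   import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
   import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
   import org.junit.Test;

   public class CustomRawKeyedStateRoundTripTest {

       @Test
       public void testSnapshotAndRestoreWithCustomRawKeyedState() throws Exception {
           // snapshot with an operator that writes custom raw keyed state
           KeyedOneInputStreamOperatorTestHarness<String, String, String> harness =
                   new KeyedOneInputStreamOperatorTestHarness<>(
                           new MyRawKeyedStateOperator(), value -> value, BasicTypeInfo.STRING_TYPE_INFO);
           harness.open();
           harness.processElement(new StreamRecord<>("hello", 1L));
           OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
           harness.close();

           // restoring must succeed: with isUsingCustomRawKeyedState() == true, the
           // timer services skip the raw keyed streams and the operator reads them back
           KeyedOneInputStreamOperatorTestHarness<String, String, String> restored =
                   new KeyedOneInputStreamOperatorTestHarness<>(
                           new MyRawKeyedStateOperator(), value -> value, BasicTypeInfo.STRING_TYPE_INFO);
           restored.initializeState(snapshot);
           restored.open();
           restored.close();
       }
   }
   ```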
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **NO**)
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**YES** / no)
     - The serializers: (yes / **NO** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **NO** 
/ don't know)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (**YES** / no / 
don't know)
     - The S3 file system connector: (yes / **NO** / don't know)
   
   ## Documentation
   
   While technically this PR introduces a new feature, we deliberately do not 
mention it in the docs, as raw keyed state is intended as an internal feature 
that users should not use (except in some very rare edge cases).
   

