Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/3001
Hi @tony810430!
Sorry for the long pause on this PR. After some back and forth offline
discussions with others on how exactly we want to proceed with this, we decided
to stick with using union state to cope with the shard discovery on restore
problem (at least for 1.3.0). Therefore, we can finally continue work here :-D
First of all, to use union state, instead of `ListCheckpointed`, we should
use `CheckpointedFunction` instead. There is a PR for exposing union state to
the public API (#3508), but in case that isn't merged yet within the next few
days, I suggest that you don't need to be blocked when you continue your work
on this PR. For now, you can cast the operator state store instance retrieved
through the `FunctionInitializationContext` to `DefaultOperatorStateStore` to
use broadcast state.
One thing to also note, which is missing in you previous work on this, is
that we need a migration path from the old state access (i.e., via
`CheckpointedAsynchronously`) to the new state (i.e. `CheckpointedFunction`).
The `FlinkKafkaConsumerBase` class in the Kafka connector provides a very
good example of how to do this. Simply put, in the end, the
`FlinkKinesisConsumer` should implement both `CheckpointedRestoring` and
`CheckpointedFunction`, and bridge the old state read from the legacy
`restoreState(...)` method to the new `initializeState(...)` method. The bridge
would simply be a field variable in the consumer class.
The `FlinkKafkaConsumerBase` also serves as a good example of how to use
the `CheckpointedFunction` if you have questions there.
Let me know if you have any questions with this, and feel free to ping me
any time!
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---