[ https://issues.apache.org/jira/browse/BEAM-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15861079#comment-15861079 ]
Aljoscha Krettek commented on BEAM-1393:
----------------------------------------
Yes, your analysis is spot on!
I think what we can do is copy the complete code of
{{AbstractStreamOperator}} into the Beam code base, then try to come up with a good
interface for checkpointing key groups, and then contribute that
back to Flink for the next release so that we can remove
{{AbstractStreamOperator}} from the Beam code base again. What do you think?
I had in mind something like:
{code}
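import java.io.IOException;
import java.io.OutputStream;

interface KeyGroupCheckpointer {
  // Writes the state of a single key group to the checkpoint stream.
  void checkpointKeyGroup(int keyGroupIndex, OutputStream out) throws IOException;
}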
{code}
Our Beam operators would implement this interface, and our custom
{{AbstractStreamOperator}} would call this hook from
{{AbstractStreamOperator.snapshotState()}} while iterating over the key groups.
If we find that this interface works well we can easily add it to Flink proper.
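A minimal, Flink-free sketch of how the hook could be driven, assuming a toy operator that keeps per-key counts partitioned into key groups. {{CountingOperator}}, {{KeyGroupSnapshotter}} and {{keyGroupFor}} are hypothetical names; the loop in {{snapshot()}} only stands in for the iteration our custom {{snapshotState()}} would do, and the key-group assignment is a simplification (Flink's real assignment hashes keys differently). The interface is redeclared with {{throws IOException}}, which is an assumption since writing to an {{OutputStream}} can fail.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

interface KeyGroupCheckpointer {
  void checkpointKeyGroup(int keyGroupIndex, OutputStream out) throws IOException;
}

// Hypothetical stand-in for a Beam operator whose keyed state is split into key groups.
class CountingOperator implements KeyGroupCheckpointer {
  private final int maxParallelism;
  // keyGroupIndex -> (key -> count)
  private final Map<Integer, Map<String, Long>> stateByKeyGroup = new HashMap<>();

  CountingOperator(int maxParallelism) {
    this.maxParallelism = maxParallelism;
  }

  // Simplified key-group assignment (assumption, not Flink's exact scheme).
  int keyGroupFor(String key) {
    return Math.floorMod(key.hashCode(), maxParallelism);
  }

  void process(String key) {
    stateByKeyGroup
        .computeIfAbsent(keyGroupFor(key), g -> new HashMap<>())
        .merge(key, 1L, Long::sum);
  }

  @Override
  public void checkpointKeyGroup(int keyGroupIndex, OutputStream out) throws IOException {
    Map<String, Long> state =
        stateByKeyGroup.getOrDefault(keyGroupIndex, Collections.emptyMap());
    DataOutputStream data = new DataOutputStream(out);
    data.writeInt(state.size());
    for (Map.Entry<String, Long> e : state.entrySet()) {
      data.writeUTF(e.getKey());
      data.writeLong(e.getValue());
    }
    // Flush but do not close: the caller owns the underlying checkpoint stream.
    data.flush();
  }
}

// Hypothetical snapshot loop: visit every key group in the operator's range
// and let the hook write that group's state to its own stream.
class KeyGroupSnapshotter {
  static List<byte[]> snapshot(KeyGroupCheckpointer checkpointer, int firstKeyGroup,
      int lastKeyGroup) throws IOException {
    List<byte[]> perKeyGroup = new ArrayList<>();
    for (int g = firstKeyGroup; g <= lastKeyGroup; g++) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      checkpointer.checkpointKeyGroup(g, out);
      perKeyGroup.add(out.toByteArray());
    }
    return perKeyGroup;
  }
}
```

Because each key group is written to its own stream, state can later be redistributed at key-group granularity when the parallelism changes, which is the property the hook is meant to preserve.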
We would also need a new implementation of {{StateInternals}} that can
checkpoint itself to a stream and restore itself from a stream. We could then use this
to write either to key-group checkpoints or, for the non-keyed case, to
SPLIT_DISTRIBUTE streams.
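A rough sketch of the "checkpoint itself to a stream and restore from a stream" idea, assuming state is addressed by a (namespace, tag) pair and values are already encoded to bytes (for example with a Beam {{Coder}}). {{StreamableStateInternals}}, {{snapshot()}} and {{restore()}} are hypothetical names; this is not Beam's {{StateInternals}} interface, only an illustration of a self-describing stream format.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified state holder (not Beam's StateInternals API).
class StreamableStateInternals {
  // namespace -> (tag -> encoded value)
  private final Map<String, Map<String, byte[]>> state = new HashMap<>();

  void put(String namespace, String tag, byte[] value) {
    state.computeIfAbsent(namespace, n -> new HashMap<>()).put(tag, value);
  }

  byte[] get(String namespace, String tag) {
    Map<String, byte[]> tags = state.get(namespace);
    return tags == null ? null : tags.get(tag);
  }

  // Writes all state as: #namespaces, then per namespace its name, #tags,
  // and per tag its name, value length and value bytes.
  void snapshot(OutputStream out) throws IOException {
    DataOutputStream data = new DataOutputStream(out);
    data.writeInt(state.size());
    for (Map.Entry<String, Map<String, byte[]>> ns : state.entrySet()) {
      data.writeUTF(ns.getKey());
      data.writeInt(ns.getValue().size());
      for (Map.Entry<String, byte[]> t : ns.getValue().entrySet()) {
        data.writeUTF(t.getKey());
        data.writeInt(t.getValue().length);
        data.write(t.getValue());
      }
    }
    // The caller owns the stream, so only flush here.
    data.flush();
  }

  // Rebuilds a state holder from a stream produced by snapshot().
  static StreamableStateInternals restore(InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in);
    StreamableStateInternals restored = new StreamableStateInternals();
    int namespaces = data.readInt();
    for (int i = 0; i < namespaces; i++) {
      String namespace = data.readUTF();
      int tags = data.readInt();
      for (int j = 0; j < tags; j++) {
        String tag = data.readUTF();
        byte[] value = new byte[data.readInt()];
        data.readFully(value);
        restored.put(namespace, tag, value);
      }
    }
    return restored;
  }
}
```

Under this sketch, the keyed case could keep one such holder per key group and call {{snapshot()}} from the key-group hook, while the non-keyed case could call it once per operator instance and write the result to a SPLIT_DISTRIBUTE stream.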
> Update Flink Runner to Flink 1.2.0
> ----------------------------------
>
> Key: BEAM-1393
> URL: https://issues.apache.org/jira/browse/BEAM-1393
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Aljoscha Krettek
> Assignee: Jingsong Lee
>
> When we update to 1.2.0, we can use the new internal timer API that is
> available to Flink operators ({{InternalTimerService}}) and also use broadcast
> state to store side-input data.