[ https://issues.apache.org/jira/browse/BEAM-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15861079#comment-15861079 ]
Aljoscha Krettek commented on BEAM-1393:
----------------------------------------

Yes, your analysis is spot on! I think what we can do is completely copy the code of {{AbstractStreamOperator}} into the Beam code base, then try to come up with a good interface that works for checkpointing key-groups. We would then contribute that back to Flink for the next release so that we can again remove {{AbstractStreamOperator}} from the Beam code base. What do you think?

I had in mind something like:

{code}
import java.io.IOException;
import java.io.OutputStream;

// Hook invoked once per key group while the operator snapshots its state.
interface KeyGroupCheckpointer {
    void checkpointKeyGroup(int keyGroupIndex, OutputStream out) throws IOException;
}
{code}

Our Beam operators would implement this, and our custom {{AbstractStreamOperator}} would call this hook in {{AbstractStreamOperator.snapshotState()}} while iterating over the key groups. If we find that this interface works well, we can easily add it to Flink proper.

We would also need a new implementation of {{StateInternals}} that can checkpoint itself to a stream and restore itself from a stream. We could then use this to write either to key-group checkpoints or to SPLIT_DISTRIBUTE streams for the non-keyed case.

> Update Flink Runner to Flink 1.2.0
> ----------------------------------
>
>                 Key: BEAM-1393
>                 URL: https://issues.apache.org/jira/browse/BEAM-1393
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Jingsong Lee
>
> When we update to 1.2.0 we can use the new internal Timer API that is
> available to Flink operators: {{InternalTimerService}} and also use broadcast
> state to store side-input data.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
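To make the proposed hook concrete, here is a minimal sketch of how an operator's state might implement {{KeyGroupCheckpointer}} and how a copied {{AbstractStreamOperator}} could drive it key group by key group, together with the stream-based restore that the {{StateInternals}} paragraph asks for. Everything beyond the interface name and method is an assumption for illustration: {{KeyGroupedState}}, {{restoreKeyGroup}} and {{SnapshotDriver}} are hypothetical, the state is a plain String-to-long map per key group, the {{throws IOException}} clause is added so implementations can write to the stream, and the key-group assignment is a simplified hash (Flink's own assignment hashes keys differently). The per-group byte arrays only stand in for whatever checkpoint output stream Flink would actually provide.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// The hook from the comment; "throws IOException" is an assumption.
interface KeyGroupCheckpointer {
    void checkpointKeyGroup(int keyGroupIndex, OutputStream out) throws IOException;
}

// Hypothetical stand-in for a StateInternals that can checkpoint/restore via streams.
class KeyGroupedState implements KeyGroupCheckpointer {
    private final int numKeyGroups;
    private final List<Map<String, Long>> groups = new ArrayList<>();

    KeyGroupedState(int numKeyGroups) {
        this.numKeyGroups = numKeyGroups;
        for (int i = 0; i < numKeyGroups; i++) {
            groups.add(new HashMap<>());
        }
    }

    // Simplified key-group assignment (assumption, not Flink's exact scheme).
    int keyGroupFor(String key) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    void put(String key, long value) {
        groups.get(keyGroupFor(key)).put(key, value);
    }

    Long get(String key) {
        return groups.get(keyGroupFor(key)).get(key);
    }

    // Writes one key group as: entry count, then (key, value) pairs.
    @Override
    public void checkpointKeyGroup(int keyGroupIndex, OutputStream out) throws IOException {
        DataOutputStream dos = new DataOutputStream(out);
        Map<String, Long> group = groups.get(keyGroupIndex);
        dos.writeInt(group.size());
        for (Map.Entry<String, Long> e : group.entrySet()) {
            dos.writeUTF(e.getKey());
            dos.writeLong(e.getValue());
        }
        dos.flush(); // flush but do not close: the caller owns the stream
    }

    // Reads back exactly what checkpointKeyGroup wrote for this key group.
    void restoreKeyGroup(int keyGroupIndex, InputStream in) throws IOException {
        DataInputStream dis = new DataInputStream(in);
        Map<String, Long> group = groups.get(keyGroupIndex);
        group.clear();
        int n = dis.readInt();
        for (int i = 0; i < n; i++) {
            group.put(dis.readUTF(), dis.readLong());
        }
    }
}

// Roughly what snapshotState() in a copied AbstractStreamOperator might do.
class SnapshotDriver {
    static List<byte[]> snapshotAll(KeyGroupCheckpointer c, int numKeyGroups) throws IOException {
        List<byte[]> snapshots = new ArrayList<>();
        for (int kg = 0; kg < numKeyGroups; kg++) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            c.checkpointKeyGroup(kg, bytes);
            snapshots.add(bytes.toByteArray());
        }
        return snapshots;
    }
}
```

Because each key group is written and read independently, a restoring operator would only need to replay the key groups assigned to it after rescaling, which is the property that makes key-group checkpoints attractive here.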