[ https://issues.apache.org/jira/browse/BEAM-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15861079#comment-15861079 ]

Aljoscha Krettek commented on BEAM-1393:
----------------------------------------

Yes, your analysis is spot on!

I think what we can do is copy the complete code of
{{AbstractStreamOperator}} into the Beam code base, then try to come up with a good
interface for checkpointing key groups. We could then contribute that
back to Flink for the next release, so that we can remove our copy of
{{AbstractStreamOperator}} from the Beam code base again. What do you think?

I had in mind something like:

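{code}
import java.io.IOException;
import java.io.OutputStream;

/** Hook called once per key group while the operator snapshots its state. */
interface KeyGroupCheckpointer {
  void checkpointKeyGroup(int keyGroupIndex, OutputStream out) throws IOException;
}
{code}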

Our Beam operators would implement this, and our custom
{{AbstractStreamOperator}} would call this hook in
{{AbstractStreamOperator.snapshotState()}} while iterating over the key groups.
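A minimal sketch of that call pattern, in plain Java so it runs outside Flink. Everything except {{KeyGroupCheckpointer}} is hypothetical: {{SnapshottingOperatorSketch}} stands in for the copied {{AbstractStreamOperator}}, the key-group range is assumed to be inclusive on both ends, and one {{ByteArrayOutputStream}} per key group stands in for whatever checkpoint stream Flink actually hands the operator.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.TreeMap;

// The hook proposed in this comment.
interface KeyGroupCheckpointer {
  void checkpointKeyGroup(int keyGroupIndex, OutputStream out) throws IOException;
}

// Hypothetical stand-in for the copied AbstractStreamOperator: it owns the
// key-group loop and calls the hook once per key group.
class SnapshottingOperatorSketch {
  private final int startKeyGroup; // inclusive (assumption)
  private final int endKeyGroup;   // inclusive (assumption)
  private final KeyGroupCheckpointer checkpointer;

  SnapshottingOperatorSketch(int startKeyGroup, int endKeyGroup, KeyGroupCheckpointer checkpointer) {
    this.startKeyGroup = startKeyGroup;
    this.endKeyGroup = endKeyGroup;
    this.checkpointer = checkpointer;
  }

  // Returns one serialized blob per key group, ordered by key-group index.
  Map<Integer, byte[]> snapshotState() throws IOException {
    Map<Integer, byte[]> snapshot = new TreeMap<>();
    for (int kg = startKeyGroup; kg <= endKeyGroup; kg++) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      checkpointer.checkpointKeyGroup(kg, out);
      snapshot.put(kg, out.toByteArray());
    }
    return snapshot;
  }
}

// Hypothetical Beam operator: writes a marker string for each key group.
class ExampleBeamOperator implements KeyGroupCheckpointer {
  @Override
  public void checkpointKeyGroup(int keyGroupIndex, OutputStream out) throws IOException {
    DataOutputStream data = new DataOutputStream(out);
    data.writeUTF("state-of-" + keyGroupIndex);
    data.flush(); // do not close: the caller owns the underlying stream
  }
}

class KeyGroupSnapshotDemo {
  public static void main(String[] args) throws IOException {
    SnapshottingOperatorSketch op =
        new SnapshottingOperatorSketch(2, 4, new ExampleBeamOperator());
    Map<Integer, byte[]> snapshot = op.snapshotState();
    System.out.println(snapshot.keySet()); // [2, 3, 4]
  }
}
```

In this sketch the loop lives in the operator base class, so a Beam operator only has to know how to serialize a single key group; it never sees the key-group range or the checkpoint stream plumbing.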

If we find that this interface works well we can easily add it to Flink proper.

We would also need a new implementation of {{StateInternals}} that can
checkpoint itself to a stream and restore itself from a stream. We could then use it
to write either to key-group checkpoints or, for the non-keyed case, to
SPLIT_DISTRIBUTE streams.
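A rough sketch of what "checkpoint itself to a stream and restore from a stream" could look like, again in plain Java. {{StreamCheckpointableState}} is a hypothetical class, not Beam's {{StateInternals}} interface: it keeps String keys and values per key group (a real version would use Beam {{Coder}}s) and uses a simple count-then-entries format that is an assumption of this sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical heap-backed state that can write one key group to a stream
// and read it back.
class StreamCheckpointableState {
  // key-group index -> (key -> value)
  private final Map<Integer, Map<String, String>> byKeyGroup = new HashMap<>();

  void put(int keyGroup, String key, String value) {
    byKeyGroup.computeIfAbsent(keyGroup, kg -> new TreeMap<>()).put(key, value);
  }

  String get(int keyGroup, String key) {
    Map<String, String> group = byKeyGroup.get(keyGroup);
    return group == null ? null : group.get(key);
  }

  // Writes one key group: the entry count, then each key/value pair.
  void snapshotKeyGroup(int keyGroup, OutputStream out) throws IOException {
    Map<String, String> group = byKeyGroup.getOrDefault(keyGroup, new TreeMap<>());
    DataOutputStream data = new DataOutputStream(out);
    data.writeInt(group.size());
    for (Map.Entry<String, String> e : group.entrySet()) {
      data.writeUTF(e.getKey());
      data.writeUTF(e.getValue());
    }
    data.flush(); // do not close: the caller owns the underlying stream
  }

  // Replaces one key group with the contents written by snapshotKeyGroup.
  void restoreKeyGroup(int keyGroup, InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in);
    int count = data.readInt();
    Map<String, String> group = new TreeMap<>();
    for (int i = 0; i < count; i++) {
      String key = data.readUTF();
      group.put(key, data.readUTF());
    }
    byKeyGroup.put(keyGroup, group);
  }
}

class StateRoundTripDemo {
  public static void main(String[] args) throws IOException {
    StreamCheckpointableState state = new StreamCheckpointableState();
    state.put(7, "user-a", "3");
    state.put(7, "user-b", "5");

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    state.snapshotKeyGroup(7, out);

    // Restore into a fresh instance, as a task might after rescaling.
    StreamCheckpointableState restored = new StreamCheckpointableState();
    restored.restoreKeyGroup(7, new ByteArrayInputStream(out.toByteArray()));
    System.out.println(restored.get(7, "user-b")); // 5
  }
}
```

Because each key group is written independently, a restoring task only reads the key groups it is assigned; for the non-keyed case the same count-then-entries format could be written without the per-key-group split.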

> Update Flink Runner to Flink 1.2.0
> ----------------------------------
>
>                 Key: BEAM-1393
>                 URL: https://issues.apache.org/jira/browse/BEAM-1393
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Jingsong Lee
>
> When we update to 1.2.0 we can use the new internal Timer API that is 
> available to Flink operators: {{InternalTimerService}} and also use broadcast 
> state to store side-input data.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)