[ https://issues.apache.org/jira/browse/FLINK-5420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882679#comment-15882679 ]
ASF GitHub Bot commented on FLINK-5420: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3412#discussion_r102939631 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -887,6 +864,30 @@ public void close() { // Watermark handling // ------------------------------------------------------------------------ + public <K> void registerWatermarkCallback(KeyRegistry.OnWatermarkCallback<K> callback, TypeSerializer<K> keySerializer) { --- End diff -- The key serialiser can be retrieved from `getKeyedStateBackend().getKeySerializer()`. > Make CEP operators rescalable > ----------------------------- > > Key: FLINK-5420 > URL: https://issues.apache.org/jira/browse/FLINK-5420 > Project: Flink > Issue Type: Bug > Components: CEP > Affects Versions: 1.2.0 > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > > This issue targets making the operators in the CEP library re-scalable. After > this is implemented, the user will be able to take a savepoint and restart > his job with a different parallelism. > This issue depends on https://issues.apache.org/jira/browse/FLINK-5845. > The way this is done is that we introduce the {{TimeServiceHandler}} in the > {{AbstractStreamOperator}}, which keeps the registered > {{InternalTimerService}} s (before this was in the > {{AbstractStreamOperator}}) and a new service called {{KeyRegistry}}. The > {{KeyRegistry}} will be fault tolerant and rescalable and will allow to > register keys and a callback which will be invoked for each registered key > upon reception of a watermark. This can be seen as keeping (recurring) timers > for each of the registered keys that will fire "at the next watermark". > After introducing this service, upon reception of a watermark, all the > processing of the NFAs will be delegated to the callback. -- This message was sent by Atlassian JIRA (v6.3.15#6346)