[ https://issues.apache.org/jira/browse/FLINK-5420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kostas Kloudas updated FLINK-5420:
----------------------------------
    Description: 
This issue targets making the operators in the CEP library rescalable. Once 
this is implemented, users will be able to take a savepoint and restart their 
job with a different parallelism.
This issue depends on https://issues.apache.org/jira/browse/FLINK-5845. 

To achieve this, we introduce a {{TimeServiceHandler}} in the 
{{AbstractStreamOperator}}. It keeps the registered {{InternalTimerService}}s 
(previously held directly by the {{AbstractStreamOperator}}) as well as a new 
service called {{KeyRegistry}}. The {{KeyRegistry}} will be fault tolerant and 
rescalable. It allows registering keys together with a callback that is 
invoked for each registered key whenever a watermark is received. This can be 
seen as keeping a recurring timer for each registered key that fires "at the 
next watermark".
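A minimal sketch of the idea behind the {{KeyRegistry}}, under stated 
assumptions: the class {{KeyRegistrySketch}}, its methods, and the simplified 
key-group hash are hypothetical and are not Flink's API. It assumes keys are 
partitioned into a fixed number of key groups (the max parallelism) and that 
each subtask owns a contiguous key-group range. Because the snapshot is 
organized per key group, a restored subtask can keep exactly the keys it owns 
after rescaling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical sketch of a fault-tolerant, rescalable key registry.
// KeyRegistrySketch and all of its methods are illustrative, not Flink's API.
class KeyRegistrySketch<K> {
    private final int maxParallelism;              // number of key groups, fixed for the job
    private final int firstKeyGroup;               // inclusive key-group range owned by this subtask
    private final int lastKeyGroup;
    private final BiConsumer<K, Long> callback;    // invoked per registered key on each watermark
    private final Map<Integer, Set<K>> keysByGroup = new HashMap<>();

    KeyRegistrySketch(int maxParallelism, int firstKeyGroup, int lastKeyGroup,
                      BiConsumer<K, Long> callback) {
        this.maxParallelism = maxParallelism;
        this.firstKeyGroup = firstKeyGroup;
        this.lastKeyGroup = lastKeyGroup;
        this.callback = callback;
    }

    // Simplified assignment: Flink additionally scrambles hashCode() with a murmur hash.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    private boolean owns(int keyGroup) {
        return keyGroup >= firstKeyGroup && keyGroup <= lastKeyGroup;
    }

    void register(K key) {
        int group = keyGroupFor(key, maxParallelism);
        if (!owns(group)) {
            throw new IllegalArgumentException("key " + key + " belongs to another subtask");
        }
        keysByGroup.computeIfAbsent(group, g -> new HashSet<>()).add(key);
    }

    // Acts like a recurring "fire at the next watermark" timer for every registered key.
    void onWatermark(long watermark) {
        List<K> keys = new ArrayList<>();          // copy, so the callback may register new keys
        keysByGroup.values().forEach(keys::addAll);
        for (K key : keys) {
            callback.accept(key, watermark);
        }
    }

    // The snapshot is partitioned by key group, which is what makes it redistributable.
    Map<Integer, Set<K>> snapshot() {
        Map<Integer, Set<K>> copy = new HashMap<>();
        keysByGroup.forEach((group, keys) -> copy.put(group, new HashSet<>(keys)));
        return copy;
    }

    // On restore (possibly with a new parallelism), keep only the key groups this subtask owns.
    void restore(Map<Integer, Set<K>> snapshot) {
        snapshot.forEach((group, keys) -> {
            if (owns(group)) {
                keysByGroup.computeIfAbsent(group, g -> new HashSet<>()).addAll(keys);
            }
        });
    }

    int size() {
        return keysByGroup.values().stream().mapToInt(Set::size).sum();
    }

    public static void main(String[] args) {
        // Parallelism 1: a single subtask owns all 4 key groups.
        KeyRegistrySketch<Integer> before = new KeyRegistrySketch<>(4, 0, 3, (k, wm) -> { });
        for (int k = 0; k < 8; k++) {
            before.register(k);
        }
        Map<Integer, Set<Integer>> savepoint = before.snapshot();

        // Rescale to parallelism 2: each subtask restores only its own key groups.
        KeyRegistrySketch<Integer> left = new KeyRegistrySketch<>(4, 0, 1,
                (k, wm) -> System.out.println("key " + k + " at watermark " + wm));
        KeyRegistrySketch<Integer> right = new KeyRegistrySketch<>(4, 2, 3, (k, wm) -> { });
        left.restore(savepoint);
        right.restore(savepoint);
        System.out.println(left.size() + " + " + right.size()); // prints "4 + 4"
        left.onWatermark(10L);
    }
}
```

In {{main}}, a registry running with parallelism 1 is snapshotted and restored 
into two subtasks; each ends up with four of the eight keys and invokes the 
callback only for its own keys.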

With this service in place, all NFA processing triggered by an incoming 
watermark will be delegated to the callback.
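The delegation of NFA processing to the callback could look roughly like the 
following sketch. It assumes (hypothetically) that {{processElement}} only 
buffers elements per key and that the callback drains, in timestamp order, 
every buffered element whose timestamp is at most the watermark. {{ToyNfa}} is 
a toy two-state matcher standing in for the CEP NFA; all names here are 
illustrative, not the actual operator code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch of the per-key watermark callback; names are illustrative, not Flink's API.
class WatermarkCallbackSketch {
    static final class Event {
        final String value;
        final long timestamp;

        Event(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Per-key buffer of not-yet-processed elements, ordered by event time.
    private final Map<String, PriorityQueue<Event>> buffers = new HashMap<>();
    // Per-key NFA instance (in a real operator this would live in keyed state).
    private final Map<String, ToyNfa> nfas = new HashMap<>();
    final List<String> matches = new ArrayList<>();

    // processElement only buffers; no pattern matching happens here.
    void bufferElement(String key, Event event) {
        buffers.computeIfAbsent(key,
                k -> new PriorityQueue<Event>(Comparator.comparingLong((Event e) -> e.timestamp)))
               .add(event);
    }

    // Callback invoked for one registered key when a watermark arrives:
    // feed every buffered element with timestamp <= watermark to the NFA, in timestamp order.
    void onWatermark(String key, long watermark) {
        PriorityQueue<Event> buffer = buffers.get(key);
        if (buffer == null) {
            return;
        }
        ToyNfa nfa = nfas.computeIfAbsent(key, k -> new ToyNfa());
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            Event event = buffer.poll();
            if (nfa.process(event.value)) {
                matches.add(key + "@" + event.timestamp);
            }
        }
    }

    public static void main(String[] args) {
        WatermarkCallbackSketch op = new WatermarkCallbackSketch();
        op.bufferElement("k1", new Event("b", 5));   // elements may arrive out of order
        op.bufferElement("k1", new Event("a", 1));
        op.bufferElement("k1", new Event("b", 3));
        op.bufferElement("k1", new Event("a", 20));  // beyond the watermark: stays buffered
        op.onWatermark("k1", 10);
        System.out.println(op.matches);              // prints [k1@3]
    }
}

// Toy stand-in for a CEP NFA that matches "a" followed later by "b".
class ToyNfa {
    private boolean seenA = false;

    // Returns true when this event completes a match.
    boolean process(String value) {
        if (value.equals("a")) {
            seenA = true;
            return false;
        }
        if (value.equals("b") && seenA) {
            seenA = false;
            return true;
        }
        return false;
    }
}
```

Buffering until the watermark is what lets out-of-order input be fed to the 
NFA in event-time order: here {{b@3}} is matched before {{b@5}} even though it 
arrived later.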

  was:
This issue targets making the operators in the CEP library rescalable. Once 
this is implemented, users will be able to take a savepoint and restart their 
job with a different parallelism.

The way to do it is to transform the CEP operators into the newly introduced 
{{ProcessFunction}} and use only managed keyed state to store their state. With 
this transformation, rescalability will come out of the box. In addition, for 
the keyed operator in event time, we will no longer have to keep the already 
seen keys in a list. Instead, we can set a timer for each incoming element (in 
{{ProcessFunction#processElement()}}) whose timestamp is that of the element 
itself, so that it fires at the next watermark. When such a timer fires, it 
registers another timer for the following watermark (in 
{{ProcessFunction#onTimer()}} it re-registers itself with a timestamp equal to 
{{currentWatermark() + 1}}). This will preserve the previous behavior of the 
operators.
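The timer-based scheme above can be simulated without Flink, as in the sketch 
below. It is a single-key simulation using a hypothetical 
{{RecurringTimerSimulation}} class; with the real API the timers would be 
registered through the {{ProcessFunction}} context's timer service (e.g. 
{{TimerService#registerEventTimeTimer()}}), which this sketch does not use. It 
shows why re-registering at {{currentWatermark() + 1}} yields a timer that 
fires at every subsequent watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Single-key simulation of recurring event-time timers; not Flink's TimerService API.
class RecurringTimerSimulation {
    // A set, so at most one timer per timestamp (Flink also deduplicates timers per key and timestamp).
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;
    final List<Long> firedAtWatermarks = new ArrayList<>();

    // Analogous to processElement(): register a timer at the element's own timestamp.
    void processElement(long elementTimestamp) {
        timers.add(elementTimestamp);
    }

    // A watermark fires every timer whose timestamp is <= the new watermark.
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    // Analogous to onTimer(): do the per-key work, then re-register for the next watermark.
    // Watermarks only increase, so the next advance is >= currentWatermark + 1 and fires this timer.
    private void onTimer(long timestamp) {
        firedAtWatermarks.add(currentWatermark);
        timers.add(currentWatermark + 1);
    }

    public static void main(String[] args) {
        RecurringTimerSimulation sim = new RecurringTimerSimulation();
        sim.processElement(3);
        sim.advanceWatermark(2);   // too early: nothing fires
        sim.advanceWatermark(10);  // element timer fires, re-registers at 11
        sim.advanceWatermark(15);  // recurring timer fires, re-registers at 16
        sim.advanceWatermark(20);
        System.out.println(sim.firedAtWatermarks); // prints [10, 15, 20]
    }
}
```

Repeating the same watermark does not fire the re-registered timer again, 
since its timestamp is strictly greater than the current watermark.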


> Make CEP operators rescalable
> -----------------------------
>
>                 Key: FLINK-5420
>                 URL: https://issues.apache.org/jira/browse/FLINK-5420
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.2.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
