[ 
https://issues.apache.org/jira/browse/FLINK-5420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-5420:
----------------------------------
    Description: 
This issue targets making the operators in the CEP library rescalable. Once 
this is implemented, users will be able to take a savepoint and restart their 
job with a different parallelism.

The way to do it is to transform the CEP operators into the newly introduced 
{{ProcessFunction}} and use only managed keyed state to store their state. With 
this transformation, rescalability will come out-of-the-box. In addition, for 
the keyed operator and for event time, we will no longer have to keep the 
already seen keys in a list. Instead, we can set a timer for each incoming 
element (in {{ProcessFunction#processElement()}}) whose timestamp is that of 
the element itself, so that it fires at the next watermark that reaches this 
timestamp. When such a timer fires, it registers another timer for the next 
watermark (in {{ProcessFunction#onTimer()}}, it re-registers itself with a 
timestamp equal to {{currentWatermark() + 1}}). This will preserve the previous 
behavior of the operators.
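
The timer scheme described above can be sketched as a small, self-contained 
toy model. This is not the Flink API and not the actual CEP operator: the class 
{{TimerSketch}}, its fields and its methods are hypothetical stand-ins for 
keyed state, the timer service and the watermark. The sketch also assumes that 
a timer re-registers itself only while the key still has buffered elements, 
which the description above does not specify.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Toy model of the timer scheme. NOT the Flink API; all names are hypothetical. */
class TimerSketch {
    // Stand-in for managed keyed state: per key, buffered values ordered by timestamp.
    private final Map<String, TreeMap<Long, List<String>>> buffers = new HashMap<>();
    // Stand-in for the event-time timer service: timestamp -> keys (sets deduplicate).
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();
    private long currentWatermark = Long.MIN_VALUE;
    final List<String> output = new ArrayList<>();

    /** Buffers the element and sets a timer whose timestamp is the element's own. */
    void processElement(String key, long timestamp, String value) {
        buffers.computeIfAbsent(key, k -> new TreeMap<>())
               .computeIfAbsent(timestamp, t -> new ArrayList<>())
               .add(value);
        registerTimer(key, timestamp);
    }

    /** Advances event time and fires every timer with timestamp <= watermark. */
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, TreeSet<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key);
            }
        }
    }

    /** Emits the elements covered by the watermark, then re-registers if needed. */
    private void onTimer(String key) {
        TreeMap<Long, List<String>> buffer = buffers.get(key);
        if (buffer == null) {
            return; // nothing buffered for this key any more
        }
        Map<Long, List<String>> ready = buffer.headMap(currentWatermark, true);
        for (Map.Entry<Long, List<String>> e : ready.entrySet()) {
            for (String value : e.getValue()) {
                output.add(key + "@" + e.getKey() + ":" + value);
            }
        }
        ready.clear(); // the view is backed by the buffer, so this removes the entries
        if (buffer.isEmpty()) {
            buffers.remove(key);
        } else {
            // Assumption: keep the key alive only while it still holds elements.
            registerTimer(key, currentWatermark + 1);
        }
    }

    private void registerTimer(String key, long timestamp) {
        timers.computeIfAbsent(timestamp, t -> new TreeSet<>()).add(key);
    }

    public static void main(String[] args) {
        TimerSketch op = new TimerSketch();
        op.processElement("a", 5, "x");
        op.processElement("a", 12, "y");
        op.processElement("b", 7, "z");
        op.advanceWatermark(6);
        op.advanceWatermark(10);
        op.advanceWatermark(12);
        System.out.println(op.output); // prints [a@5:x, b@7:z, a@12:y]
    }
}
```

No list of seen keys is kept anywhere in this sketch: a key is revisited on a 
watermark only because one of its own timers is due, and all of its state lives 
under that key.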

  was:{{AbstractKeyedCEPPatternOperator}} uses the old state snapshotting 
interfaces that make the operator non-rescalable. The operator uses state to 
keep track of all the in-flight keys. When a watermark arrives, the buffers are 
checked for all keys and those that can be processed are processed. We could 
change this to use the abstract timer system instead.


> Make CEP operators rescalable
> -----------------------------
>
>                 Key: FLINK-5420
>                 URL: https://issues.apache.org/jira/browse/FLINK-5420
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.2.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>
> This issue targets making the operators in the CEP library rescalable. Once 
> this is implemented, users will be able to take a savepoint and restart 
> their job with a different parallelism.
> The way to do it is to transform the CEP operators into the newly introduced 
> {{ProcessFunction}} and use only managed keyed state to store their state. 
> With this transformation, rescalability will come out-of-the-box. In 
> addition, for the keyed operator and for event time, we will no longer have 
> to keep the already seen keys in a list. Instead, we can set a timer for each 
> incoming element (in {{ProcessFunction#processElement()}}) whose timestamp is 
> that of the element itself, so that it fires at the next watermark that 
> reaches this timestamp. When such a timer fires, it registers another timer 
> for the next watermark (in {{ProcessFunction#onTimer()}}, it re-registers 
> itself with a timestamp equal to {{currentWatermark() + 1}}). This will 
> preserve the previous behavior of the operators.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
