[
https://issues.apache.org/jira/browse/FLINK-5420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kostas Kloudas updated FLINK-5420:
----------------------------------
Description:
This issue aims to make the operators in the CEP library rescalable. Once this
is implemented, users will be able to take a savepoint and restart their job
with a different parallelism.
This issue depends on https://issues.apache.org/jira/browse/FLINK-5845.
To achieve this, we introduce the {{TimeServiceHandler}} in the
{{AbstractStreamOperator}}. It holds the registered {{InternalTimerService}}s
(previously held directly by the {{AbstractStreamOperator}}) and a new service
called {{KeyRegistry}}. The {{KeyRegistry}} will be fault tolerant and
rescalable, and will allow registering keys together with a callback that is
invoked for each registered key whenever a watermark is received. This can be
seen as keeping (recurring) timers for each registered key that fire "at the
next watermark".
With this service in place, all NFA processing triggered by a watermark will be
delegated to the callback.
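To illustrate the contract described above, here is a minimal in-memory sketch
of a key registry that invokes a callback for every registered key on each
watermark. The class {{KeyRegistrySketch}} and its methods {{registerKey}},
{{unregisterKey}} and {{onWatermark}} are hypothetical names chosen for this
example; they are not the proposed Flink API, and the sketch leaves out fault
tolerance and rescaling entirely.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for the proposed KeyRegistry; not Flink's API. */
final class KeyRegistrySketch<K> {
    // Insertion-ordered so that callbacks fire in a deterministic order here.
    private final Set<K> registeredKeys = new LinkedHashSet<>();
    private final BiConsumer<K, Long> callback;

    KeyRegistrySketch(BiConsumer<K, Long> callback) {
        this.callback = callback;
    }

    /** Registers a key; registering the same key twice has no extra effect. */
    void registerKey(K key) {
        registeredKeys.add(key);
    }

    void unregisterKey(K key) {
        registeredKeys.remove(key);
    }

    /** Invokes the callback once per registered key; keys stay registered. */
    void onWatermark(long watermark) {
        // Iterate over a copy so the callback may register or unregister keys.
        for (K key : new ArrayList<>(registeredKeys)) {
            if (registeredKeys.contains(key)) {
                callback.accept(key, watermark);
            }
        }
    }
}

final class KeyRegistryDemo {
    static List<String> run() {
        List<String> fired = new ArrayList<>();
        // In the CEP operator, the callback would run the NFA for the key.
        KeyRegistrySketch<String> registry =
                new KeyRegistrySketch<>((key, watermark) -> fired.add(key + "@" + watermark));
        registry.registerKey("user-1");
        registry.registerKey("user-2");
        registry.onWatermark(10L);
        registry.unregisterKey("user-1");
        registry.onWatermark(20L);
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the keys remain registered after each watermark, the callback behaves
like a recurring per-key timer, which is the analogy drawn above.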
was:
This issue aims to make the operators in the CEP library rescalable. Once this
is implemented, users will be able to take a savepoint and restart their job
with a different parallelism.
The way to do this is to transform the CEP operators into the newly introduced
{{ProcessFunction}} and use only managed keyed state to store their state. With
this transformation, rescalability comes out of the box. In addition, for the
keyed operator in event time, we no longer have to keep the already seen keys
in a list. Instead, we set a timer for each incoming element (in
{{ProcessFunction#processElement()}}) whose timestamp is that of the element
itself, so that it fires at the next watermark. When a timer fires (in
{{ProcessFunction#onTimer()}}), it re-registers itself with a timestamp equal
to {{currentWatermark() + 1}}, so that it fires again at the following
watermark. This preserves the previous behavior of the operators.
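The self re-registering timer pattern from the previous description can be
sketched without any Flink dependency. {{RecurringTimerSketch}} below is a
hypothetical toy model, not a {{ProcessFunction}}: it assumes that a timer
fires once the watermark reaches its timestamp, and that timers with the same
key and timestamp are deduplicated.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

/** Toy model of event-time timers that re-register themselves; not Flink code. */
final class RecurringTimerSketch {
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Ordered by timestamp, then key; equal (timestamp, key) pairs are deduplicated.
    private final TreeSet<Timer> timers = new TreeSet<>(
            Comparator.comparingLong((Timer t) -> t.timestamp)
                    .thenComparing((Timer t) -> t.key));
    private long currentWatermark = Long.MIN_VALUE;
    final List<String> log = new ArrayList<>();

    /** Mirrors processElement(): set a timer at the element's own timestamp. */
    void processElement(String key, long elementTimestamp) {
        timers.add(new Timer(key, elementTimestamp));
    }

    /** Fires every timer whose timestamp is at or below the new watermark. */
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first().timestamp <= currentWatermark) {
            onTimer(timers.pollFirst());
        }
    }

    /** Mirrors onTimer(): do the per-key work, then re-register for the next watermark. */
    private void onTimer(Timer timer) {
        log.add(timer.key + "@" + currentWatermark);
        // Guard against overflow when the watermark is already Long.MAX_VALUE.
        if (currentWatermark < Long.MAX_VALUE) {
            timers.add(new Timer(timer.key, currentWatermark + 1));
        }
    }
}

final class RecurringTimerDemo {
    static List<String> run() {
        RecurringTimerSketch operator = new RecurringTimerSketch();
        operator.processElement("a", 5L);
        operator.processElement("b", 7L);
        operator.advanceWatermark(3L);  // below both timers: nothing fires
        operator.advanceWatermark(10L); // both fire and re-register at 11
        operator.advanceWatermark(20L); // both fire again and re-register at 21
        return operator.log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Re-registering at the current watermark plus one means the new timer cannot
fire within the same watermark, but any later watermark will trigger it.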
> Make CEP operators rescalable
> -----------------------------
>
> Key: FLINK-5420
> URL: https://issues.apache.org/jira/browse/FLINK-5420
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.2.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas