[ https://issues.apache.org/jira/browse/FLINK-5420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kostas Kloudas updated FLINK-5420:
----------------------------------
Description:
This issue targets making the operators in the CEP library rescalable. Once
this is implemented, users will be able to take a savepoint and restart their
job with a different parallelism.
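Rescaling works for managed keyed state because Flink splits that state into key groups, whose number equals the maximum parallelism. A key's key group depends only on the key and the maximum parallelism, and on restore each parallel instance takes over a contiguous range of key groups. The stdlib-only sketch below models that assignment. It is a simplification, not Flink's code: the class and method names are hypothetical, and it uses a plain `Math.floorMod` of `hashCode()` where Flink additionally scrambles the hash code. The `keyGroup * parallelism / maxParallelism` mapping is intended to mirror Flink's key-group-to-instance assignment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of key-group based rescaling; a sketch, not Flink's code.
// Assumption: a plain floorMod of hashCode() replaces Flink's hash scrambling.
public class KeyGroupRescalingSketch {

    // A key's key group depends only on the key and maxParallelism,
    // so it stays the same when the job's parallelism changes.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each parallel instance owns a contiguous range of key groups.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Key groups owned by each instance for a given parallelism.
    static Map<Integer, List<Integer>> assignment(int maxParallelism, int parallelism) {
        Map<Integer, List<Integer>> owned = new TreeMap<>();
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            int instance = operatorIndexFor(keyGroup, maxParallelism, parallelism);
            owned.computeIfAbsent(instance, i -> new ArrayList<>()).add(keyGroup);
        }
        return owned;
    }

    public static void main(String[] args) {
        int maxParallelism = 8;
        // Same key groups, regrouped: rescaling moves whole key groups between instances.
        System.out.println("parallelism 2: " + assignment(maxParallelism, 2));
        System.out.println("parallelism 4: " + assignment(maxParallelism, 4));
        int keyGroup = keyGroupFor("user-42", maxParallelism);
        System.out.println("user-42 -> key group " + keyGroup
                + ", instance " + operatorIndexFor(keyGroup, maxParallelism, 2) + " of 2"
                + ", instance " + operatorIndexFor(keyGroup, maxParallelism, 4) + " of 4");
    }
}
```

With a maximum parallelism of 8, parallelism 2 yields {0=[0, 1, 2, 3], 1=[4, 5, 6, 7]} and parallelism 4 yields {0=[0, 1], 1=[2, 3], 2=[4, 5], 3=[6, 7]}. Every key group, and therefore the per-key CEP state stored in it, moves as a unit, which is why keeping all operator state in managed keyed state makes the operator rescalable without custom redistribution logic.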
The way to do this is to transform the CEP operators into the newly introduced
{{ProcessFunction}} and use only managed keyed state to store their state. With
this transformation, rescalability comes out of the box. In addition, for the
keyed operator in event time, we no longer have to keep the already seen keys
in a list. Instead, we can register a timer for each incoming element (in
{{ProcessFunction#processElement()}}) whose timestamp is that of the element
itself, so that it fires as soon as the watermark reaches that timestamp. When
such a timer fires, it registers another timer for the next watermark: in
{{ProcessFunction#onTimer()}} it re-registers itself with a timestamp equal to
{{currentWatermark() + 1}}. This preserves the previous behavior of the
operators.
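The timer scheme described above can be simulated without Flink. The sketch below is a stdlib-only Java model, not Flink code: {{Event}}, {{processElement}}, {{onTimer}}, {{registerEventTimeTimer}}, {{advanceWatermark}} and {{pendingTimers}} are hypothetical stand-ins for the {{ProcessFunction}} callbacks and the event-time timer service. It assumes Flink's rule that an event-time timer fires once the watermark is greater than or equal to its timestamp, and it assumes that a key stops re-registering once it has no buffered elements left; the issue text does not say when re-registration stops.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeSet;

// Stdlib-only simulation of per-key event-time timers; NOT Flink's API.
public class WatermarkTimerSketch {

    record Event(String key, long timestamp) {}

    private long currentWatermark = Long.MIN_VALUE;
    // Per-key timers; the TreeSet drops duplicate timestamps for the same key.
    private final Map<String, TreeSet<Long>> timers = new HashMap<>();
    // Per-key buffers, standing in for managed keyed state.
    private final Map<String, PriorityQueue<Event>> buffers = new HashMap<>();
    // "key@timestamp" of every element released to the (omitted) NFA, in order.
    final List<String> emitted = new ArrayList<>();
    int timerFirings = 0;

    private void registerEventTimeTimer(String key, long timestamp) {
        timers.computeIfAbsent(key, k -> new TreeSet<>()).add(timestamp);
    }

    List<Long> pendingTimers(String key) {
        return new ArrayList<>(timers.getOrDefault(key, new TreeSet<>()));
    }

    // processElement(): buffer the element and set a timer at its own timestamp.
    void processElement(Event e) {
        buffers.computeIfAbsent(e.key(),
                k -> new PriorityQueue<Event>(Comparator.comparingLong(Event::timestamp))).add(e);
        registerEventTimeTimer(e.key(), e.timestamp());
    }

    // onTimer(): release buffered elements up to the watermark, then re-register
    // at currentWatermark + 1 while the key still has buffered elements (assumed stop rule).
    private void onTimer(String key) {
        timerFirings++;
        PriorityQueue<Event> buffer = buffers.get(key);
        while (!buffer.isEmpty() && buffer.peek().timestamp() <= currentWatermark) {
            Event e = buffer.poll();
            emitted.add(e.key() + "@" + e.timestamp());
        }
        if (!buffer.isEmpty()) {
            registerEventTimeTimer(key, currentWatermark + 1);
        }
    }

    // A watermark fires, in timestamp order, every timer with timestamp <= watermark.
    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (true) {
            String nextKey = null;
            long nextTs = Long.MAX_VALUE;
            for (Map.Entry<String, TreeSet<Long>> entry : timers.entrySet()) {
                TreeSet<Long> keyTimers = entry.getValue();
                if (!keyTimers.isEmpty() && keyTimers.first() < nextTs) {
                    nextKey = entry.getKey();
                    nextTs = keyTimers.first();
                }
            }
            if (nextKey == null || nextTs > watermark) {
                return;
            }
            timers.get(nextKey).pollFirst();
            onTimer(nextKey);
        }
    }

    public static void main(String[] args) {
        WatermarkTimerSketch op = new WatermarkTimerSketch();
        op.processElement(new Event("a", 5));
        op.processElement(new Event("a", 12));
        op.processElement(new Event("b", 7));
        op.advanceWatermark(10); // releases a@5 and b@7; key "a" re-registers at 11
        System.out.println(op.emitted + " pending for a: " + op.pendingTimers("a"));
        op.advanceWatermark(20); // the re-registered timer (11) releases a@12
        System.out.println(op.emitted + " after " + op.timerFirings + " timer firings");
    }
}
```

Note that no operator-wide list of keys appears anywhere: the only bookkeeping is per-key timers and per-key buffers, both of which would live in keyed state in the real operator and therefore follow their key groups on rescaling.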
was: {{AbstractKeyedCEPPatternOperator}} uses the old state snapshotting
interfaces, which make the operator non-rescalable. The operator uses state to
keep track of all in-flight keys. When a watermark arrives, the buffers of all
keys are checked and the buffered elements that can be processed are processed.
We could change this to use the abstract timer system instead.
> Make CEP operators rescalable
> -----------------------------
>
> Key: FLINK-5420
> URL: https://issues.apache.org/jira/browse/FLINK-5420
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.2.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)