[
https://issues.apache.org/jira/browse/FLINK-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16114135#comment-16114135
]
ASF GitHub Bot commented on FLINK-7293:
---------------------------------------
Github user kl0u commented on the issue:
https://github.com/apache/flink/pull/4418
Hi @dianfu and @dawidwys. This PR is actually broken.
The reason is that the `comparator` in the
`AbstractKeyedCEPPatternOperator` is not `serializable` so when Flink tries to
ship the job to the cluster, it will fail with a `NotSerializableException`.
If you want to see that you can modify one of the IT cases in the
`CEPITCase`, e.g. the `CEPITCase.testSimplePatternCEP()` and add a comparator
in the `DataStream<String> result = CEP.pattern(input, pattern);`
After this is fixed, then I have some more comments on the rest of the
implementation:
1) In the `AbstractKeyedCEPPatternOperator` we do not need a new state for
the `bufferedEvents`. We can use the already existing `elementQueueState` and
store the timestamps and the elements in the case of processing time, as done
in event time. This will lead to better code re-use, as we will be able to use
code from the event-time logic. In general, it would be nice to unify in the
future the processing- and event-time code paths by also buffering elements and
registering timers for processing time, as done in event time. This will also
solve the issue of having to wait for the next element to arrive, before being
able to emit a timed out pattern in processing time. BUT for this we will need
to somehow let the user specify a parameter like the `watermark interval` in
event-time, so it needs more discussion.
2) Typo in the `CEPOperatorTest`: `getKeyedCepOpearatorWithComparator` ->
`getKeyedCepOperatorWithComparator`
> Support custom order by in PatternStream
> ----------------------------------------
>
> Key: FLINK-7293
> URL: https://issues.apache.org/jira/browse/FLINK-7293
> Project: Flink
> Issue Type: Sub-task
> Components: CEP
> Reporter: Dian Fu
> Assignee: Dian Fu
>
> Currently, when {{ProcessingTime}} is configured, the events are fed to NFA
> in the order of the arriving time and when {{EventTime}} is configured, the
> events are fed to NFA in the order of the event time. It should also allow
> custom {{order by}} to allow users to define the order of the events besides
> the above factors.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)