Github user kl0u commented on the issue:
https://github.com/apache/flink/pull/4418
Hi @dianfu and @dawidwys. This PR is actually broken.
The reason is that the `comparator` in the
`AbstractKeyedCEPPatternOperator` is not `Serializable`, so when Flink tries to
ship the job to the cluster, it fails with a `NotSerializableException`.
To reproduce this, modify one of the IT cases in `CEPITCase`, e.g.
`CEPITCase.testSimplePatternCEP()`, and add a comparator in the
`DataStream<String> result = CEP.pattern(input, pattern);` call.
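For illustration, here is a minimal sketch of the failure mode using only the JDK, with no Flink classes. It assumes the operator and its fields are shipped via standard Java serialization, which is what the `NotSerializableException` suggests. The class `ComparatorSerializationCheck` and its methods are hypothetical names, not part of the PR.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;

// Hypothetical helper class, not part of Flink or of this PR.
class ComparatorSerializationCheck {

    // Returns true if Java serialization accepts the object, false if it
    // is rejected with a NotSerializableException.
    static boolean isJavaSerializable(Object obj) {
        try (ObjectOutputStream out =
                new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Implements only Comparator, so it cannot be serialized.
    static Comparator<String> plainComparator() {
        return new Comparator<String>() {
            @Override
            public int compare(String a, String b) {
                return a.compareTo(b);
            }
        };
    }

    // The intersection cast makes the lambda implement Serializable too.
    static Comparator<String> serializableComparator() {
        return (Comparator<String> & Serializable) (a, b) -> a.compareTo(b);
    }

    public static void main(String[] args) {
        System.out.println(isJavaSerializable(plainComparator()));        // false
        System.out.println(isJavaSerializable(serializableComparator())); // true
    }
}
```

One possible fix along these lines would be to require a comparator type that also extends `Serializable`, so that the problem surfaces at compile time rather than at job submission.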
After this is fixed, then I have some more comments on the rest of the
implementation:
1) In the `AbstractKeyedCEPPatternOperator` we do not need a new state for
the `bufferedEvents`. We can use the already existing `elementQueueState` and
store the timestamps and the elements in the case of processing time, as done
in event time. This will lead to better code re-use, as we will be able to use
code from the event-time logic. In general, it would be nice to unify the
processing- and event-time code paths in the future by also buffering elements
and registering timers for processing time, as done in event time. This would
also solve the issue of having to wait for the next element to arrive before
being able to emit a timed-out pattern in processing time. However, for this we
would need to let the user specify a parameter like the `watermark interval` in
event time, so it needs more discussion.
2) Typo in the `CEPOperatorTest`: `getKeyedCepOpearatorWithComparator` ->
`getKeyedCepOperatorWithComparator`
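
To make the buffering idea in point 1 more concrete, here is a rough plain-Java sketch of a timestamp-keyed buffer that is drained when a timer fires. It is only a stand-in for the idea: `TimestampedBuffer` and its methods are hypothetical, it uses an in-memory `TreeMap` rather than Flink keyed state, and it does not reflect the actual layout of `elementQueueState`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a timestamp -> elements queue.
class TimestampedBuffer<T> {

    // Sorted by timestamp so that elements can be released in time order.
    private final TreeMap<Long, List<T>> elementsByTimestamp = new TreeMap<>();

    // Buffers an element under its timestamp (event time, or the current
    // processing time if both code paths were unified).
    void add(long timestamp, T element) {
        elementsByTimestamp
                .computeIfAbsent(timestamp, k -> new ArrayList<>())
                .add(element);
    }

    // Called when a timer fires at `time`: removes and returns all elements
    // with timestamp <= time, in ascending timestamp order.
    List<T> drainUpTo(long time) {
        List<T> ready = new ArrayList<>();
        Map<Long, List<T>> head = elementsByTimestamp.headMap(time, true);
        for (List<T> bucket : head.values()) {
            ready.addAll(bucket);
        }
        head.clear(); // clearing the view also removes the entries from the buffer
        return ready;
    }

    boolean isEmpty() {
        return elementsByTimestamp.isEmpty();
    }
}
```

With such a structure, the same drain-on-timer logic could serve both time domains; only the source of the timestamp and of the timer would differ.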