Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/4418
  
    Hi @dianfu and @dawidwys. This PR is actually broken. 
    
    The reason is that the `comparator` in the `AbstractKeyedCEPPatternOperator`
    is not `Serializable`, so when Flink tries to ship the job to the cluster,
    it fails with a `NotSerializableException`.
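    The sketch below shows the failure mode outside of Flink. It assumes, as the
    `java.io.NotSerializableException` suggests, that the operator is shipped
    via plain Java serialization. `OperatorLike` is a hypothetical stand-in for
    the operator, and `SerializableComparator` is a hypothetical helper
    interface, not a Flink API. A comparator whose class does not implement
    `Serializable` makes serialization fail. A comparator whose type extends
    both `Comparator` and `Serializable` goes through.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;

public class ComparatorSerializationDemo {

    // Hypothetical helper type: a Comparator that is also Serializable.
    interface SerializableComparator<T> extends Comparator<T>, Serializable {}

    // A plain Comparator whose class does NOT implement Serializable.
    static class PlainComparator implements Comparator<String> {
        @Override
        public int compare(String a, String b) {
            return a.compareTo(b);
        }
    }

    // Hypothetical stand-in for an operator that keeps the comparator as a field.
    static class OperatorLike implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Comparator<String> comparator;

        OperatorLike(Comparator<String> comparator) {
            this.comparator = comparator;
        }
    }

    // Returns true if Java serialization of the object succeeds,
    // false if some reachable field is not Serializable.
    static boolean trySerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Fails: the runtime class of the comparator field is not Serializable.
        boolean plain = trySerialize(new OperatorLike(new PlainComparator()));
        // Succeeds: the lambda's target type extends Serializable,
        // so the lambda itself is serializable.
        SerializableComparator<String> cmp = (a, b) -> a.compareTo(b);
        boolean fixed = trySerialize(new OperatorLike(cmp));
        System.out.println("plain comparator serializable: " + plain);
        System.out.println("serializable comparator serializable: " + fixed);
    }
}
```

    Running `main` prints `false` for the plain comparator and `true` for the
    one typed as `SerializableComparator`.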
    
    To reproduce this, modify one of the IT cases in `CEPITCase`, e.g.
    `CEPITCase.testSimplePatternCEP()`, and add a comparator argument to the
    `CEP.pattern(...)` call in
    `DataStream<String> result = CEP.pattern(input, pattern);`.
    
    Once this is fixed, I have some more comments on the rest of the
    implementation:
    
    1) In the `AbstractKeyedCEPPatternOperator` we do not need a new state for
    the `bufferedEvents`. We can reuse the existing `elementQueueState` and, in
    processing time, store the elements together with their timestamps, as is
    already done in event time. This leads to better code reuse, because the
    processing-time path can then share the event-time logic. More generally,
    it would be nice to unify the processing-time and event-time code paths in
    the future by also buffering elements and registering timers in processing
    time, as is done in event time. This would also solve the issue of having
    to wait for the next element to arrive before a timed-out pattern can be
    emitted in processing time. BUT for this we need some way to let the user
    specify a parameter similar to the `watermark interval` in event time, so
    it needs more discussion.
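    Here is a minimal sketch of the buffering idea. It assumes
    `elementQueueState` maps a timestamp to the list of elements carrying that
    timestamp. That state is modelled here as a plain in-memory
    `TreeMap<Long, List<T>>` instead of Flink's keyed state, and the class and
    method names are hypothetical.

    The same `buffer`/`drainUpTo` pair could serve both time characteristics.
    In event time, `drainUpTo` is called with the watermark. In processing
    time, it would be called from a timer with the current processing time,
    and each element's arrival time would serve as its timestamp. Elements
    that share a timestamp are ordered by the user's comparator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: one timestamp-indexed buffer shared by event time and
// processing time, instead of a separate `bufferedEvents` state.
public class TimestampBufferSketch<T> {

    // Stand-in for elementQueueState: timestamp -> elements with that timestamp.
    private final TreeMap<Long, List<T>> elementQueue = new TreeMap<>();

    // Orders elements that share a timestamp; null keeps arrival order.
    private final Comparator<T> comparator;

    public TimestampBufferSketch(Comparator<T> comparator) {
        this.comparator = comparator;
    }

    // Buffers an element. In processing time the timestamp would be the
    // processing time at which the element arrived.
    public void buffer(long timestamp, T element) {
        elementQueue.computeIfAbsent(timestamp, ts -> new ArrayList<>()).add(element);
    }

    // Invoked when a timer fires (the watermark in event time, or a
    // processing-time timer). Removes and returns all elements with
    // timestamp <= time, in timestamp order, ties broken by the comparator.
    public List<T> drainUpTo(long time) {
        List<T> out = new ArrayList<>();
        Iterator<Map.Entry<Long, List<T>>> it =
                elementQueue.headMap(time, true).entrySet().iterator();
        while (it.hasNext()) {
            List<T> bucket = it.next().getValue();
            if (comparator != null) {
                bucket.sort(comparator);
            }
            out.addAll(bucket);
            it.remove(); // removing from the headMap view also removes from elementQueue
        }
        return out;
    }

    public static void main(String[] args) {
        TimestampBufferSketch<String> buf =
                new TimestampBufferSketch<String>(Comparator.naturalOrder());
        buf.buffer(20L, "c");
        buf.buffer(10L, "b");
        buf.buffer(10L, "a");
        buf.buffer(30L, "d");
        System.out.println(buf.drainUpTo(20L)); // [a, b, c]
        System.out.println(buf.drainUpTo(30L)); // [d]
    }
}
```

    Here `drainUpTo(20L)` returns `[a, b, c]`. The two elements at timestamp 10
    are sorted by the comparator, and the element at 30 stays buffered until a
    later timer fires.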
    
    2) Typo in the `CEPOperatorTest`: `getKeyedCepOpearatorWithComparator` ->
    `getKeyedCepOperatorWithComparator`

