Hi Flink community,

I'd like to raise a performance concern in the CEP module regarding unbounded 
growth of partialMatches when start events arrive at high frequency with sparse 
match completions.




Problem


In NFA.doProcess(), every event that satisfies the start condition creates a
new intermediate ComputationState and appends it to NFAState.partialMatches.
The queue has no upper bound. For a pattern with a long within() window, these
states accumulate linearly for the entire length of the window.


A concrete scenario:
- A single key triggers 1,000 start events/second
- Pattern within(Time.hours(6)) — cannot be reduced due to business requirements
- Matches complete very rarely (sparse)


At steady state, partialMatches holds approximately 1,000 x 6 x 3,600 = 21.6 
million ComputationState objects for a single key.
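As a sanity check, the steady-state figure is the start-event rate multiplied
by the window length in seconds. A trivial sketch (PartialMatchEstimate is a
hypothetical helper, not part of Flink), assuming each start event yields
exactly one intermediate state that lives for the full window:

```java
// Back-of-the-envelope estimate of live partial matches for one key.
// Hypothetical helper, not Flink code. Assumes every start event creates exactly
// one intermediate ComputationState that lives for the full within() window.
final class PartialMatchEstimate {

    static long steadyStateSize(long startEventsPerSecond, long windowSeconds) {
        // multiplyExact throws on overflow instead of silently wrapping
        return Math.multiplyExact(startEventsPerSecond, windowSeconds);
    }

    public static void main(String[] args) {
        long windowSeconds = 6L * 3_600L; // within(Time.hours(6))
        System.out.println(steadyStateSize(1_000L, windowSeconds)); // 21600000
    }
}
```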




Root Cause


doProcess() iterates over all entries in partialMatches for every incoming event
(NFA.java:371). This is O(N) per event, where N is the number of partial
matches, which has no configurable upper bound and reaches ~21.6 million in the
scenario above. The total cost is therefore O(events/s x N) per second, which
keeps CPU usage continuously high.


    // NFA.java:371: iterates ALL partialMatches on every event
    for (ComputationState computationState : nfaState.getPartialMatches()) {
        computeNextStates(..., computationState, event, ...);
        ...
    }
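Multiplying that scan by the event rate gives the per-second loop work for the
key. The sketch below (ScanCostEstimate is a hypothetical helper) counts only
loop iterations, assuming ~21.6M live partial matches and at least 1,000
events/s; the actual cost of each computeNextStates() call comes on top of it.

```java
// Estimates how often the partialMatches loop body runs per second for one key.
// Hypothetical helper, not Flink code. Assumes one full scan per incoming event.
final class ScanCostEstimate {

    static long loopIterationsPerSecond(long eventsPerSecond, long livePartialMatches) {
        return Math.multiplyExact(eventsPerSecond, livePartialMatches);
    }

    public static void main(String[] args) {
        long n = 1_000L * 6L * 3_600L;                    // ~21.6M live partial matches
        System.out.println(loopIterationsPerSecond(1_000L, n)); // 21600000000 (2.16e10)
    }
}
```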


The start-state sentinel is always recreated after each event 
(NFA.java:738-748), so there is always exactly one sentinel. The accumulation 
comes entirely from intermediate ComputationState objects — one created per 
matched start event.
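The interaction between the full scan and the recreated sentinel can be
modelled with a toy queue. This is a deliberate simplification, not Flink's
NFA: states are plain strings, every event satisfies the start condition, and
nothing completes or times out (we are still inside the window). It shows the
queue holding one sentinel plus one intermediate state per start event, and the
total loop work growing quadratically while the window fills.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one key's partialMatches queue; NOT Flink's NFA.
// Assumptions: every event satisfies the start condition, no match completes,
// and no state times out.
final class ToyNfa {
    static final String SENTINEL = "start-sentinel";
    static final String INTERMEDIATE = "intermediate";

    private List<String> partialMatches = new ArrayList<>(List.of(SENTINEL));
    private long loopVisits = 0;

    void processStartEvent() {
        List<String> next = new ArrayList<>();
        for (String state : partialMatches) {   // full scan on every event
            loopVisits++;
            if (state.equals(SENTINEL)) {
                next.add(INTERMEDIATE);         // start event opens a new partial match
            } else {
                next.add(state);                // existing partial match keeps waiting
            }
        }
        next.add(SENTINEL);                     // sentinel recreated, so exactly one exists
        partialMatches = next;
    }

    int size() { return partialMatches.size(); }
    long loopVisits() { return loopVisits; }
    long sentinelCount() { return partialMatches.stream().filter(SENTINEL::equals).count(); }

    public static void main(String[] args) {
        ToyNfa nfa = new ToyNfa();
        for (int i = 0; i < 1_000; i++) {
            nfa.processStartEvent();
        }
        System.out.println(nfa.size());       // 1001: one sentinel + 1000 intermediates
        System.out.println(nfa.loopVisits()); // 500500 = 1 + 2 + ... + 1000
    }
}
```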




Why existing mechanisms don't help


AfterMatchSkipStrategy: prune() is only called inside
processMatchesAccordingToSkipStrategy() (NFA.java:466), which is only reached
when potentialMatches is non-empty, i.e., only when a match completes. In a
sparse-match scenario, it almost never fires.
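The gating can be illustrated with a toy model. This is NOT Flink's
implementation: SkipGatingModel is a hypothetical class, and the integer
argument simply stands in for the size of potentialMatches after an event.

```java
// Toy model of the gating described above; NOT Flink code.
// The skip strategy's prune step runs only when an event completes at least
// one match (i.e. potentialMatches is non-empty).
final class SkipGatingModel {
    private long pruneCalls = 0;

    void processEvent(int completedMatches) {
        if (completedMatches > 0) { // stands in for !potentialMatches.isEmpty()
            pruneCalls++;           // stands in for the skip-strategy prune() call
        }
    }

    long pruneCalls() { return pruneCalls; }

    public static void main(String[] args) {
        SkipGatingModel sparse = new SkipGatingModel();
        for (int i = 0; i < 1_000_000; i++) {
            sparse.processEvent(0); // one million start events, no completed match
        }
        System.out.println(sparse.pruneCalls()); // 0: pruning never gets a chance to run
    }
}
```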


within() timeout: advanceTime() does expire intermediate states once they fall
outside the window, but that only caps the size at ~21.6M in steady state. It
does not prevent accumulation within the window, and 21.6M states are already
far too many.
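A scaled-down toy model shows the plateau: window expiry caps the queue at
roughly rate x window, but never below it. This is an assumption-laden sketch,
NOT Flink's advanceTime(); WindowExpiryModel is hypothetical and the exact
timeout boundary (<=) was chosen for simplicity.

```java
import java.util.ArrayDeque;

// Toy model of window-based expiry for one key; NOT Flink's advanceTime().
// A partial match opened at time t is dropped once t + windowMillis <= now.
final class WindowExpiryModel {
    private final long windowMillis;
    private final ArrayDeque<Long> startTimestamps = new ArrayDeque<>();
    private int peak = 0;

    WindowExpiryModel(long windowMillis) { this.windowMillis = windowMillis; }

    void onStartEvent(long timestampMillis) {
        advanceTime(timestampMillis);
        startTimestamps.addLast(timestampMillis); // one new partial match per start event
        peak = Math.max(peak, startTimestamps.size());
    }

    // Expires partial matches whose window has elapsed; the oldest sit at the head.
    void advanceTime(long nowMillis) {
        while (!startTimestamps.isEmpty()
                && startTimestamps.peekFirst() + windowMillis <= nowMillis) {
            startTimestamps.pollFirst();
        }
    }

    int size() { return startTimestamps.size(); }
    int peak() { return peak; }

    public static void main(String[] args) {
        // Scaled down: 10 start events/s, 60 s window, simulated for 5 minutes.
        WindowExpiryModel model = new WindowExpiryModel(60_000L);
        for (long t = 0; t < 300_000L; t += 100L) {
            model.onStartEvent(t);
        }
        System.out.println(model.size()); // 600 = 10 events/s x 60 s
        System.out.println(model.peak()); // 600
    }
}
```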


No maxPartialMatches config: NFAState.partialMatches is an unbounded 
PriorityQueue. No configuration exists to cap its size.
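For reference, assuming partialMatches is the standard java.util.PriorityQueue,
the collection itself never rejects elements: the capacity passed to its
constructor only sizes the initial backing array. A minimal standalone check
(UnboundedQueueDemo is a hypothetical class, not Flink code):

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Demonstrates that java.util.PriorityQueue has no size cap: the
// initialCapacity constructor argument is only a starting size.
final class UnboundedQueueDemo {
    static int fill(int initialCapacity, int elements) {
        PriorityQueue<Long> queue =
                new PriorityQueue<>(initialCapacity, Comparator.<Long>naturalOrder());
        for (long i = 0; i < elements; i++) {
            queue.offer(i); // offer() always succeeds; the queue just grows
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(fill(1, 100_000)); // 100000
    }
}
```

So any cap would have to be enforced by the CEP code that owns the queue, not by the collection.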




Question to the community


Before proposing any changes to the framework, I'd like to first ask: is there 
a recommended best practice or existing pattern for handling this kind of 
scenario?


Specifically, is there any built-in way in CEP to:
- Limit the number of concurrent partial matches per key, or
- Discard old partial matches when a new start event arrives, or
- Any other approach that avoids unbounded partialMatches growth under 
high-frequency start events?
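To make the first two options concrete, "cap per key, evict oldest" semantics
could look like the hypothetical sketch below, written against a plain queue
outside Flink. BoundedPartialMatches is an invented name; a capacity of 1
corresponds to "keep only the partial match from the latest start event".
Evicting the oldest partial match can drop one that would have completed, so
whether this is acceptable depends on the pattern's semantics.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical bounded store for one key's partial matches; NOT a Flink API.
// When full, the oldest partial match is evicted to make room for the newest.
final class BoundedPartialMatches<S> {
    private final int maxPartialMatches;
    private final ArrayDeque<S> states = new ArrayDeque<>();
    private long evicted = 0;

    BoundedPartialMatches(int maxPartialMatches) {
        if (maxPartialMatches < 1) {
            throw new IllegalArgumentException("maxPartialMatches must be >= 1");
        }
        this.maxPartialMatches = maxPartialMatches;
    }

    void add(S state) {
        if (states.size() == maxPartialMatches) {
            states.pollFirst(); // discard the oldest partial match
            evicted++;
        }
        states.addLast(state);
    }

    List<S> snapshot() { return new ArrayList<>(states); } // oldest first
    long evicted() { return evicted; }

    public static void main(String[] args) {
        BoundedPartialMatches<String> latestOnly = new BoundedPartialMatches<>(1);
        for (int i = 0; i < 5; i++) {
            latestOnly.add("start@" + i);
        }
        System.out.println(latestOnly.snapshot()); // [start@4]
        System.out.println(latestOnly.evicted());  // 4
    }
}
```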


I've reviewed AfterMatchSkipStrategy, within(), and the CEP configuration 
options, but none of them seem to address this at the source. If I've missed 
something, please point me in the right direction.


If there is indeed no existing solution, I'd be happy to open a JIRA and 
discuss potential approaches.


Best regards,
Sherlock-lin
