Hi Flink community,
I'd like to raise a performance concern in the CEP module regarding unbounded
growth of partialMatches when start events arrive at high frequency with sparse
match completions.
Problem
In NFA.doProcess(), every event that satisfies the start condition creates a
new intermediate ComputationState and adds it to NFAState.partialMatches.
The queue has no upper bound. For a pattern with a long within() window, these
states accumulate linearly over time until the window expires them.
A concrete scenario:
- A single key triggers 1,000 start events/second
- Pattern within(Time.hours(6)) — cannot be reduced due to business requirements
- Matches complete very rarely (sparse)
At steady state, partialMatches holds approximately 1,000 x 6 x 3,600 = 21.6
million ComputationState objects for a single key.
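A toy model of this accumulation may help make the steady-state number concrete. It is scaled down (a 6 s window instead of 6 h) so it runs instantly, and it is not Flink code: the class and method names are hypothetical, every event is assumed to open one partial match, nothing ever completes, and within() expiry is approximated by dropping entries whose age has reached the window.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model only (hypothetical names, not Flink code) of partial-match growth for one key. */
public class PartialMatchGrowth {

    /** Live partial matches at the end of the run, and total "visits" made by the O(N) scan. */
    public record Result(long livePartialMatches, long totalVisits) {}

    /**
     * Assumptions: every event satisfies the start condition and opens one partial
     * match; no match ever completes; before each event, partial matches whose age
     * has reached the window are dropped (a rough stand-in for within() timeouts).
     */
    public static Result simulate(long startEventsPerSecond, long windowSeconds, long durationSeconds) {
        Deque<Long> partialMatches = new ArrayDeque<>(); // start timestamps in micros, oldest first
        long windowMicros = windowSeconds * 1_000_000L;
        long totalEvents = startEventsPerSecond * durationSeconds;
        long visits = 0;

        for (long i = 0; i < totalEvents; i++) {
            long now = i * 1_000_000L / startEventsPerSecond; // evenly spaced event time

            // Expire partial matches that have outlived the window.
            while (!partialMatches.isEmpty() && now - partialMatches.peekFirst() >= windowMicros) {
                partialMatches.pollFirst();
            }

            // Mimics the per-event loop: every live partial match is visited once.
            visits += partialMatches.size();

            // The event itself opens a new partial match.
            partialMatches.addLast(now);
        }
        return new Result(partialMatches.size(), visits);
    }

    public static void main(String[] args) {
        Result r = simulate(1_000, 6, 20);
        System.out.println(r.livePartialMatches()); // prints 6000 (= 1,000 events/s x 6 s)
    }
}
```

Under these assumptions the live count settles at rate x window (1,000 x 6 = 6,000 here); the same formula with a 21,600 s window gives the 21.6 million figure above.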
Root Cause
doProcess() iterates over all entries in partialMatches for every incoming event
(NFA.java:371). This is O(N) work per event, where N has no configurable upper
bound and, in the scenario above, reaches about 21.6 million. The total work is
therefore O(events/s x N) per second, which keeps CPU usage continuously high.
// NFA.java:371 - iterates ALL partialMatches on every event
for (ComputationState computationState : nfaState.getPartialMatches()) {
    computeNextStates(..., computationState, event, ...);
    ...
}
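Putting the two numbers together gives a rough per-second cost estimate for one key. This is a back-of-the-envelope sketch that assumes every live partial match is visited once per event and ignores constant factors; r (start events per second) and W (window length in seconds) are symbols introduced here for illustration.

```latex
\begin{aligned}
N_{\mathrm{ss}} &\approx r\,W = 1000\ \mathrm{s^{-1}} \times 21\,600\ \mathrm{s} = 2.16 \times 10^{7} \\
\text{visits per second} &\approx r\,N_{\mathrm{ss}} = r^{2} W = 1000 \times 2.16 \times 10^{7} = 2.16 \times 10^{10}
\end{aligned}
```

Because the cost scales with r^2 W, doubling the start-event rate roughly quadruples the per-key CPU work.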
The start-state sentinel is always recreated after each event
(NFA.java:738-748), so there is always exactly one sentinel. The accumulation
comes entirely from intermediate ComputationState objects: one is created for
every event that satisfies the start condition.
Why existing mechanisms don't help
AfterMatchSkipStrategy: prune() is only called inside
processMatchesAccordingToSkipStrategy() (NFA.java:466), which is only reached
when potentialMatches is non-empty, i.e., only when a match completes. In a
sparse-match scenario, it almost never fires.
within() timeout: advanceTime() does expire intermediate states once they fall
outside the window, but that only caps the peak size (about 21.6M at steady
state here). It does not prevent accumulation during the window period, and
21.6M states for a single key is already far too many.
No maxPartialMatches config: NFAState.partialMatches is an unbounded
PriorityQueue. No configuration exists to cap its size.
Question to the community
Before proposing any changes to the framework, I'd like to first ask: is there
a recommended best practice or existing pattern for handling this kind of
scenario?
Specifically, is there any built-in way in CEP to:
- Limit the number of concurrent partial matches per key, or
- Discard old partial matches when a new start event arrives, or
- Any other approach that avoids unbounded partialMatches growth under
high-frequency start events?
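To make the first two options concrete, here is a framework-independent toy sketch of the semantics I have in mind. Everything in it is hypothetical (it is not a Flink API and ignores NFA transitions and within() expiry): partial matches of one key are modeled as start timestamps, and a per-key cap either rejects new starts or evicts the oldest partial match.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical illustration only, not a Flink API: a per-key cap on partial matches. */
public class CappedPartialMatches {

    /** What to do when a new start event arrives and the cap is already reached. */
    public enum OverflowPolicy { REJECT_NEW, EVICT_OLDEST }

    private final Deque<Long> startTimestamps = new ArrayDeque<>(); // oldest first
    private final int maxPartialMatches; // hypothetical per-key cap
    private final OverflowPolicy policy;

    public CappedPartialMatches(int maxPartialMatches, OverflowPolicy policy) {
        if (maxPartialMatches < 1) {
            throw new IllegalArgumentException("cap must be >= 1");
        }
        this.maxPartialMatches = maxPartialMatches;
        this.policy = policy;
    }

    /** Handles an event that satisfies the start condition; returns true if a partial match was opened. */
    public boolean onStartEvent(long timestamp) {
        if (startTimestamps.size() >= maxPartialMatches) {
            if (policy == OverflowPolicy.REJECT_NEW) {
                return false; // option 1: cap reached, ignore the new start
            }
            startTimestamps.pollFirst(); // option 2: discard the oldest partial match
        }
        startTimestamps.addLast(timestamp);
        return true;
    }

    public int size() {
        return startTimestamps.size();
    }

    /** Start timestamp of the oldest live partial match, or null if there is none. */
    public Long oldestStart() {
        return startTimestamps.peekFirst();
    }

    public static void main(String[] args) {
        CappedPartialMatches evict = new CappedPartialMatches(3, OverflowPolicy.EVICT_OLDEST);
        CappedPartialMatches reject = new CappedPartialMatches(3, OverflowPolicy.REJECT_NEW);
        for (long ts = 1; ts <= 5; ts++) {
            evict.onStartEvent(ts);
            reject.onStartEvent(ts);
        }
        System.out.println(evict.size() + " " + evict.oldestStart());   // prints "3 3"
        System.out.println(reject.size() + " " + reject.oldestStart()); // prints "3 1"
    }
}
```

Either policy bounds per-key state and per-event work at the cap, at the cost of dropping some potential matches, so it would presumably need to be opt-in.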
I've reviewed AfterMatchSkipStrategy, within(), and the CEP configuration
options, but none of them seem to address this at the source. If I've missed
something, please point me in the right direction.
If there is indeed no existing solution, I'd be happy to open a JIRA and
discuss potential approaches.
Best regards,
Sherlock-lin