[
https://issues.apache.org/jira/browse/FLINK-32701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Puneet Duggal updated FLINK-32701:
----------------------------------
Affects Version/s: 1.16.2
1.16.1
> Potential Memory Leak in Flink CEP due to Persistent Starting States in
> NFAState
> --------------------------------------------------------------------------------
>
> Key: FLINK-32701
> URL: https://issues.apache.org/jira/browse/FLINK-32701
> Project: Flink
> Issue Type: Bug
> Components: Library / CEP
> Affects Versions: 1.17.0, 1.16.1, 1.16.2, 1.17.1
> Reporter: Puneet Duggal
> Priority: Critical
> Attachments: Screenshot 2023-07-26 at 11.45.06 AM.png, Screenshot
> 2023-07-26 at 11.50.28 AM.png
>
>
> Our team has encountered a potential memory leak issue while working with the
> Complex Event Processing (CEP) library in Flink v1.17.
> h2. Context
> The CEP operator maintains a keyed state called NFAState, which holds two
> queues: one for partial matches and one for completed matches. When a key is
> first encountered, the CEP operator creates a starting computation state and
> stores it in the partial matches queue. As more events arrive that match the
> defined conditions (e.g., a TAKE condition), additional computation states
> are added to the queue, with their specific type (normal, pending, end)
> depending on the pattern sequence.
> However, I have noticed that the starting computation state remains in the
> partial matches queue even after the pattern sequence has been completely
> matched. This is also the case for keys that have already timed out. As a
> result, the state gets stored for all keys that the CEP ever encounters,
> leading to a continual increase in the checkpoint size.
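
The behaviour described above can be illustrated with a toy model. This is not Flink code: the class `ToyNfaStateModel`, its methods, and the use of -1 as the marker for a starting state (borrowed from the reader code further down) are assumptions made purely for illustration. It only shows why the number of stored keys never shrinks if expiry removes in-flight partial matches but leaves the starting state behind.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Flink code) of per-key state that always retains a starting state. */
final class ToyNfaStateModel {

    /** Assumed marker for "starting state, no event consumed yet". */
    static final long NO_START = -1L;

    /** Hypothetical per-key state: the start timestamp of every stored computation state. */
    final Map<Long, List<Long>> partialMatchesByKey = new HashMap<>();

    /** The first event for a key creates its starting state; every event adds a partial match. */
    void onEvent(long key, long timestamp) {
        List<Long> states = partialMatchesByKey.computeIfAbsent(key, k -> {
            List<Long> fresh = new ArrayList<>();
            fresh.add(NO_START);
            return fresh;
        });
        states.add(timestamp);
    }

    /** A match or timeout drops the in-flight partial matches but keeps the starting state. */
    void expire(long key) {
        List<Long> states = partialMatchesByKey.get(key);
        if (states != null) {
            states.removeIf(ts -> ts != NO_START);
        }
    }

    /** Number of keys that still occupy state. */
    int retainedKeys() {
        return partialMatchesByKey.size();
    }

    public static void main(String[] args) {
        ToyNfaStateModel model = new ToyNfaStateModel();
        for (long key = 0; key < 5; key++) {
            model.onEvent(key, 1_000L + key);
            model.expire(key);
        }
        // Every key ever seen is still present, each holding only its starting state.
        System.out.println(model.retainedKeys());
    }
}
```

In this model the map grows by one entry per distinct key and never shrinks, which is the growth pattern the checkpoint sizes in this report suggest.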
> h2. How to reproduce this
> # Pattern Sequence - A not_followed_by B within 5 mins
> # Time Characteristic - EventTime
> # StateBackend - FsStateBackend
> On my local machine, I started this pipeline and sent events at a rate of
> 10 events per second (only A). As expected, after 5 mins CEP started
> emitting pattern-matched output at the same rate. However, the checkpoint
> size kept increasing with every checkpoint (checkpoint interval: 2 mins).
> The expectation was that after 5 mins (2-3 checkpoints) the checkpoint size
> would remain constant, since any 5 min window contains the same number of
> unique keys (older ones are matched or timed out and hence removed from
> state). But as the attached images show, the checkpoint size kept
> increasing up to 40 checkpoints (around 1.5hrs).
> P.S. - After 3 checkpoints (6 mins), the checkpoint size was around 1.78MB.
> Hence the assumption is that the ideal checkpoint size for a 5 min window
> should be less than 1.78MB.
> As the images show, I triggered a savepoint for this pipeline after 39
> checkpoints. I then used a savepoint reader to inspect what is stored in the
> CEP state. The code below inspects the NFAState of the CEP operator for a
> potential memory leak.
> {code:java}
> import lombok.AllArgsConstructor;
> import lombok.Data;
> import lombok.NoArgsConstructor;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.cep.nfa.NFAState;
> import org.apache.flink.cep.nfa.NFAStateSerializer;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.state.api.OperatorIdentifier;
> import org.apache.flink.state.api.SavepointReader;
> import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.util.Collector;
> import org.junit.jupiter.api.Test;
>
> import java.io.Serializable;
> import java.util.Objects;
>
> public class NFAStateReaderTest {
>
>     private static final String NFA_STATE_NAME = "nfaStateName";
>
>     @Test
>     public void testNfaStateReader() throws Exception {
>         StreamExecutionEnvironment environment =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         SavepointReader savepointReader =
>                 SavepointReader.read(
>                         environment,
>                         "file:///opt/flink/savepoints/savepoint-093404-9bc0a38654df",
>                         new FsStateBackend("file:///abc"));
>         DataStream<NFAStateOutput> stream =
>                 savepointReader.readKeyedState(
>                         OperatorIdentifier.forUid("select_pattern_events"),
>                         new NFAStateReaderTest.NFAStateReaderFunction());
>         stream.print();
>         environment.execute();
>     }
>
>     static class NFAStateReaderFunction
>             extends KeyedStateReaderFunction<DynamicTuple, NFAStateOutput> {
>
>         private ValueState<NFAState> computationStates;
>         private static Long danglingNfaCount = 0L;
>         private static Long newNfaCount = 0L;
>         private static Long minTimestamp = Long.MAX_VALUE;
>         private static Long minKeyForCurrentNfa = Long.MAX_VALUE;
>         private static Long minKeyForDanglingNfa = Long.MAX_VALUE;
>         private static Long maxKeyForDanglingNfa = Long.MIN_VALUE;
>         private static Long maxKeyForCurrentNfa = Long.MIN_VALUE;
>
>         @Override
>         public void open(Configuration parameters) {
>             computationStates =
>                     getRuntimeContext()
>                             .getState(
>                                     new ValueStateDescriptor<>(
>                                             NFA_STATE_NAME, new NFAStateSerializer()));
>         }
>
>         @Override
>         public void readKey(DynamicTuple key, Context ctx, Collector<NFAStateOutput> out)
>                 throws Exception {
>             NFAState nfaState = computationStates.value();
>             if (Objects.requireNonNull(nfaState.getPartialMatches().peek())
>                             .getStartTimestamp()
>                     != -1) {
>                 minTimestamp =
>                         Math.min(
>                                 minTimestamp,
>                                 nfaState.getPartialMatches().peek().getStartTimestamp());
>                 minKeyForCurrentNfa =
>                         Math.min(
>                                 minKeyForCurrentNfa,
>                                 Long.parseLong(key.getTuple().getField(0)));
>                 maxKeyForCurrentNfa =
>                         Math.max(
>                                 maxKeyForCurrentNfa,
>                                 Long.parseLong(key.getTuple().getField(0)));
>                 newNfaCount++;
>             } else {
>                 danglingNfaCount++;
>                 minKeyForDanglingNfa =
>                         Math.min(
>                                 minKeyForDanglingNfa,
>                                 Long.parseLong(key.getTuple().getField(0)));
>                 maxKeyForDanglingNfa =
>                         Math.max(
>                                 maxKeyForDanglingNfa,
>                                 Long.parseLong(key.getTuple().getField(0)));
>             }
>             NFAStateOutput nfaStateOutput =
>                     new NFAStateOutput(
>                             danglingNfaCount,
>                             minTimestamp,
>                             newNfaCount,
>                             minKeyForCurrentNfa,
>                             maxKeyForCurrentNfa,
>                             minKeyForDanglingNfa,
>                             maxKeyForDanglingNfa);
>             out.collect(nfaStateOutput);
>         }
>     }
>
>     @Data
>     @NoArgsConstructor
>     @AllArgsConstructor
>     static class NFAStateOutput implements Serializable {
>         private Long danglingNfaCount;
>         private Long minTimestamp;
>         private Long newNfaCount;
>         private Long minKeyForCurrentNfa;
>         private Long maxKeyForCurrentNfa;
>         private Long minKeyForDanglingNfa;
>         private Long maxKeyForDanglingNfa;
>     }
> }
> {code}
>
> The job printed an nfaStateOutput for each key, but since all attributes in
> nfaStateOutput are running aggregates, the final output printed was
> {code:java}
> NFAStateReaderTest.NFAStateOutput(danglingNfaCount=34391,
> minTimestamp=1690359951958, newNfaCount=3000, minKeyForCurrentNfa=6244230,
> maxKeyForCurrentNfa=6247229, minKeyForDanglingNfa=629818,
> maxKeyForDanglingNfa=6244229){code}
>
> As we can see, the checkpoint stores 34391 dangling states (for keys that
> have expired, i.e. matched or timed out), whereas there are only 3000 active
> keys (keys with partial matches that are still eligible for further pattern
> sequence matching). The latter is expected, since a throughput of 10 events
> per second amounts to 3000 unique keys in 5 mins.
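
The arithmetic behind these numbers can be checked with a small sketch. The class and method names are hypothetical. The inputs (10 events per second, a 5 min window, 34391 dangling states) are taken from the report, and the sketch assumes, as the report does, that every event carries a new unique key.

```java
/** Back-of-the-envelope check of the key counts reported above. */
final class KeyCountEstimate {

    /** Unique keys expected inside one pattern window, assuming one new key per event. */
    static long expectedActiveKeys(long eventsPerSecond, long windowSeconds) {
        return eventsPerSecond * windowSeconds;
    }

    /** Fraction of all stored keys that hold only a dangling state. */
    static double danglingShare(long danglingKeys, long activeKeys) {
        return (double) danglingKeys / (danglingKeys + activeKeys);
    }

    public static void main(String[] args) {
        long active = expectedActiveKeys(10, 5 * 60);
        System.out.println("expected active keys: " + active);
        System.out.println("dangling share: " + danglingShare(34391, active));
    }
}
```

With these inputs the expected number of active keys matches the reported newNfaCount of 3000, while more than nine out of ten stored keys hold only a dangling state.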
> h2. Questions
> Hence, I am curious about the reasoning behind this design choice,
> specifically why the starting state remains in the partial matches queue for
> all keys, even those that have either timed out or completed their matches.
> Additionally, I am wondering what the implications would be if we were to
> delete this starting state, assuming that
> # It is the only state left in the partial matches queue.
> # The completed matches queue in NFAState is empty.
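
The two conditions above could be expressed as a simple predicate. The sketch below is not Flink code and does not use the real NFAState API: the class `StartStateCleanupSketch`, the method `canDropState`, the plain list of start timestamps, and the -1 marker for a starting state are all assumptions for illustration. Whether clearing the state in this situation is actually safe is exactly the open question of this issue.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Sketch (not Flink code) of the proposed "safe to delete" check for a key's state. */
final class StartStateCleanupSketch {

    /** Assumed marker for a starting computation state that has consumed no event. */
    static final long NO_START = -1L;

    /**
     * Returns true when the only remaining partial match is the starting state
     * and there are no completed matches waiting to be emitted.
     */
    static boolean canDropState(List<Long> partialMatchStartTimestamps, int completedMatchCount) {
        boolean onlyStartingStateLeft =
                partialMatchStartTimestamps.size() == 1
                        && partialMatchStartTimestamps.get(0) == NO_START;
        return onlyStartingStateLeft && completedMatchCount == 0;
    }

    public static void main(String[] args) {
        // Dangling key: only the starting state is left, nothing completed.
        System.out.println(canDropState(Collections.singletonList(NO_START), 0));
        // Active key: a partial match is still in flight.
        System.out.println(canDropState(Arrays.asList(NO_START, 1690359951958L), 0));
    }
}
```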