[ https://issues.apache.org/jira/browse/FLINK-32701 ]

Puneet Duggal updated FLINK-32701:
----------------------------------
    Description: 
Our team has encountered a potential memory leak issue while working with the 
Complex Event Processing (CEP) library in Flink v1.17.
h2. Context

The CEP operator maintains a keyed state called NFAState, which holds two 
queues: one for partial matches and one for completed matches. When a key is 
first encountered, the operator creates a starting computation state and stores 
it in the partial matches queue. As more events match the defined conditions 
(e.g., a TAKE condition), additional computation states are added to the queue, 
with their specific type (normal, pending, end) depending on the pattern 
sequence.
However, I have noticed that the starting computation state remains in the 
partial matches queue even after the pattern sequence has been completely 
matched. The same holds for keys that have already timed out. As a result, 
state is retained for every key the CEP operator ever encounters, leading to a 
continual increase in checkpoint size.
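For reference, this can be observed through the public accessors of 
org.apache.flink.cep.nfa.NFAState. The dump helper below is my own sketch, not 
Flink code:
{code:java}
import org.apache.flink.cep.nfa.ComputationState;
import org.apache.flink.cep.nfa.NFAState;

// Illustrative helper: dumps the two queues NFAState keeps per key. After a
// full match or a timeout one would expect the partial matches queue to be
// empty, but the starting computation state (startTimestamp == -1) remains.
static void dumpNfaState(NFAState nfaState) {
    for (ComputationState cs : nfaState.getPartialMatches()) {
        System.out.printf("partial: state=%s, startTimestamp=%d%n",
                cs.getCurrentStateName(), cs.getStartTimestamp());
    }
    System.out.println("completed matches: " + nfaState.getCompletedMatches().size());
}
{code}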
h2. How to reproduce this
 # Pattern Sequence - A not_followed_by B within 5 mins (see the sketch after this list)
 # Time Characteristic - EventTime
 # StateBackend - HashMapStateBackend

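A minimal sketch of the pattern definition used above (the Event POJO, its 
getType() accessor, and the keyed input stream are placeholders for my actual 
job):
{code:java}
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

// A not_followed_by B within 5 minutes: a match is emitted for a key when an
// A event is not followed by a B event inside the 5-minute window.
Pattern<Event, ?> pattern =
        Pattern.<Event>begin("A")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) {
                        return "A".equals(event.getType());
                    }
                })
                .notFollowedBy("B")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) {
                        return "B".equals(event.getType());
                    }
                })
                .within(Time.minutes(5));

// keyedEventStream is the event-time keyed stream of A/B events.
PatternStream<Event> matches = CEP.pattern(keyedEventStream, pattern).inEventTime();
{code}
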
On my local machine, I started this pipeline and sent events at a rate of 10 
events per second (only A events). As expected, after 5 minutes the CEP 
operator started emitting pattern-matched output at the same rate. The issue 
was that with every checkpoint (2-minute interval), the checkpoint size kept 
increasing. The expectation was that after 5 minutes (2-3 checkpoints) the 
checkpoint size would remain roughly constant, since any 5-minute window 
contains the same number of unique keys (older keys get matched or timed out 
and should therefore be removed from state). But as the attached images show, 
the checkpoint size kept increasing through 40 checkpoints (around 1.5 hours).
P.S. - After 3 checkpoints (6 minutes), the checkpoint size was around 1.78 MB. 
Hence the assumption is that the steady-state checkpoint size for a 5-minute 
window should stay below 1.78 MB.

After 39 checkpoints, I triggered a savepoint for this pipeline and then used 
the State Processor API's savepoint reader to investigate what is being stored 
in the CEP state. The code below inspects the NFAState of the CepOperator for 
the potential memory leak.
{code:java}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.cep.nfa.ComputationState;
import org.apache.flink.cep.nfa.NFAState;
import org.apache.flink.cep.nfa.NFAStateSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.junit.jupiter.api.Test;

import java.io.Serializable;
import java.util.Objects;

public class NFAStateReaderTest {

    // State name under which the CEP operator registers its NFAState.
    private static final String NFA_STATE_NAME = "nfaStateName";

    @Test
    public void testNfaStateReader() throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        SavepointReader savepointReader =
                SavepointReader.read(
                        environment,
                        "file:///opt/flink/savepoints/savepoint-093404-9bc0a38654df",
                        new FsStateBackend("file:///abc"));
        DataStream<NFAStateOutput> stream =
                savepointReader.readKeyedState(
                        OperatorIdentifier.forUid("select_pattern_events"),
                        new NFAStateReaderFunction());
        stream.print();
        environment.execute();
    }

    // DynamicTuple is the key type of our CEP job (not shown); field 0 of the
    // wrapped tuple holds the numeric key.
    static class NFAStateReaderFunction
            extends KeyedStateReaderFunction<DynamicTuple, NFAStateOutput> {

        private ValueState<NFAState> computationStates;
        // Static fields aggregate across all keys; fine here because the
        // reader runs single-threaded in this test.
        private static Long danglingNfaCount = 0L;
        private static Long newNfaCount = 0L;
        private static Long minTimestamp = Long.MAX_VALUE;
        private static Long minKeyForCurrentNfa = Long.MAX_VALUE;
        private static Long minKeyForDanglingNfa = Long.MAX_VALUE;
        private static Long maxKeyForDanglingNfa = Long.MIN_VALUE;
        private static Long maxKeyForCurrentNfa = Long.MIN_VALUE;

        @Override
        public void open(Configuration parameters) {
            computationStates =
                    getRuntimeContext()
                            .getState(new ValueStateDescriptor<>(
                                    NFA_STATE_NAME, new NFAStateSerializer()));
        }

        @Override
        public void readKey(DynamicTuple key, Context ctx, Collector<NFAStateOutput> out)
                throws Exception {
            NFAState nfaState = computationStates.value();
            ComputationState head =
                    Objects.requireNonNull(nfaState.getPartialMatches().peek());
            long keyValue = Long.parseLong(key.getTuple().getField(0));
            // A start-only computation state carries startTimestamp == -1.
            if (head.getStartTimestamp() != -1) {
                // Active key: a real partial match is still in flight.
                minTimestamp = Math.min(minTimestamp, head.getStartTimestamp());
                minKeyForCurrentNfa = Math.min(minKeyForCurrentNfa, keyValue);
                maxKeyForCurrentNfa = Math.max(maxKeyForCurrentNfa, keyValue);
                newNfaCount++;
            } else {
                // Dangling key: only the starting state is left behind.
                danglingNfaCount++;
                minKeyForDanglingNfa = Math.min(minKeyForDanglingNfa, keyValue);
                maxKeyForDanglingNfa = Math.max(maxKeyForDanglingNfa, keyValue);
            }
            out.collect(
                    new NFAStateOutput(
                            danglingNfaCount,
                            minTimestamp,
                            newNfaCount,
                            minKeyForCurrentNfa,
                            maxKeyForCurrentNfa,
                            minKeyForDanglingNfa,
                            maxKeyForDanglingNfa));
        }
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class NFAStateOutput implements Serializable {
        private Long danglingNfaCount;
        private Long minTimestamp;
        private Long newNfaCount;
        private Long minKeyForCurrentNfa;
        private Long maxKeyForCurrentNfa;
        private Long minKeyForDanglingNfa;
        private Long maxKeyForDanglingNfa;
    }
}
{code}
 
As output it printed one nfaStateOutput per key, but since all the attributes 
of NFAStateOutput are running aggregates, the final record printed was:
{code:java}
NFAStateReaderTest.NFAStateOutput(danglingNfaCount=34391, 
minTimestamp=1690359951958, newNfaCount=3000, minKeyForCurrentNfa=6244230, 
maxKeyForCurrentNfa=6247229, minKeyForDanglingNfa=629818, 
maxKeyForDanglingNfa=6244229){code}
 
As we can see, the savepoint holds approximately 34391 dangling states (keys 
that have already expired, i.e. matched or timed out), whereas there are only 
3000 active keys (keys with partial matches still eligible for further pattern 
matching). The 3000 is expected: at a throughput of 10 events per second, a 
5-minute window covers 10 × 300 = 3000 unique keys.
h2. Questions

Hence, I am curious about the reasoning behind this design choice, specifically 
why the starting state remains in the partial matches queue for all keys, even 
those that have either timed out or completed their matches.
Additionally, I am wondering what the implications would be of deleting this 
starting state (see the sketch after this list) assuming that
 # it is the only state left in the partial matches queue, and
 # the completed matches queue in NFAState is empty.
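For illustration, the kind of cleanup I have in mind, sketched against 
NFAState's public accessors (this is hypothetical code, not the actual 
CepOperator logic; whether clearing is safe under the two conditions above is 
exactly my question):
{code:java}
import java.util.Queue;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.cep.nfa.ComputationState;
import org.apache.flink.cep.nfa.NFAState;

// Hypothetical helper: after the NFA has been advanced for a key, drop the
// keyed NFAState entry entirely when nothing but the starting computation
// state is left, so expired keys stop accumulating in checkpoints.
static void maybeClearNfaState(NFAState nfaState, ValueState<NFAState> handle)
        throws Exception {
    Queue<ComputationState> partialMatches = nfaState.getPartialMatches();
    boolean onlyStartStateLeft =
            partialMatches.size() == 1
                    && partialMatches.peek().getStartTimestamp() == -1;
    if (onlyStartStateLeft && nfaState.getCompletedMatches().isEmpty()) {
        handle.clear(); // removes this key's NFAState from the state backend
    }
}
{code}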


> Potential Memory Leak in Flink CEP due to Persistent Starting States in 
> NFAState
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-32701
>                 URL: https://issues.apache.org/jira/browse/FLINK-32701
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.17.0, 1.16.1, 1.16.2, 1.17.1
>            Reporter: Puneet Duggal
>            Priority: Critical
>              Labels: CEP, cep, stale-critical
>         Attachments: Screenshot 2023-07-26 at 11.45.06 AM.png, Screenshot 
> 2023-07-26 at 11.50.28 AM.png
>


