[ 
https://issues.apache.org/jira/browse/FLINK-19970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17244007#comment-17244007
 ] 

Dawid Wysakowicz commented on FLINK-19970:
------------------------------------------

Could you please double check that you've run my branch? If you are confident 
that you run my branch, I would need a program that I can run locally that can 
help me reproduce the problem. I've tried preparing one myself, but I cannot 
reproduce the state leak with the fix from my branch (I could do it without the 
fix). You can see the program I tried below:

{code}
public class CepBug {
        public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.enableCheckpointing(100);
                
env.getCheckpointConfig().enableExternalizedCheckpoints(RETAIN_ON_CANCELLATION);

                Pattern<Event, ?> pattern = Pattern.<Event>begin("start", 
AfterMatchSkipStrategy.skipPastLastEvent())
                                .times(20)
                                .within(Time.milliseconds(10));

                KeyedStream<Event, Object> events = env.addSource(new 
RichParallelSourceFunction<Event>() {
                        private volatile boolean isRunning = true;
                        private long currentTimestamp = 0;
                        private int currentId = 0;

                        @Override
                        public void run(SourceContext<Event> sourceContext) 
throws Exception {
                                while (isRunning) {
                                        sourceContext.collectWithTimestamp(
                                                        new Event(
                                                                        
currentId++,
                                                                        
getRuntimeContext().getIndexOfThisSubtask(),
                                                                        
currentTimestamp
                                                        ),
                                                        currentTimestamp++
                                        );
                                        Thread.sleep(10);
                                }
                        }

                        @Override
                        public void cancel() {
                                this.isRunning = false;
                        }
                })
                                .assignTimestampsAndWatermarks(
                                                
WatermarkStrategy.forMonotonousTimestamps()
                                )
                                .keyBy(e -> e.key);

                SingleOutputStreamOperator<String> start = CEP.pattern(events, 
pattern)
                                .process(new PatternProcessFunction<Event, 
String>() {
                                        @Override
                                        public void processMatch(
                                                        Map<String, 
List<Event>> map,
                                                        Context context,
                                                        Collector<String> 
collector) throws Exception {
                                                collector.collect(
                                                                map.get("start")
                                                                                
.stream()
                                                                                
.map(e -> String.format("%s,%s", e.id, e.timestamp))
                                                                                
.collect(Collectors.joining(";"))
                                                );
                                        }
                                });

                start.print();
                env.execute();
        }

        public static class Event {
                public final int id;
                public final int key;
                public final long timestamp;

                public Event(int id, int key, long timestamp) {
                        this.id = id;
                        this.key = key;
                        this.timestamp = timestamp;
                }
        }
}
{code}

If I am correct it follows your scenario and is actually even more aggressive. 
The checkpoint size of this program stays stable ~47kB for over 20 minutes.

> State leak in CEP Operators (expired events/keys not removed from state)
> ------------------------------------------------------------------------
>
>                 Key: FLINK-19970
>                 URL: https://issues.apache.org/jira/browse/FLINK-19970
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.11.2
>         Environment: Flink 1.11.2 run using the official docker containers in 
> AWS ECS Fargate.
> 1 Job Manager, 1 Taskmanager with 2vCPUs and 8GB memory
>            Reporter: Thomas Wozniakowski
>            Priority: Critical
>         Attachments: image-2020-11-04-11-35-12-126.png, screenshot-1.png, 
> screenshot-2.png
>
>
> We have been observing instability in our production environment recently, 
> seemingly related to state backends. We ended up building a load testing 
> environment to isolate factors and have discovered that the CEP library 
> appears to have some serious problems with state expiry.
> h2. Job Topology
> Source: Kinesis (standard connector) -> keyBy() and forward to...
> CEP: Array of simple Keyed CEP Pattern operators (details below) -> forward 
> output to...
> Sink: SQS (custom connector)
> The CEP Patterns in the test look like this:
> {code:java}
> Pattern.begin(SCANS_SEQUENCE, AfterMatchSkipStrategy.skipPastLastEvent())
>     .times(20)
>     .subtype(ScanEvent.class)
>     .within(Duration.minutes(30));
> {code}
> h2. Taskmanager Config
> {code:java}
> taskmanager.numberOfTaskSlots: $numberOfTaskSlots
> taskmanager.data.port: 6121
> taskmanager.rpc.port: 6122
> taskmanager.exit-on-fatal-akka-error: true
> taskmanager.memory.process.size: $memoryProcessSize
> taskmanager.memory.jvm-metaspace.size: 256m
> taskmanager.memory.managed.size: 0m
> jobmanager.rpc.port: 6123
> blob.server.port: 6130
> rest.port: 8081
> web.submit.enable: true
> fs.s3a.connection.maximum: 50
> fs.s3a.threads.max: 50
> akka.framesize: 250m
> akka.watch.threshold: 14
> state.checkpoints.dir: s3://$savepointBucketName/checkpoints
> state.savepoints.dir: s3://$savepointBucketName/savepoints
> state.backend: filesystem
> state.backend.async: true
> s3.access-key: $s3AccessKey
> s3.secret-key: $s3SecretKey
> {code}
> (the substitutions are controlled by terraform).
> h2. Tests
> h4. Test 1 (No key rotation)
> 8192 actors (different keys) emitting 1 Scan Event every 10 minutes 
> indefinitely. Actors (keys) never rotate in or out.
> h4. Test 2 (Constant key rotation)
> 8192 actors that produce 2 Scan events 10 minutes apart, then retire and 
> never emit again. The setup creates new actors (keys) as soon as one finishes 
> so we always have 8192. This test basically constantly rotates the key space.
> h2. Results
> For both tests, the state size (checkpoint size) grows unbounded and linearly 
> well past the 30 minute threshold that should have caused old keys or events 
> to be discard from the state. In the chart below, the left (steep) half is 
> the 24 hours we ran Test 1, the right (shallow) half is Test 2.  My 
> understanding is that the checkpoint size should level off after ~45 minutes 
> or so then stay constant.
> !image-2020-11-04-11-35-12-126.png! 
> Could someone please assist us with this? Unless we have dramatically 
> misunderstood how the CEP library is supposed to function this seems like a 
> pretty severe bug.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to