[
https://issues.apache.org/jira/browse/FLINK-19970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17227329#comment-17227329
]
Thomas Wozniakowski commented on FLINK-19970:
---------------------------------------------
[~dwysakowicz] Please give me a shout if there's any more diagnostic info I can
attach to assist with debugging this. This one is blocking us quite severely,
so I'm more than happy to help in any way I can.
> State leak in CEP Operators (expired events/keys not removed from state)
> ------------------------------------------------------------------------
>
> Key: FLINK-19970
> URL: https://issues.apache.org/jira/browse/FLINK-19970
> Project: Flink
> Issue Type: Bug
> Components: Library / CEP
> Affects Versions: 1.11.2
> Environment: Flink 1.11.2 run using the official Docker containers in
> AWS ECS Fargate.
> 1 JobManager, 1 TaskManager with 2 vCPUs and 8 GB memory
> Reporter: Thomas Wozniakowski
> Priority: Critical
> Attachments: image-2020-11-04-11-35-12-126.png
>
>
> We have been observing instability in our production environment recently,
> seemingly related to state backends. We ended up building a load testing
> environment to isolate factors and have discovered that the CEP library
> appears to have some serious problems with state expiry.
> h2. Job Topology
> Source: Kinesis (standard connector) -> keyBy() and forward to...
> CEP: Array of simple Keyed CEP Pattern operators (details below) -> forward
> output to...
> Sink: SQS (custom connector)
> The CEP Patterns in the test look like this:
> {code:java}
> Pattern.begin(SCANS_SEQUENCE, AfterMatchSkipStrategy.skipPastLastEvent())
>         .times(20)
>         .subtype(ScanEvent.class)
>         .within(Time.minutes(30));
> {code}
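> For completeness, this is roughly how each pattern is wired into the job. It
> is a trimmed-down sketch, not our exact code: {{Event}}, {{ScanEvent}},
> {{getActorId}}, {{kinesisConsumer}}, {{ScanSequenceSelect}} and {{sqsSink}}
> are placeholders for our real classes.
> {code:java}
> import org.apache.flink.cep.CEP;
> import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
> import org.apache.flink.cep.pattern.Pattern;
> import org.apache.flink.streaming.api.datastream.KeyedStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
> // Kinesis source (standard connector), keyed by actor id.
> KeyedStream<Event, String> scans = env
>         .addSource(kinesisConsumer)
>         .keyBy(Event::getActorId);
>
> // The pattern shown above, evaluated per key.
> Pattern<Event, ?> scansSequence = Pattern
>         .<Event>begin(SCANS_SEQUENCE, AfterMatchSkipStrategy.skipPastLastEvent())
>         .times(20)
>         .subtype(ScanEvent.class)
>         .within(Time.minutes(30));
>
> CEP.pattern(scans, scansSequence)
>         .select(new ScanSequenceSelect())  // placeholder PatternSelectFunction
>         .addSink(sqsSink);                 // our custom SQS sink (placeholder)
> {code}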
> h2. Taskmanager Config
> {code:java}
> taskmanager.numberOfTaskSlots: $numberOfTaskSlots
> taskmanager.data.port: 6121
> taskmanager.rpc.port: 6122
> taskmanager.exit-on-fatal-akka-error: true
> taskmanager.memory.process.size: $memoryProcessSize
> taskmanager.memory.jvm-metaspace.size: 256m
> taskmanager.memory.managed.size: 0m
> jobmanager.rpc.port: 6123
> blob.server.port: 6130
> rest.port: 8081
> web.submit.enable: true
> fs.s3a.connection.maximum: 50
> fs.s3a.threads.max: 50
> akka.framesize: 250m
> akka.watch.threshold: 14
> state.checkpoints.dir: s3://$savepointBucketName/checkpoints
> state.savepoints.dir: s3://$savepointBucketName/savepoints
> state.backend: filesystem
> state.backend.async: true
> s3.access-key: $s3AccessKey
> s3.secret-key: $s3SecretKey
> {code}
> (The {{$...}} substitutions are controlled by Terraform.)
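> For anyone trying to reproduce: the state backend lines above correspond to
> the following programmatic setup (a sketch against the Flink 1.11 API; the
> bucket name and checkpoint interval are placeholders):
> {code:java}
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
> // state.backend: filesystem + state.backend.async: true
> env.setStateBackend(new FsStateBackend("s3://my-savepoint-bucket/checkpoints", true));
> env.enableCheckpointing(60_000L); // 1 minute; the interval here is illustrative
> {code}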
> h2. Tests
> h4. Test 1 (No key rotation)
> 8192 actors (different keys) emitting 1 Scan Event every 10 minutes
> indefinitely. Actors (keys) never rotate in or out.
> h4. Test 2 (Constant key rotation)
> 8192 actors that each produce 2 Scan Events 10 minutes apart, then retire and
> never emit again. The setup creates a new actor (key) as soon as one retires,
> so there are always 8192 active. This test constantly rotates the key space.
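> The Test 2 driver behaves roughly like the sketch below; the real generator
> writes to Kinesis, and all names here are illustrative:
> {code:java}
> import java.time.Duration;
> import java.time.Instant;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.UUID;
>
> public class Test2Driver {
>
>     static final class Actor {
>         final String key = UUID.randomUUID().toString();
>         Instant firstScan;   // null until the first scan is emitted
>         boolean retired;     // true once both scans have been emitted
>
>         void tick(Instant now) {
>             if (firstScan == null) {
>                 firstScan = now;
>                 emitScan(key, now);                                   // 1st scan
>             } else if (!retired
>                     && Duration.between(firstScan, now).toMinutes() >= 10) {
>                 emitScan(key, now);                                   // 2nd scan
>                 retired = true;                                       // never emits again
>             }
>         }
>     }
>
>     static void emitScan(String key, Instant at) {
>         // Stands in for the Kinesis putRecord call in the real generator.
>         System.out.printf("scan key=%s at=%s%n", key, at);
>     }
>
>     public static void main(String[] args) throws InterruptedException {
>         List<Actor> pool = new ArrayList<>();
>         for (int i = 0; i < 8192; i++) {
>             pool.add(new Actor());
>         }
>         while (true) {
>             Instant now = Instant.now();
>             for (int i = 0; i < pool.size(); i++) {
>                 pool.get(i).tick(now);
>                 if (pool.get(i).retired) {
>                     pool.set(i, new Actor()); // rotate in a fresh key immediately
>                 }
>             }
>             Thread.sleep(1_000);
>         }
>     }
> }
> {code}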
> h2. Results
> For both tests, the state size (checkpoint size) grows linearly and without
> bound, well past the 30-minute window that should have caused old keys and
> events to be discarded from state. In the chart below, the left (steep) half
> is the 24 hours we ran Test 1; the right (shallow) half is Test 2. My
> understanding is that checkpoint size should level off after ~45 minutes or
> so and then stay constant: each actor emits at most one scan every 10
> minutes, so no more than 3-4 events per key can ever sit inside a 30-minute
> window, which should bound live state at roughly 8192 keys times a handful
> of events.
> !image-2020-11-04-11-35-12-126.png!
> Could someone please assist us with this? Unless we have dramatically
> misunderstood how the CEP library is supposed to function, this seems like a
> pretty severe bug.