[ 
https://issues.apache.org/jira/browse/FLINK-19970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17244090#comment-17244090
 ] 

Dawid Wysakowicz edited comment on FLINK-19970 at 12/4/20, 4:21 PM:
--------------------------------------------------------------------

No, my test case matches your scenario #1. In my test there is no key rotation. 
I use Event#key as the key, which I set equal to the index of the parallel 
instance of the source. I ran the test with a parallelism of 4, so there were 
4 constant keys.
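
Roughly, such a source can be sketched as below (assumed names only: 
ConstantKeySource and the Event constructor are placeholders, not the actual 
test code):
{code:java}
// Minimal sketch, not the actual test code: each parallel source instance
// emits events with a constant key equal to its own subtask index, so with a
// parallelism of 4 there are exactly 4 constant keys.
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class ConstantKeySource extends RichParallelSourceFunction<Event> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        final int key = getRuntimeContext().getIndexOfThisSubtask();
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                // Event(key, timestamp) is a placeholder constructor
                ctx.collect(new Event(key, System.currentTimeMillis()));
            }
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
{code}
Keying the resulting stream by Event#key then yields one constant key per 
source subtask.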

The number of keys, or keys becoming dormant, should not play any role in the 
root cause. All keys are processed independently. 

The only difference between your two scenarios is how many events are assigned 
to a key and, in turn, to a pattern. The bug I fixed was that long chains of 
events were blocking the pruning of past, timed-out events. Moreover, the 
problem occurred only if such a chain contained a single event that was not yet 
timed out; that single event then blocked all the past events. Have a look at 
an example:

Imagine partial matches:
{code}
1. (t=1), (t=2), (t=3) ... (t=4)
2. (t=2), (t=3) ... (t=4), (t=5)
3. (t=3) ... (t=4), (t=5), (t=6)
{code}
Assume timeout happens after 3 units of time.
Let's advance time to t=4: partial match 1 gets timed out. We could prune 
event (t=1), but it is not pruned because it is blocked by e.g. the (t=2) event 
in partial match 2. After advancing to t=5, match 2 gets timed out; we could 
prune (t=2), but again it is blocked by (t=3) from match 3, and so on.
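
To make the blocking mechanism more concrete, here is a conceptual sketch (an 
assumed simplification for illustration, not Flink's actual SharedBuffer code): 
events are stored as shared nodes, each holding a reference to its predecessor 
within a partial match, and a node can only be released once nothing points at 
it any more.
{code:java}
// Conceptual sketch only -- an assumed simplification, not the real SharedBuffer.
final class EventNode {
    final long timestamp;
    final EventNode previous; // predecessor within a partial match
    int refCount;             // successors / partial matches still pointing here

    EventNode(long timestamp, EventNode previous) {
        this.timestamp = timestamp;
        this.previous = previous;
        if (previous != null) {
            previous.refCount++; // a live successor pins its predecessor
        }
    }
}

// In the example: the node for (t=2) is shared between matches 1 and 2 and
// points back to (t=1). When match 1 times out at t=4 it releases its own
// reference, but (t=2) is still held by the live match 2, so its edge keeps
// the already timed-out (t=1) in state -- and so on further down the chain.
{code}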

In your scenario #1, with many more events and chains that basically never 
fully time out, the state was never pruned. That is why scenario #1 is more 
severe.
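
As a side note, timed-out partial matches can be made visible through the 
timeout side output; a rough sketch, assuming placeholder names (keyedEvents, 
scansPattern, and the String results are illustrative only):
{code:java}
// Rough sketch: route timed-out partial matches to a side output so the
// timeout behaviour described above becomes observable.
final OutputTag<String> timedOutTag = new OutputTag<String>("timed-out-partial-matches") {};

SingleOutputStreamOperator<String> matches = CEP.pattern(keyedEvents, scansPattern)
        .select(
                timedOutTag,
                new PatternTimeoutFunction<Event, String>() {
                    @Override
                    public String timeout(Map<String, List<Event>> partialMatch, long timeoutTimestamp) {
                        return "timed out at " + timeoutTimestamp;
                    }
                },
                new PatternSelectFunction<Event, String>() {
                    @Override
                    public String select(Map<String, List<Event>> match) {
                        return "completed match";
                    }
                });

DataStream<String> timedOut = matches.getSideOutput(timedOutTag);
{code}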



was (Author: dawidwys):
No, my test case matches your scenario #1. In my test there is no key rotation. 
I use Event#key as the key, which I set equal to the index of the parallel 
instance of the source. I ran the test with a parallelism of 4, so there were 
4 constant keys.

The number of keys, or keys becoming dormant, should not play any role in the 
root cause. All keys are processed independently. 

The only difference between your two scenarios is how many events are assigned 
to a key and, in turn, to a pattern. The bug I fixed was that long chains of 
events were blocking the pruning of past, timed-out events. Therefore, the more 
events there are, the more likely state is to leak.

> State leak in CEP Operators (expired events/keys not removed from state)
> ------------------------------------------------------------------------
>
>                 Key: FLINK-19970
>                 URL: https://issues.apache.org/jira/browse/FLINK-19970
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.11.2
>         Environment: Flink 1.11.2 run using the official docker containers in 
> AWS ECS Fargate.
> 1 Job Manager, 1 Taskmanager with 2vCPUs and 8GB memory
>            Reporter: Thomas Wozniakowski
>            Priority: Critical
>         Attachments: image-2020-11-04-11-35-12-126.png, screenshot-1.png, 
> screenshot-2.png, screenshot-3.png
>
>
> We have been observing instability in our production environment recently, 
> seemingly related to state backends. We ended up building a load testing 
> environment to isolate factors and have discovered that the CEP library 
> appears to have some serious problems with state expiry.
> h2. Job Topology
> Source: Kinesis (standard connector) -> keyBy() and forward to...
> CEP: Array of simple Keyed CEP Pattern operators (details below) -> forward 
> output to...
> Sink: SQS (custom connector)
> The CEP Patterns in the test look like this:
> {code:java}
> Pattern.begin(SCANS_SEQUENCE, AfterMatchSkipStrategy.skipPastLastEvent())
>     .times(20)
>     .subtype(ScanEvent.class)
>     .within(Time.minutes(30));
> {code}
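> For clarity, a rough sketch of how these operators are wired together 
> (placeholder names: kinesisSource, sqsSink, Event#getActorId and scansPattern 
> are illustrative, not the actual job code):
> {code:java}
> // Rough sketch of the topology above, with placeholder names.
> DataStream<Event> events = env.addSource(kinesisSource);            // Kinesis source
> PatternStream<Event> patternStream =
>         CEP.pattern(events.keyBy(Event::getActorId), scansPattern); // keyBy() then keyed CEP
> patternStream
>         .select(new PatternSelectFunction<Event, String>() {
>             @Override
>             public String select(Map<String, List<Event>> match) {
>                 // one output record per completed sequence of 20 scans
>                 return "matched " + match.get(SCANS_SEQUENCE).size() + " scan events";
>             }
>         })
>         .addSink(sqsSink);                                           // custom SQS sink
> {code}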
> h2. Taskmanager Config
> {code:java}
> taskmanager.numberOfTaskSlots: $numberOfTaskSlots
> taskmanager.data.port: 6121
> taskmanager.rpc.port: 6122
> taskmanager.exit-on-fatal-akka-error: true
> taskmanager.memory.process.size: $memoryProcessSize
> taskmanager.memory.jvm-metaspace.size: 256m
> taskmanager.memory.managed.size: 0m
> jobmanager.rpc.port: 6123
> blob.server.port: 6130
> rest.port: 8081
> web.submit.enable: true
> fs.s3a.connection.maximum: 50
> fs.s3a.threads.max: 50
> akka.framesize: 250m
> akka.watch.threshold: 14
> state.checkpoints.dir: s3://$savepointBucketName/checkpoints
> state.savepoints.dir: s3://$savepointBucketName/savepoints
> state.backend: filesystem
> state.backend.async: true
> s3.access-key: $s3AccessKey
> s3.secret-key: $s3SecretKey
> {code}
> (the substitutions are controlled by Terraform).
> h2. Tests
> h4. Test 1 (No key rotation)
> 8192 actors (different keys) emitting 1 Scan Event every 10 minutes 
> indefinitely. Actors (keys) never rotate in or out.
> h4. Test 2 (Constant key rotation)
> 8192 actors that produce 2 Scan events 10 minutes apart, then retire and 
> never emit again. The setup creates new actors (keys) as soon as one finishes 
> so we always have 8192. This test basically constantly rotates the key space.
> h2. Results
> For both tests, the state size (checkpoint size) grows linearly and without 
> bound, well past the 30-minute threshold that should have caused old keys or 
> events to be discarded from state. In the chart below, the left (steep) half 
> is the 24 hours we ran Test 1; the right (shallow) half is Test 2. My 
> understanding is that the checkpoint size should level off after ~45 minutes 
> or so and then stay constant.
> !image-2020-11-04-11-35-12-126.png! 
> Could someone please assist us with this? Unless we have dramatically 
> misunderstood how the CEP library is supposed to function, this seems like a 
> pretty severe bug.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
