[
https://issues.apache.org/jira/browse/FLINK-19970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Thomas Wozniakowski updated FLINK-19970:
----------------------------------------
Description:
We have been observing instability in our production environment recently,
seemingly related to state backends. We ended up building a load testing
environment to isolate factors and have discovered that the CEP library appears
to have some serious problems with state expiry.
h2. Job Topology
Source: Kinesis (standard connector) -> keyBy() and forward to...
CEP: Array of simple Keyed CEP Pattern operators (details below) -> forward
output to...
Sink: SQS (custom connector)
The CEP Patterns in the test look like this:
{code:java}
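// Match 20 ScanEvents per key within 30 minutes, then skip past the last matched event.
// In Flink 1.11, within() takes org.apache.flink.streaming.api.windowing.time.Time.
Pattern.begin(SCANS_SEQUENCE, AfterMatchSkipStrategy.skipPastLastEvent())
    .times(20)
    .subtype(ScanEvent.class)
    .within(Time.minutes(30));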
{code}
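To make the expected expiry behaviour concrete, here is a minimal plain-Java sketch of what a 30-minute window should mean for per-key state: events are dropped once their age reaches the window, and a key with no remaining events disappears entirely. This is only a toy model. The class {{WindowedKeyBuffer}} and its methods are hypothetical names and do not reflect how Flink's CEP operator stores partial matches internally.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Toy model of per-key state under a time window; not Flink code. */
class WindowedKeyBuffer {
    private final long windowMillis;
    // key -> timestamps of buffered events, oldest first
    private final Map<String, Deque<Long>> eventsByKey = new HashMap<>();

    WindowedKeyBuffer(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Buffers one event for the key. Assumes per-key timestamps arrive in order. */
    void add(String key, long timestampMillis) {
        eventsByKey.computeIfAbsent(key, k -> new ArrayDeque<>()).addLast(timestampMillis);
    }

    /** Drops events whose age has reached the window, and keys left with no events. */
    void expire(long nowMillis) {
        Iterator<Map.Entry<String, Deque<Long>>> it = eventsByKey.entrySet().iterator();
        while (it.hasNext()) {
            Deque<Long> events = it.next().getValue();
            while (!events.isEmpty() && nowMillis - events.peekFirst() >= windowMillis) {
                events.removeFirst();
            }
            if (events.isEmpty()) {
                it.remove(); // the key itself no longer occupies any state
            }
        }
    }

    int keyCount() {
        return eventsByKey.size();
    }

    int eventCount() {
        int total = 0;
        for (Deque<Long> events : eventsByKey.values()) {
            total += events.size();
        }
        return total;
    }
}
```

Under this model, a key that stops emitting is gone from state 30 minutes after its last event, which is the behaviour the tests in this report expect to see reflected in the checkpoint size.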
h2. Taskmanager Config
{code:java}
taskmanager.numberOfTaskSlots: $numberOfTaskSlots
taskmanager.data.port: 6121
taskmanager.rpc.port: 6122
taskmanager.exit-on-fatal-akka-error: true
taskmanager.memory.process.size: $memoryProcessSize
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.managed.size: 0m
jobmanager.rpc.port: 6123
blob.server.port: 6130
rest.port: 8081
web.submit.enable: true
fs.s3a.connection.maximum: 50
fs.s3a.threads.max: 50
akka.framesize: 250m
akka.watch.threshold: 14
state.checkpoints.dir: s3://$savepointBucketName/checkpoints
state.savepoints.dir: s3://$savepointBucketName/savepoints
state.backend: filesystem
state.backend.async: true
s3.access-key: $s3AccessKey
s3.secret-key: $s3SecretKey
{code}
(The {{$...}} substitutions are filled in by Terraform.)
h2. Tests
h4. Test 1 (No key rotation)
8192 actors (different keys) emitting 1 Scan Event every 10 minutes
indefinitely. Actors (keys) never rotate in or out.
h4. Test 2 (Constant key rotation)
8192 actors that produce 2 Scan events 10 minutes apart, then retire and never
emit again. The setup creates new actors (keys) as soon as one finishes so we
always have 8192. This test basically constantly rotates the key space.
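The key rotation in Test 2 can be sketched as a fixed-size pool whose slots are refilled with fresh keys. The following is a hypothetical model and not the actual load generator: the class name, the {{actor-N}} key format, and the assumption that a replacement actor emits its first event in the round after its predecessor retires are all illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the Test 2 load generator; names and structure are hypothetical. */
class RotatingActorPool {
    private static final int EVENTS_PER_ACTOR = 2;
    private final String[] keys;   // key currently occupying each slot
    private final int[] emitted;   // events emitted so far by that key
    private long actorsCreated = 0;

    RotatingActorPool(int size) {
        keys = new String[size];
        emitted = new int[size];
        for (int slot = 0; slot < size; slot++) {
            keys[slot] = newKey();
        }
    }

    private String newKey() {
        return "actor-" + (actorsCreated++);
    }

    /** One 10-minute round: each active actor emits one event; finished actors are replaced. */
    List<String> emitRound() {
        List<String> emittingKeys = new ArrayList<>(keys.length);
        for (int slot = 0; slot < keys.length; slot++) {
            emittingKeys.add(keys[slot]);
            if (++emitted[slot] == EVENTS_PER_ACTOR) {
                keys[slot] = newKey(); // retire the actor and rotate in a new key
                emitted[slot] = 0;
            }
        }
        return emittingKeys;
    }

    int activeActors() {
        return keys.length;
    }

    long actorsCreated() {
        return actorsCreated;
    }
}
```

In this model the number of active actors stays at 8192 while the number of distinct keys ever seen grows by 8192 every two rounds, so any per-key state that is not removed after the window expires accumulates indefinitely.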
h2. Results
For both tests, the state size (checkpoint size) grows linearly and without
bound, well past the 30 minute threshold that should have caused old keys or
events to be discarded from the state. In the chart below, the left (steep)
half is the 24 hours we ran Test 1, the right (shallow) half is Test 2. My
understanding is that the checkpoint size should level off after ~45 minutes
or so and then stay constant.
!image-2020-11-04-11-35-12-126.png!
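As a rough check on the expectation that state should level off, the sketch below counts how many events would still be inside the window at a given time for the Test 1 workload (8192 keys, one event per key every 10 minutes, 30-minute window). It assumes an event is dropped as soon as its age reaches the window, and it counts raw events only, not the size of whatever bookkeeping the CEP operator keeps per partial match. The class and method names are hypothetical.

```java
/** Back-of-envelope model of retained events for a fixed key set; not Flink code. */
class ExpectedStateBound {
    /**
     * Counts events still inside the window after elapsedMinutes, when each of
     * `keys` actors emits one event every intervalMinutes, starting at minute 0.
     */
    static long retainedEvents(long elapsedMinutes, long keys,
                               long intervalMinutes, long windowMinutes) {
        long retained = 0;
        for (long ts = 0; ts <= elapsedMinutes; ts += intervalMinutes) {
            // An event is kept only while its age is strictly below the window.
            if (elapsedMinutes - ts < windowMinutes) {
                retained += keys;
            }
        }
        return retained;
    }

    public static void main(String[] args) {
        for (long minutes : new long[] {0, 10, 20, 30, 60, 24 * 60}) {
            System.out.println(minutes + " min -> "
                    + retainedEvents(minutes, 8192, 10, 30) + " events");
        }
    }
}
```

Under these assumptions the count reaches 24,576 events (3 per key) at minute 20 and stays there through the full 24 hours, which is the kind of plateau the checkpoint size would be expected to follow rather than linear growth.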
Could someone please assist us with this? Unless we have dramatically
misunderstood how the CEP library is supposed to function, this seems like a
pretty severe bug.
> State leak in CEP Operators (expired events/keys not removed from state)
> ------------------------------------------------------------------------
>
> Key: FLINK-19970
> URL: https://issues.apache.org/jira/browse/FLINK-19970
> Project: Flink
> Issue Type: Bug
> Components: Library / CEP
> Affects Versions: 1.11.2
> Environment: Flink 1.11.2 run using the official docker containers in
> AWS ECS Fargate.
> 1 Job Manager, 1 Taskmanager with 2vCPUs and 8GB memory
> Reporter: Thomas Wozniakowski
> Priority: Critical
> Attachments: image-2020-11-04-11-35-12-126.png
>
>