[ https://issues.apache.org/jira/browse/FLINK-19970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17244007#comment-17244007 ]
Dawid Wysakowicz commented on FLINK-19970:
------------------------------------------
Could you please double-check that you've run my branch? If you are confident
that you did, I would need a program that I can run locally to reproduce the
problem. I tried preparing one myself, but I could not reproduce the state leak
with the fix from my branch (I could reproduce it without the fix). You can see
the program I tried below:
{code:java}
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternProcessFunction;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class CepBug {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(100);
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Same shape as the reported pattern, but with a much shorter window
        // so that pruning of expired matches should kick in almost immediately.
        Pattern<Event, ?> pattern = Pattern
                .<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
                .times(20)
                .within(Time.milliseconds(10));

        // Source that emits one event per subtask every 10 ms with strictly
        // increasing timestamps, keyed by the subtask index.
        KeyedStream<Event, Object> events = env
                .addSource(new RichParallelSourceFunction<Event>() {
                    private volatile boolean isRunning = true;
                    private long currentTimestamp = 0;
                    private int currentId = 0;

                    @Override
                    public void run(SourceContext<Event> sourceContext) throws Exception {
                        while (isRunning) {
                            sourceContext.collectWithTimestamp(
                                    new Event(
                                            currentId++,
                                            getRuntimeContext().getIndexOfThisSubtask(),
                                            currentTimestamp),
                                    currentTimestamp++);
                            Thread.sleep(10);
                        }
                    }

                    @Override
                    public void cancel() {
                        this.isRunning = false;
                    }
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
                .keyBy(e -> e.key);

        SingleOutputStreamOperator<String> start = CEP.pattern(events, pattern)
                .process(new PatternProcessFunction<Event, String>() {
                    @Override
                    public void processMatch(
                            Map<String, List<Event>> map,
                            Context context,
                            Collector<String> collector) throws Exception {
                        collector.collect(
                                map.get("start")
                                        .stream()
                                        .map(e -> String.format("%s,%s", e.id, e.timestamp))
                                        .collect(Collectors.joining(";")));
                    }
                });
        start.print();
        env.execute();
    }

    public static class Event {
        public final int id;
        public final int key;
        public final long timestamp;

        public Event(int id, int key, long timestamp) {
            this.id = id;
            this.key = key;
            this.timestamp = timestamp;
        }
    }
}
{code}
If I am correct, it follows your scenario and is actually even more aggressive.
The checkpoint size of this program stays stable at ~47 kB for over 20 minutes.
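In case it helps with re-running the experiment, one convenient way to watch the
checkpoint size over time is to poll the REST API. A minimal sketch (assuming the
default REST port 8081, a job id passed on the command line, and Java 11+ for the
HTTP client):
{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CheckpointSizeWatcher {
    public static void main(String[] args) throws Exception {
        String jobId = args[0]; // placeholder: take the job id from the web UI
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/jobs/" + jobId + "/checkpoints"))
                .build();
        while (true) {
            // The returned JSON includes the size of the latest completed
            // checkpoint (latest.completed.state_size, in bytes).
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.body());
            Thread.sleep(10_000);
        }
    }
}
{code}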
> State leak in CEP Operators (expired events/keys not removed from state)
> ------------------------------------------------------------------------
>
> Key: FLINK-19970
> URL: https://issues.apache.org/jira/browse/FLINK-19970
> Project: Flink
> Issue Type: Bug
> Components: Library / CEP
> Affects Versions: 1.11.2
> Environment: Flink 1.11.2 run using the official Docker containers in
> AWS ECS Fargate.
> 1 JobManager, 1 TaskManager with 2 vCPUs and 8 GB memory
> Reporter: Thomas Wozniakowski
> Priority: Critical
> Attachments: image-2020-11-04-11-35-12-126.png, screenshot-1.png,
> screenshot-2.png
>
>
> We have been observing instability in our production environment recently,
> seemingly related to state backends. We ended up building a load testing
> environment to isolate factors and have discovered that the CEP library
> appears to have some serious problems with state expiry.
> h2. Job Topology
> Source: Kinesis (standard connector) -> keyBy() and forward to...
> CEP: Array of simple Keyed CEP Pattern operators (details below) -> forward
> output to...
> Sink: SQS (custom connector)
> The CEP Patterns in the test look like this:
> {code:java}
> Pattern.begin(SCANS_SEQUENCE, AfterMatchSkipStrategy.skipPastLastEvent())
> .times(20)
> .subtype(ScanEvent.class)
> .within(Time.minutes(30));
> {code}
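> For reference, the end-to-end wiring corresponds roughly to the sketch below
> (ScanEvent, EventDeserializationSchema, MatchToJson, and SqsSink are
> hypothetical stand-ins for our production classes):
> {code:java}
> // Rough sketch of the job topology described above; the class names and the
> // key field are placeholders, not actual project code.
> DataStream<Event> scans = env.addSource(new FlinkKinesisConsumer<>(
>         "scan-events", new EventDeserializationSchema(), kinesisProperties));
>
> Pattern<Event, ?> pattern = Pattern
>         .<Event>begin(SCANS_SEQUENCE, AfterMatchSkipStrategy.skipPastLastEvent())
>         .times(20)
>         .subtype(ScanEvent.class)
>         .within(Time.minutes(30));
>
> CEP.pattern(scans.keyBy(e -> e.deviceId), pattern)
>         .process(new MatchToJson()) // hypothetical PatternProcessFunction
>         .addSink(new SqsSink());    // the custom SQS connector
> {code}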
> h2. Taskmanager Config
> {code:java}
> taskmanager.numberOfTaskSlots: $numberOfTaskSlots
> taskmanager.data.port: 6121
> taskmanager.rpc.port: 6122
> taskmanager.exit-on-fatal-akka-error: true
> taskmanager.memory.process.size: $memoryProcessSize
> taskmanager.memory.jvm-metaspace.size: 256m
> taskmanager.memory.managed.size: 0m
> jobmanager.rpc.port: 6123
> blob.server.port: 6130
> rest.port: 8081
> web.submit.enable: true
> fs.s3a.connection.maximum: 50
> fs.s3a.threads.max: 50
> akka.framesize: 250m
> akka.watch.threshold: 14
> state.checkpoints.dir: s3://$savepointBucketName/checkpoints
> state.savepoints.dir: s3://$savepointBucketName/savepoints
> state.backend: filesystem
> state.backend.async: true
> s3.access-key: $s3AccessKey
> s3.secret-key: $s3SecretKey
> {code}
> (The substitutions are controlled by Terraform.)
> h2. Tests
> h4. Test 1 (No key rotation)
> 8192 actors (different keys) emitting 1 Scan Event every 10 minutes
> indefinitely. Actors (keys) never rotate in or out.
> h4. Test 2 (Constant key rotation)
> 8192 actors that produce 2 Scan events 10 minutes apart, then retire and
> never emit again. The setup creates new actors (keys) as soon as one finishes,
> so we always have 8192 active. This test constantly rotates the key space.
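> To reproduce this traffic shape locally, a generator could look roughly like
> the sketch below (illustrative only, not our actual load-testing code; Event is
> a simple POJO with id, key, and timestamp fields, and the timings are
> compressed so the rotation is visible in a short run):
> {code:java}
> import java.util.ArrayDeque;
> import java.util.Queue;
>
> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>
> // Every key emits exactly two events and then retires; a fresh key replaces it
> // immediately, so the live-key count stays constant while the key space rotates.
> public class RotatingKeySource extends RichParallelSourceFunction<Event> {
>
>     private static final int ACTIVE_KEYS = 8192; // per subtask, for simplicity
>     private static final long PAUSE_MS = 10;     // compressed stand-in for 10 minutes
>
>     private volatile boolean isRunning = true;
>
>     @Override
>     public void run(SourceContext<Event> ctx) throws Exception {
>         Queue<Integer> awaitingSecondEvent = new ArrayDeque<>();
>         // Stride the key space so parallel subtasks never share a key.
>         final int step = getRuntimeContext().getNumberOfParallelSubtasks();
>         int nextKey = getRuntimeContext().getIndexOfThisSubtask();
>         int id = 0;
>         long ts = 0;
>         // Seed the initial population with each key's first event.
>         while (isRunning && awaitingSecondEvent.size() < ACTIVE_KEYS) {
>             ctx.collectWithTimestamp(new Event(id++, nextKey, ts), ts++);
>             awaitingSecondEvent.add(nextKey);
>             nextKey += step;
>         }
>         while (isRunning) {
>             Thread.sleep(PAUSE_MS);
>             // Retire the oldest key with its second and final event...
>             int retiring = awaitingSecondEvent.remove();
>             ctx.collectWithTimestamp(new Event(id++, retiring, ts), ts++);
>             // ...and replace it with a brand-new key emitting its first event.
>             ctx.collectWithTimestamp(new Event(id++, nextKey, ts), ts++);
>             awaitingSecondEvent.add(nextKey);
>             nextKey += step;
>         }
>     }
>
>     @Override
>     public void cancel() {
>         isRunning = false;
>     }
> }
> {code}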
> h2. Results
> For both tests, the state size (checkpoint size) grows unbounded and linearly,
> well past the 30-minute threshold that should have caused old keys or events
> to be discarded from the state. In the chart below, the left (steep) half is
> the 24 hours we ran Test 1; the right (shallow) half is Test 2. My
> understanding is that the checkpoint size should level off after ~45 minutes
> or so and then stay constant.
> !image-2020-11-04-11-35-12-126.png!
> Could someone please assist us with this? Unless we have dramatically
> misunderstood how the CEP library is supposed to function, this seems like a
> pretty severe bug.