[
https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17689038#comment-17689038
]
Danny Cranmer commented on FLINK-31041:
---------------------------------------
[~zhuzh] thanks for looking into this.
If I understand correctly, you are saying the "leak" is due to a "queue" of
pending global failures. They are eventually debounced, but until then they
pile up in some intermediate holding area.
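Below is a minimal, self-contained model of that "intermediate holding area", written in plain Java with no Flink dependency. It rests on a few simplifying assumptions:
* every global failure bumps a version number for each vertex;
* every global failure also queues a delayed restart that holds a snapshot of those versions;
* a restart whose snapshot is stale becomes a no-op when it fires.

The names {{VersionedRestartModel}}, {{onGlobalFailure}} and {{firePendingRestarts}} are hypothetical. They are not Flink's API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: failures are only debounced when their delayed restart fires. */
final class VersionedRestartModel {
    private final List<String> vertices;
    private final Map<String, Long> currentVersions = new HashMap<>();
    // Queued restarts, each holding a snapshot of vertex versions (the "holding area").
    private final Deque<Map<String, Long>> pendingRestarts = new ArrayDeque<>();

    VersionedRestartModel(List<String> vertices) {
        this.vertices = new ArrayList<>(vertices);
        for (String v : vertices) {
            currentVersions.put(v, 0L);
        }
    }

    /** A global failure bumps every vertex version and queues a restart with a snapshot. */
    void onGlobalFailure() {
        Map<String, Long> snapshot = new HashMap<>();
        for (String v : vertices) {
            long next = currentVersions.get(v) + 1;
            currentVersions.put(v, next);
            snapshot.put(v, next);
        }
        pendingRestarts.addLast(snapshot);
    }

    int pendingCount() {
        return pendingRestarts.size();
    }

    /** Fires every queued restart; returns how many were not stale and actually ran. */
    int firePendingRestarts() {
        int executed = 0;
        while (!pendingRestarts.isEmpty()) {
            Map<String, Long> snapshot = pendingRestarts.removeFirst();
            // Only a snapshot matching the current versions is still valid.
            if (snapshot.equals(currentVersions)) {
                executed++;
            }
        }
        return executed;
    }
}
```

Suppose five failures are reported before the restart delay elapses. Five snapshots are then queued, but firing them runs only one restart. If failures keep arriving faster than the delay, the queue keeps growing.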
We are running with {{jobmanager.execution.failover-strategy = full}}. In this mode
I would not expect multiple errors to trigger a global job failure. Possibly we
can debounce the failures earlier (as per my naive PR) and prevent this build-up?
For additional context, the job that originally caused the problem for us
simply had two Kafka sources, and the same {{FileNotFoundException}} was
triggering the failover.
> Regarding the specific case (or similar ones) in the JIRA description, it looks
> to me like it can be resolved by setting the tolerable failure rate to a proper
> value.
Unfortunately, this is not possible for us. We do not want the job to transition
to {{FAILED}}; we require it to retry indefinitely. The job could be
failing due to transient errors, such as permission issues, that can be fixed
without resubmitting the job. We use a fixed restart delay of 10 seconds.
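The two restart policies in this exchange can be compared with a simplified sketch. It is plain Java and is not Flink's actual implementation.
* The hypothetical {{FailureRateModel}} allows at most {{maxFailures}} failures inside a sliding window of {{intervalMs}}. Beyond that it refuses to restart, and the job would go to {{FAILED}}.
* The hypothetical {{FixedDelayModel}} stands for a fixed delay with an unbounded number of attempts, so it always restarts.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical restart policy: decides whether a failure may be followed by a restart. */
interface RestartModel {
    /** Records a failure at time nowMs and returns true if a restart is allowed. */
    boolean onFailure(long nowMs);

    /** Delay to wait before restarting, in milliseconds. */
    long delayMs();
}

/** Allows at most maxFailures failures within a sliding window of intervalMs. */
final class FailureRateModel implements RestartModel {
    private final int maxFailures;
    private final long intervalMs;
    private final long delayMs;
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    FailureRateModel(int maxFailures, long intervalMs, long delayMs) {
        this.maxFailures = maxFailures;
        this.intervalMs = intervalMs;
        this.delayMs = delayMs;
    }

    @Override
    public boolean onFailure(long nowMs) {
        // Forget failures that have slid out of the window.
        while (!failureTimes.isEmpty() && nowMs - failureTimes.peekFirst() >= intervalMs) {
            failureTimes.removeFirst();
        }
        failureTimes.addLast(nowMs);
        // Too many failures in the window: give up (the job would transition to FAILED).
        return failureTimes.size() <= maxFailures;
    }

    @Override
    public long delayMs() {
        return delayMs;
    }
}

/** Fixed delay with an unbounded number of attempts: every failure is followed by a restart. */
final class FixedDelayModel implements RestartModel {
    private final long delayMs;

    FixedDelayModel(long delayMs) {
        this.delayMs = delayMs;
    }

    @Override
    public boolean onFailure(long nowMs) {
        return true;
    }

    @Override
    public long delayMs() {
        return delayMs;
    }
}
```

For this comment, the key difference is that {{FailureRateModel}} can eventually return {{false}}, while {{FixedDelayModel}} never does.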
> Race condition in DefaultScheduler results in memory leak and busy loop
> -----------------------------------------------------------------------
>
> Key: FLINK-31041
> URL: https://issues.apache.org/jira/browse/FLINK-31041
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.15.3, 1.16.1
> Reporter: Danny Cranmer
> Assignee: Danny Cranmer
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.17.0, 1.15.4, 1.16.2
>
> Attachments: failovers.log, flink-31041-heap-dump.png,
> test-restart-strategy.log
>
>
> h4. Context
> When a job creates multiple sources that use the {{SourceCoordinator}}
> (FLIP-27), there is a failure race condition that results in:
> * A memory leak of {{ExecutionVertexVersion}} objects
> * A busy loop that constantly tries to restart the job
> * The restart strategy not being respected
> This results in the JobManager becoming unresponsive.
> !flink-31041-heap-dump.png!
> h4. Reproduction Steps
> This can be reproduced by a job that creates multiple sources that fail in
> the {{SplitEnumerator}}. We observed this with multiple {{KafkaSource}}s
> trying to load a non-existent certificate from the file system and throwing a
> {{FileNotFoundException}}. Here is a simple job that reproduces it (BE WARNED:
> running this locally will lock up your IDE):
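> {code:java}
> import java.util.List;
> import java.util.concurrent.TimeUnit;
> import java.util.stream.Collectors;
> import java.util.stream.IntStream;
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> // The class name is illustrative; any entry point works.
> public class Flink31041Reproduction {
>     @SuppressWarnings({"unchecked", "rawtypes"})
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         // Failure-rate strategy: at most 10000 failures per 10 s interval, 10 s delay between restarts.
>         env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(
>                 10000, Time.of(10, TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));
>
>         KafkaSource<String> source = KafkaSource.<String>builder()
>                 .setProperty("security.protocol", "SASL_SSL")
>                 // SSL configurations
>                 // Path of the truststore (CA) provided by the server; a missing file
>                 // here makes the split enumerator throw a FileNotFoundException
>                 .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
>                 .setProperty("ssl.truststore.password", "test1234")
>                 // Path of the keystore (private key), if client authentication is required
>                 .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
>                 .setProperty("ssl.keystore.password", "test1234")
>                 // SASL configurations: use SCRAM-SHA-256
>                 .setProperty("sasl.mechanism", "SCRAM-SHA-256")
>                 // JAAS configuration
>                 .setProperty("sasl.jaas.config",
>                         "org.apache.kafka.common.security.scram.ScramLoginModule required "
>                                 + "username=\"username\" password=\"password\";")
>                 .setBootstrapServers("http://localhost:3456")
>                 .setTopics("input-topic")
>                 .setGroupId("my-group")
>                 .setStartingOffsets(OffsetsInitializer.earliest())
>                 .setValueOnlyDeserializer(new SimpleStringSchema())
>                 .build();
>
>         // 32 additional sources; each one gets its own SourceCoordinator
>         List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
>                 .mapToObj(i -> env
>                         .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source " + i)
>                         .uid("source-" + i)
>                         .keyBy(s -> s.charAt(0))
>                         .map(s -> s))
>                 .collect(Collectors.toList());
>
>         env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
>                 .uid("source")
>                 .keyBy(s -> s.charAt(0))
>                 .union(sources.toArray(new SingleOutputStreamOperator[] {}))
>                 .print();
>
>         env.execute("test job");
>     }
> }
> {code}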
> h4. Root Cause
> We can see that the {{OperatorCoordinatorHolder}} already has a [debounce
> mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609];
> however, the {{DefaultScheduler}} does not. We need a debounce mechanism in
> the {{DefaultScheduler}} since it handles many
> {{OperatorCoordinatorHolder}} instances.
> h4. Fix
> I have managed to fix this and will open a PR, but I would need feedback from
> people who understand this code better than I do!
>
>
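The root cause described above can be illustrated with a hypothetical model. This is not the code from the PR.
* Each {{CoordinatorHolderModel}} ignores repeated failure requests once it has failed, similar in spirit to the per-holder debounce linked above.
* Even so, every holder still forwards one global failure.
* A single flag in the hypothetical {{SchedulerModel}} collapses those forwarded failures into one until the scheduled restart completes.

{{DebounceDemo}} is likewise a hypothetical helper.

```java
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical scheduler that can debounce global failures across all holders. */
final class SchedulerModel {
    private final boolean debounce;
    private boolean globalFailoverPending = false;
    private int globalFailuresHandled = 0;

    SchedulerModel(boolean debounce) {
        this.debounce = debounce;
    }

    void handleGlobalFailure(Throwable cause) {
        if (debounce && globalFailoverPending) {
            return; // A restart is already scheduled; this failure is covered by it.
        }
        globalFailoverPending = true;
        globalFailuresHandled++;
    }

    /** Called once the scheduled restart has actually run. */
    void onRestartCompleted() {
        globalFailoverPending = false;
    }

    int globalFailuresHandled() {
        return globalFailuresHandled;
    }
}

/** Hypothetical holder: forwards at most one failure, like a per-holder guard. */
final class CoordinatorHolderModel {
    private final SchedulerModel scheduler;
    private boolean failed = false;

    CoordinatorHolderModel(SchedulerModel scheduler) {
        this.scheduler = scheduler;
    }

    void failJob(Throwable cause) {
        if (failed) {
            return; // Per-holder debounce: repeated requests from this holder are ignored.
        }
        failed = true;
        scheduler.handleGlobalFailure(cause);
    }
}

final class DebounceDemo {
    /** Every holder reports the same failure twice; returns the global failures handled. */
    static int run(int holders, boolean schedulerDebounce) {
        SchedulerModel scheduler = new SchedulerModel(schedulerDebounce);
        List<CoordinatorHolderModel> all = new ArrayList<>();
        for (int i = 0; i < holders; i++) {
            all.add(new CoordinatorHolderModel(scheduler));
        }
        Throwable cause = new FileNotFoundException("missing truststore");
        for (CoordinatorHolderModel holder : all) {
            holder.failJob(cause);
            holder.failJob(cause);
        }
        return scheduler.globalFailuresHandled();
    }
}
```

The reproduction job creates 33 sources. Without the scheduler flag, {{DebounceDemo.run(33, false)}} returns 33, one global failure per holder. With the flag, {{DebounceDemo.run(33, true)}} returns 1.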
--
This message was sent by Atlassian Jira
(v8.20.10#820010)