[
https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17689015#comment-17689015
]
Zhu Zhu commented on FLINK-31041:
---------------------------------
Thanks for reporting this issue! [~dannycranmer]
I have tried to re-produce the problem locally, but not able yet to fully
re-produce it.
Here is what I see by running the example in the JIRA description:
1. The restart strategy works. by changing the strategy to
{{RestartStrategies.fixedDelayRestart(100, Time.of(10, TimeUnit.SECONDS)}}, the
job failed after encountering 100 failures.(see attached
test-restart-strategy.log) I guess the reason it is not working as expected in
the above example is that it tolerates 10000 failures in 10 seconds.
2. The debounce mechanism of DefaultScheduler works. The execution vertex
versioning is introduced for this purpose.
I tested it by reducing the source number to 2(see attached failovers.log) for
easier diagnostics. I can see 2 global failures happened every 10 seconds, and
only one job restarts in triggered by them.
Regarding the "leak of ExecutionVertexVersion", I'm not sure but guess it is
caused by that there are many pending global failures in JM's main thread. It's
not leaking, although not ideal. Yet this can be avoided if failures are not
triggered too frequently.
Regarding the specific case(or similar ones) in the JIRA description, looks to
me it can be resolved by setting the tolerable failure rate to a proper value.
WDYT?
> Race condition in DefaultScheduler results in memory leak and busy loop
> -----------------------------------------------------------------------
>
> Key: FLINK-31041
> URL: https://issues.apache.org/jira/browse/FLINK-31041
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.15.3, 1.16.1
> Reporter: Danny Cranmer
> Assignee: Danny Cranmer
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.17.0, 1.15.4, 1.16.2
>
> Attachments: flink-31041-heap-dump.png
>
>
> h4. Context
> When a job creates multiple sources that use the {{SourceCoordinator}}
> (FLIP-27), there is a failure race condition that results in:
> * Memory leak of {{ExecutionVertexVersion}}
> * Busy loop constantly trying to restart job
> * Restart strategy is not respected
> This results in the Job Manager becoming unresponsive.
> h4. !flink-31041-heap-dump.png!
> h4. Reproduction Steps
> This can be reproduced by a job that creates multiple sources that fail in
> the {{{}SplitEnumerator{}}}. We observed this with multiple {{KafkaSource's}}
> trying to load a non-existent cert from the file system and throwing FNFE.
> Thus, here is a simple job to reproduce (BE WARNED: running this locally will
> lock up your IDE):
> {code:java}
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setRestartStrategy(new
> RestartStrategies.FailureRateRestartStrategyConfiguration(10000, Time.of(10,
> TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));
> KafkaSource<String> source = KafkaSource.<String>builder()
> .setProperty("security.protocol", "SASL_SSL")
> // SSL configurations
> // Configure the path of truststore (CA) provided by the server
> .setProperty("ssl.truststore.location",
> "/path/to/kafka.client.truststore.jks")
> .setProperty("ssl.truststore.password", "test1234")
> // Configure the path of keystore (private key) if client
> authentication is required
> .setProperty("ssl.keystore.location",
> "/path/to/kafka.client.keystore.jks")
> .setProperty("ssl.keystore.password", "test1234")
> // SASL configurations
> // Set SASL mechanism as SCRAM-SHA-256
> .setProperty("sasl.mechanism", "SCRAM-SHA-256")
> // Set JAAS configurations
> .setProperty("sasl.jaas.config",
> "org.apache.kafka.common.security.scram.ScramLoginModule required
> username=\"username\" password=\"password\";")
> .setBootstrapServers("http://localhost:3456")
> .setTopics("input-topic")
> .setGroupId("my-group")
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setValueOnlyDeserializer(new SimpleStringSchema())
> .build();
> List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
> .mapToObj(i -> env
> .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka
> Source " + i).uid("source-" + i)
> .keyBy(s -> s.charAt(0))
> .map(s -> s))
> .collect(Collectors.toList());
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka
> Source").uid("source")
> .keyBy(s -> s.charAt(0))
> .union(sources.toArray(new SingleOutputStreamOperator[] {}))
> .print();
> env.execute("test job"); {code}
> h4. Root Cause
> We can see that the {{OperatorCoordinatorHolder}} already has a [debounce
> mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609],
> however the {{DefaultScheduler}} does not. We need a debounce mechanism in
> the {{DefaultScheduler}} since it handles many
> {{{}OperatorCoordinatorHolder{}}}.
> h4. Fix
> I have managed to fix this, I will open a PR, but would need feedback from
> people who understand this code better than me!
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)