[
https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zhu Zhu reassigned FLINK-31041:
-------------------------------
Assignee: Weihua Hu (was: Danny Cranmer)
> Race condition in DefaultScheduler results in memory leak and busy loop
> -----------------------------------------------------------------------
>
> Key: FLINK-31041
> URL: https://issues.apache.org/jira/browse/FLINK-31041
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.15.3, 1.16.1
> Reporter: Danny Cranmer
> Assignee: Weihua Hu
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.17.0, 1.15.4, 1.16.2
>
> Attachments: failovers.log, flink-31041-heap-dump.png, test-restart-strategy.log
>
>
> h4. Context
> When a job creates multiple sources that use the {{SourceCoordinator}}
> (FLIP-27), there is a failure race condition that results in:
> * Memory leak of {{ExecutionVertexVersion}}
> * Busy loop constantly trying to restart job
> * Restart strategy is not respected
> This results in the Job Manager becoming unresponsive.
> !flink-31041-heap-dump.png!
> h4. Reproduction Steps
> This can be reproduced by a job that creates multiple sources that fail in
> the {{SplitEnumerator}}. We observed this with multiple {{KafkaSource}}
> instances trying to load a non-existent certificate from the file system and
> throwing a {{FileNotFoundException}}. Here is a simple job that reproduces
> the issue (BE WARNED: running this locally will lock up your IDE):
> {code:java}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(
>         10000, Time.of(10, TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));
>
> KafkaSource<String> source = KafkaSource.<String>builder()
>         .setProperty("security.protocol", "SASL_SSL")
>         // SSL configurations
>         // Configure the path of truststore (CA) provided by the server
>         .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
>         .setProperty("ssl.truststore.password", "test1234")
>         // Configure the path of keystore (private key) if client authentication is required
>         .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
>         .setProperty("ssl.keystore.password", "test1234")
>         // SASL configurations
>         // Set SASL mechanism as SCRAM-SHA-256
>         .setProperty("sasl.mechanism", "SCRAM-SHA-256")
>         // Set JAAS configurations
>         .setProperty("sasl.jaas.config",
>                 "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
>         .setBootstrapServers("http://localhost:3456")
>         .setTopics("input-topic")
>         .setGroupId("my-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
> List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
>         .mapToObj(i -> env
>                 .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source " + i)
>                 .uid("source-" + i)
>                 .keyBy(s -> s.charAt(0))
>                 .map(s -> s))
>         .collect(Collectors.toList());
>
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
>         .uid("source")
>         .keyBy(s -> s.charAt(0))
>         .union(sources.toArray(new SingleOutputStreamOperator[] {}))
>         .print();
>
> env.execute("test job");
> {code}
> h4. Root Cause
> We can see that the {{OperatorCoordinatorHolder}} already has a [debounce
> mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609],
> but the {{DefaultScheduler}} does not. The {{DefaultScheduler}} needs its own
> debounce mechanism, since it handles failures from many
> {{OperatorCoordinatorHolder}} instances.
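> To illustrate the idea: when many coordinators fail at effectively the same
> time, only the first failure should schedule a restart, and the rest should be
> coalesced into that same failover instead of each scheduling another one. A
> minimal sketch of such a debounce, with hypothetical names (not Flink's actual
> classes):
> {code:java}
> import java.util.concurrent.atomic.AtomicBoolean;
>
> // Hypothetical illustration only, not Flink's real scheduler code:
> // coalesce concurrent failure notifications so that one failover is
> // scheduled per burst of failures, not one per failing coordinator.
> final class RestartDebouncer {
>     private final AtomicBoolean restartPending = new AtomicBoolean(false);
>
>     /** Returns true only for the first failure since the last restart. */
>     boolean tryScheduleRestart() {
>         return restartPending.compareAndSet(false, true);
>     }
>
>     /** Called once the scheduled restart has actually been performed. */
>     void restartCompleted() {
>         restartPending.set(false);
>     }
> }
>
> public class DebounceDemo {
>     public static void main(String[] args) {
>         RestartDebouncer debouncer = new RestartDebouncer();
>         int scheduled = 0;
>         // 32 source coordinators fail "at once"; only the first schedules.
>         for (int i = 0; i < 32; i++) {
>             if (debouncer.tryScheduleRestart()) {
>                 scheduled++;
>             }
>         }
>         System.out.println(scheduled); // prints 1
>         debouncer.restartCompleted();
>     }
> }
> {code}
> In the real scheduler the pending flag would be cleared as part of restarting
> the affected tasks; the sketch only shows the coalescing step.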
> h4. Fix
> I have managed to fix this and will open a PR, but I would need feedback from
> people who understand this code better than I do!
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)