[ https://issues.apache.org/jira/browse/FLINK-31041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Danny Cranmer updated FLINK-31041:
----------------------------------
Description:
h4. Context
When a job creates multiple sources that use the {{SourceCoordinator}}
(FLIP-27), there is a failure race condition that results in a "leak" of
{{ExecutionVertexVersion}} instances due to a "queue" of pending global
failures. This results in the Job Manager becoming unresponsive.
!flink-31041-heap-dump.png!
h4. Reproduction Steps
This can be reproduced by a job that creates multiple sources that fail in the
{{SplitEnumerator}}. We observed this with multiple {{KafkaSource}} instances
trying to load a non-existent certificate from the file system and throwing a
{{FileNotFoundException}}. Here is a simple job to reproduce (BE WARNED:
running this locally will lock up your IDE):
{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Flink31041Repro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(
                10000, Time.of(10, TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setProperty("security.protocol", "SASL_SSL")
                // SSL configurations
                // Configure the path of the truststore (CA) provided by the server
                .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
                .setProperty("ssl.truststore.password", "test1234")
                // Configure the path of the keystore (private key) if client authentication is required
                .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
                .setProperty("ssl.keystore.password", "test1234")
                // SASL configurations
                // Set SASL mechanism as SCRAM-SHA-256
                .setProperty("sasl.mechanism", "SCRAM-SHA-256")
                // Set JAAS configurations
                .setProperty("sasl.jaas.config",
                        "org.apache.kafka.common.security.scram.ScramLoginModule required "
                                + "username=\"username\" password=\"password\";")
                .setBootstrapServers("http://localhost:3456")
                .setTopics("input-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // 32 sources, each failing in its SplitEnumerator on the missing truststore
        List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
                .mapToObj(i -> env
                        .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source " + i)
                        .uid("source-" + i)
                        .keyBy(s -> s.charAt(0))
                        .map(s -> s))
                .collect(Collectors.toList());

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .uid("source")
                .keyBy(s -> s.charAt(0))
                .union(sources.toArray(new SingleOutputStreamOperator[] {}))
                .print();

        env.execute("test job");
    }
}
{code}
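With this job, every source's {{SplitEnumerator}} fails while trying to load
the non-existent truststore, so all 33 sources raise global failures at
roughly the same time, which is what builds up the queue of pending failovers.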
h4. Root Cause
We can see that the {{OperatorCoordinatorHolder}} already has a [debounce
mechanism|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java#L609],
but the {{DefaultScheduler}} does not. We need a debounce mechanism in the
{{DefaultScheduler}} as well, since it handles many {{OperatorCoordinatorHolder}}
instances, each of which can trigger a global failure independently.
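To illustrate the idea, here is a minimal sketch of such a debounce guard.
This is not the actual patch; the field and method names are hypothetical and
do not match the real {{DefaultScheduler}} internals:
{code:java}
// Hypothetical sketch of a debounce guard for global failures.
// All names here are illustrative, not the real DefaultScheduler API.
private boolean globalFailureTriggered = false;

public void handleGlobalFailure(Throwable cause) {
    if (globalFailureTriggered) {
        // A global failover is already pending; queueing another one would
        // only accumulate pending work (e.g. ExecutionVertexVersion records).
        return;
    }
    globalFailureTriggered = true;
    triggerGlobalFailover(cause); // assumed entry point into the restart logic
}

private void onGlobalFailoverProcessed() {
    // Re-arm the guard once the restart has actually been handled, so the
    // next genuine failure can trigger a new failover.
    globalFailureTriggered = false;
}
{code}
The point of the guard is that at most one global failover is in flight at a
time, no matter how many coordinators fail concurrently.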
h4. Fix
I have managed to fix this and will open a PR, but I would need feedback from
people who understand this code better than I do!
was:
h4. Context
When a job creates multiple sources that use the {{SourceCoordinator}}
(FLIP-27), there is a failure race condition that results in:
* Memory leak of {{ExecutionVertexVersion}}
* Busy loop constantly trying to restart the job
* Restart strategy is not respected
This results in the Job Manager becoming unresponsive.
> Build up of pending global failures causes JM instability
> ---------------------------------------------------------
>
> Key: FLINK-31041
> URL: https://issues.apache.org/jira/browse/FLINK-31041
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.15.3, 1.16.1
> Reporter: Danny Cranmer
> Assignee: Weihua Hu
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.17.0, 1.15.4, 1.16.2
>
> Attachments: failovers.log, flink-31041-heap-dump.png,
> test-restart-strategy.log
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)