This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 3523f5e [hotfix] Do not use ExecutorService.submit since it can
swallow exceptions
3523f5e is described below
commit 3523f5e4f9489389e71855860903794adb983805
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Apr 16 10:41:44 2021 +0200
[hotfix] Do not use ExecutorService.submit since it can swallow exceptions
This commit changes the KubernetesLeaderElector to use
ExecutorService.execute instead of submit
which ensures that potential exceptions are forwarded to the fatal uncaught
exeception handler.
This closes #15740.
---
.../kubeclient/resources/KubernetesLeaderElector.java | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
index 82a0bf9..0b4d941 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
@@ -53,6 +53,8 @@ public class KubernetesLeaderElector {
@VisibleForTesting
public static final String LEADER_ANNOTATION_KEY =
"control-plane.alpha.kubernetes.io/leader";
+ private final Object lock = new Object();
+
private final ExecutorService executorService =
Executors.newSingleThreadExecutor(
new
ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService"));
@@ -92,11 +94,20 @@ public class KubernetesLeaderElector {
}
public void run() {
- executorService.submit(internalLeaderElector::run);
+ synchronized (lock) {
+ if (executorService.isShutdown()) {
+ LOG.debug(
+ "Ignoring KubernetesLeaderElector.run call because the
leader elector has already been shut down.");
+ } else {
+ executorService.execute(internalLeaderElector::run);
+ }
+ }
}
public void stop() {
- executorService.shutdownNow();
+ synchronized (lock) {
+ executorService.shutdownNow();
+ }
}
public static boolean hasLeadership(KubernetesConfigMap configMap, String
lockIdentity) {