This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fb31e280 [hotfix] Do not use ExecutorService.submit since it can swallow exceptions
fb31e280 is described below

commit fb31e28002bc830e853358c3eb72b531e4c33095
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Apr 16 10:41:44 2021 +0200

    [hotfix] Do not use ExecutorService.submit since it can swallow exceptions
    
    This commit changes the KubernetesLeaderElector to use ExecutorService.execute instead of submit,
    which ensures that potential exceptions are forwarded to the fatal uncaught exception handler.
    
    This closes #15740.
---
 .../kubeclient/resources/KubernetesLeaderElector.java     | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
index 82a0bf9..0b4d941 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
@@ -53,6 +53,8 @@ public class KubernetesLeaderElector {
     @VisibleForTesting
     public static final String LEADER_ANNOTATION_KEY = 
"control-plane.alpha.kubernetes.io/leader";
 
+    private final Object lock = new Object();
+
     private final ExecutorService executorService =
             Executors.newSingleThreadExecutor(
                     new 
ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService"));
@@ -92,11 +94,20 @@ public class KubernetesLeaderElector {
     }
 
     public void run() {
-        executorService.submit(internalLeaderElector::run);
+        synchronized (lock) {
+            if (executorService.isShutdown()) {
+                LOG.debug(
+                        "Ignoring KubernetesLeaderElector.run call because the 
leader elector has already been shut down.");
+            } else {
+                executorService.execute(internalLeaderElector::run);
+            }
+        }
     }
 
     public void stop() {
-        executorService.shutdownNow();
+        synchronized (lock) {
+            executorService.shutdownNow();
+        }
     }
 
     public static boolean hasLeadership(KubernetesConfigMap configMap, String 
lockIdentity) {

Reply via email to