This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 79cccd7103a304bfa07104dcafd1f65a032c88ce
Author: Matthias Pohl <[email protected]>
AuthorDate: Fri Feb 2 14:47:42 2024 +0100

    [FLINK-34007][k8s] Adds graceful shutdown logic to KubernetesLeaderElector
    
    We need to make sure that any ongoing leadership event is properly handled.
    shutdownNow and the Precondition are too aggressive.
---
 .../kubeclient/resources/KubernetesLeaderElector.java | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
index 0bf65ddbac2..728f46abcb6 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
@@ -21,7 +21,7 @@ package org.apache.flink.kubernetes.kubeclient.resources;
 import org.apache.flink.annotation.VisibleForTesting;
 import 
org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
 
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
 
@@ -160,10 +161,18 @@ public class KubernetesLeaderElector {
             // code that handles the leader loss
             cancelCurrentLeaderElectionSession();
 
-            final List<Runnable> outstandingTasks = 
executorService.shutdownNow();
-            Preconditions.checkState(
-                    outstandingTasks.isEmpty(),
-                    "All tasks that handle the leadership revocation should 
have been executed.");
+            // the shutdown of the executor needs to happen gracefully for scenarios where the
+            // release is called in the executorService. Interrupting this logic will result in the
+            // leadership-lost event not being sent to the client.
+            final List<Runnable> outStandingTasks =
+                    ExecutorUtils.gracefulShutdown(30, TimeUnit.SECONDS, 
executorService);
+
+            if (!outStandingTasks.isEmpty()) {
+                LOG.warn(
+                        "{} events were not processed before stopping the {} 
instance.",
+                        outStandingTasks.size(),
+                        KubernetesLeaderElector.class.getSimpleName());
+            }
         }
     }
 

Reply via email to