This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 79cccd7103a304bfa07104dcafd1f65a032c88ce Author: Matthias Pohl <[email protected]> AuthorDate: Fri Feb 2 14:47:42 2024 +0100 [FLINK-34007][k8s] Adds graceful shutdown logic to KubernetesLeaderElector We need to make sure that any ongoing leadership event is properly handled. shutdownNow and the Precondition are too aggressive. --- .../kubeclient/resources/KubernetesLeaderElector.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java index 0bf65ddbac2..728f46abcb6 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java @@ -21,7 +21,7 @@ package org.apache.flink.kubernetes.kubeclient.resources; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration; import org.apache.flink.kubernetes.utils.KubernetesUtils; -import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.concurrent.FutureUtils; @@ -41,6 +41,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY; @@ -160,10 +161,18 @@ public class KubernetesLeaderElector { // code that handles the leader loss cancelCurrentLeaderElectionSession(); - final List<Runnable> outstandingTasks = executorService.shutdownNow(); - Preconditions.checkState( - outstandingTasks.isEmpty(), - "All tasks that handle the leadership revocation should have been executed."); + // the shutdown of the executor needs to happen gracefully for scenarios where the + // release is called in the executorService. Interrupting this logic will result in the + // leadership-lost event not being sent to the client. + final List<Runnable> outStandingTasks = + ExecutorUtils.gracefulShutdown(30, TimeUnit.SECONDS, executorService); + + if (!outStandingTasks.isEmpty()) { + LOG.warn( + "{} events were not processed before stopping the {} instance.", + outStandingTasks.size(), + KubernetesLeaderElector.class.getSimpleName()); + } } }
