This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 87560b7cedd6c857612a24b83485f5000b9edbd6
Author: Matthias Pohl <[email protected]>
AuthorDate: Tue Jan 23 16:27:02 2024 +0100

    [FLINK-34333][k8s] fabric8io LeaderElector is created with every new #run() 
call (cherry-picked from FLINK-34007)
    
    v5.12.4 allowed us to reuse the LeaderElector. With v6.6.2 
(fabric8io/kubernetes-client#4125) this behavior changed. One LeaderElector can 
only be used until the leadership is lost.
    An ITCase is added to cover the scenario where the leadership is lost.
---
 .../kubeclient/FlinkKubeClientFactory.java         |  35 ++++---
 .../resources/KubernetesLeaderElector.java         |  77 ++++++++++++---
 .../resources/KubernetesLeaderElectorITCase.java   | 108 +++++++++++++++++++++
 3 files changed, 193 insertions(+), 27 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClientFactory.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClientFactory.java
index 99a9ffc9a33..c16508f8b99 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClientFactory.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClientFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.kubernetes.kubeclient;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.util.FileUtils;
@@ -46,16 +47,8 @@ public class FlinkKubeClientFactory {
         return INSTANCE;
     }
 
-    /**
-     * Create a Flink Kubernetes client with the given configuration.
-     *
-     * @param flinkConfig Flink configuration
-     * @param useCase Flink Kubernetes client use case (e.g. client, 
resourcemanager,
-     *     kubernetes-ha-services)
-     * @return Return the Flink Kubernetes client with the specified 
configuration and dedicated IO
-     *     executor.
-     */
-    public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String 
useCase) {
+    @VisibleForTesting
+    public NamespacedKubernetesClient 
createFabric8ioKubernetesClient(Configuration flinkConfig) {
         final Config config;
 
         final String kubeContext = 
flinkConfig.getString(KubernetesConfigOptions.CONTEXT);
@@ -95,11 +88,23 @@ public class FlinkKubeClientFactory {
         config.setUserAgent(userAgent);
         LOG.debug("Setting Kubernetes client namespace: {}, userAgent: {}", 
namespace, userAgent);
 
-        final NamespacedKubernetesClient client =
-                new KubernetesClientBuilder()
-                        .withConfig(config)
-                        .build()
-                        .adapt(NamespacedKubernetesClient.class);
+        return new KubernetesClientBuilder()
+                .withConfig(config)
+                .build()
+                .adapt(NamespacedKubernetesClient.class);
+    }
+
+    /**
+     * Create a Flink Kubernetes client with the given configuration.
+     *
+     * @param flinkConfig Flink configuration
+     * @param useCase Flink Kubernetes client use case (e.g. client, 
resourcemanager,
+     *     kubernetes-ha-services)
+     * @return Return the Flink Kubernetes client with the specified 
configuration and dedicated IO
+     *     executor.
+     */
+    public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String 
useCase) {
+        final NamespacedKubernetesClient client = 
createFabric8ioKubernetesClient(flinkConfig);
         final int poolSize =
                 
flinkConfig.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
         return new Fabric8FlinkKubeClient(
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
index d072e15cf18..dd0accf4369 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
@@ -20,7 +20,9 @@ package org.apache.flink.kubernetes.kubeclient.resources;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import 
org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
@@ -31,8 +33,13 @@ import 
io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigM
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Represent {@link KubernetesLeaderElector} in kubernetes. {@link 
LeaderElector#run()} is a
@@ -55,17 +62,32 @@ public class KubernetesLeaderElector {
 
     private final Object lock = new Object();
 
-    private final ExecutorService executorService =
-            Executors.newFixedThreadPool(
-                    3, new 
ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService"));
+    private final NamespacedKubernetesClient kubernetesClient;
+    private final LeaderElectionConfig leaderElectionConfig;
+    private final ExecutorService executorService;
 
-    private final LeaderElector internalLeaderElector;
+    private CompletableFuture<?> currentLeaderElectionSession = 
FutureUtils.completedVoidFuture();
 
     public KubernetesLeaderElector(
             NamespacedKubernetesClient kubernetesClient,
             KubernetesLeaderElectionConfiguration leaderConfig,
             LeaderCallbackHandler leaderCallbackHandler) {
-        final LeaderElectionConfig leaderElectionConfig =
+        this(
+                kubernetesClient,
+                leaderConfig,
+                leaderCallbackHandler,
+                Executors.newSingleThreadExecutor(
+                        new 
ExecutorThreadFactory("KubernetesLeaderElector-ExecutorService")));
+    }
+
+    @VisibleForTesting
+    public KubernetesLeaderElector(
+            NamespacedKubernetesClient kubernetesClient,
+            KubernetesLeaderElectionConfiguration leaderConfig,
+            LeaderCallbackHandler leaderCallbackHandler,
+            ExecutorService executorService) {
+        this.kubernetesClient = kubernetesClient;
+        this.leaderElectionConfig =
                 new LeaderElectionConfigBuilder()
                         .withName(leaderConfig.getConfigMapName())
                         .withLeaseDuration(leaderConfig.getLeaseDuration())
@@ -76,6 +98,7 @@ public class KubernetesLeaderElector {
                                         leaderConfig.getLockIdentity()))
                         .withRenewDeadline(leaderConfig.getRenewDeadline())
                         .withRetryPeriod(leaderConfig.getRetryPeriod())
+                        .withReleaseOnCancel(true)
                         .withLeaderCallbacks(
                                 new LeaderCallbacks(
                                         leaderCallbackHandler::isLeader,
@@ -86,12 +109,27 @@ public class KubernetesLeaderElector {
                                                         newLeader,
                                                         
leaderConfig.getConfigMapName())))
                         .build();
-        internalLeaderElector =
-                new LeaderElector(kubernetesClient, leaderElectionConfig, 
executorService);
+        this.executorService = executorService;
+
         LOG.info(
-                "Create KubernetesLeaderElector {} with lock identity {}.",
-                leaderConfig.getConfigMapName(),
-                leaderConfig.getLockIdentity());
+                "Create KubernetesLeaderElector on lock {}.",
+                leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void resetInternalLeaderElector() {
+        cancelCurrentLeaderElectionSession();
+
+        currentLeaderElectionSession =
+                new LeaderElector(kubernetesClient, leaderElectionConfig, 
executorService).start();
+
+        LOG.info(
+                "Triggered leader election on lock {}.", 
leaderElectionConfig.getLock().describe());
+    }
+
+    @GuardedBy("lock")
+    private void cancelCurrentLeaderElectionSession() {
+        currentLeaderElectionSession.cancel(true);
     }
 
     public void run() {
@@ -100,14 +138,29 @@ public class KubernetesLeaderElector {
                 LOG.debug(
                         "Ignoring KubernetesLeaderElector.run call because the 
leader elector has already been shut down.");
             } else {
-                executorService.execute(internalLeaderElector::run);
+                resetInternalLeaderElector();
             }
         }
     }
 
     public void stop() {
         synchronized (lock) {
-            executorService.shutdownNow();
+            // cancelling the current session needs to happen explicitly to 
allow the execution of
+            // code that handles the leader loss
+            cancelCurrentLeaderElectionSession();
+
+            // the shutdown of the executor needs to happen gracefully for 
scenarios where the
+            // release is called in the executorService. Interrupting this 
logic will result in the
+            // leadership-lost event not being sent to the client.
+            final List<Runnable> outStandingTasks =
+                    ExecutorUtils.gracefulShutdown(30, TimeUnit.SECONDS, 
executorService);
+
+            if (!outStandingTasks.isEmpty()) {
+                LOG.warn(
+                        "{} events were not processed before stopping the {} 
instance.",
+                        outStandingTasks.size(),
+                        KubernetesLeaderElector.class.getSimpleName());
+            }
         }
     }
 
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java
index a86008f693a..e167d058765 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElectorITCase.java
@@ -19,18 +19,23 @@
 package org.apache.flink.kubernetes.kubeclient.resources;
 
 import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
 import org.apache.flink.kubernetes.KubernetesExtension;
+import 
org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions;
 import 
org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;
 
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.time.Duration;
 import java.util.UUID;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
@@ -124,4 +129,107 @@ class KubernetesLeaderElectorITCase {
             }
         }
     }
+
+    /**
+     * This test verifies that the {@link KubernetesLeaderElector} is able to 
handle the scenario
+     * where the lease cannot be renewed.
+     *
+     * <p>See FLINK-34007 for further details.
+     */
+    @Test
+    void testLeaderElectorLifecycleManagement() throws Exception {
+        final Configuration configuration = 
kubernetesExtension.getConfiguration();
+
+        try (final NamespacedKubernetesClient client =
+                
kubeClientFactory.createFabric8ioKubernetesClient(configuration)) {
+
+            // set a low timeout that makes the client stop renewing the 
leadership lease
+            final Duration renewTimeout = Duration.ofMillis(100);
+            configuration.set(
+                    
KubernetesHighAvailabilityOptions.KUBERNETES_RENEW_DEADLINE, renewTimeout);
+
+            final String lockIdentity = UUID.randomUUID().toString();
+            final KubernetesLeaderElectionConfiguration leaderConfig =
+                    new KubernetesLeaderElectionConfiguration(
+                            configMapName, lockIdentity, configuration);
+            final TestingLeaderCallbackHandler leadershipCallbackHandler =
+                    new TestingLeaderCallbackHandler(lockIdentity);
+
+            final ManuallyTriggeredScheduledExecutorService executorService =
+                    new ManuallyTriggeredScheduledExecutorService();
+            final KubernetesLeaderElector testInstance =
+                    new KubernetesLeaderElector(
+                            client, leaderConfig, leadershipCallbackHandler, 
executorService);
+
+            // first leadership lifecycle initiation
+            testInstance.run();
+
+            // triggers acquiring the leadership
+            final Duration waitForNextTaskForever = Duration.ofDays(10);
+            executorService.trigger(waitForNextTaskForever);
+
+            assertThatFuture(leadershipCallbackHandler.waitForNewLeaderAsync())
+                    .as("The leadership should be acquired eventually.")
+                    .eventuallySucceeds();
+
+            // halt thread to reach the renew deadline
+            Thread.sleep(renewTimeout.plusSeconds(1).toMillis());
+
+            // triggers renew loop within fabric8io's LeaderElector
+            executorService.trigger();
+
+            
assertThatFuture(leadershipCallbackHandler.waitForRevokeLeaderAsync())
+                    .as(
+                            "The leadership should be lost eventually due to 
the renewal loop being stopped.")
+                    .eventuallySucceeds();
+
+            // revoking the leadership initiates another leadership lifecycle
+            testInstance.run();
+            executorService.trigger(waitForNextTaskForever);
+
+            assertThatFuture(leadershipCallbackHandler.waitForNewLeaderAsync())
+                    .as("The leadership should be acquired eventually again.");
+        }
+    }
+
+    @Test
+    void testKubernetesLeaderElectorSendingLeadershipLostSignalOnStop() {
+        final Configuration configuration = 
kubernetesExtension.getConfiguration();
+
+        try (final NamespacedKubernetesClient client =
+                
kubeClientFactory.createFabric8ioKubernetesClient(configuration)) {
+
+            final String lockIdentity = UUID.randomUUID().toString();
+            final KubernetesLeaderElectionConfiguration leaderConfig =
+                    new KubernetesLeaderElectionConfiguration(
+                            configMapName, lockIdentity, configuration);
+            final TestingLeaderCallbackHandler leadershipCallbackHandler =
+                    new TestingLeaderCallbackHandler(lockIdentity);
+
+            final ManuallyTriggeredScheduledExecutorService executorService =
+                    new ManuallyTriggeredScheduledExecutorService();
+            final KubernetesLeaderElector testInstance =
+                    new KubernetesLeaderElector(
+                            client, leaderConfig, leadershipCallbackHandler, 
executorService);
+
+            // initiate leadership lifecycle
+            testInstance.run();
+
+            final Duration waitForNextTaskForever = Duration.ofDays(10);
+            executorService.trigger(waitForNextTaskForever);
+            assertThatFuture(leadershipCallbackHandler.waitForNewLeaderAsync())
+                    .as("Leadership should be acquired eventually.")
+                    .eventuallySucceeds();
+
+            testInstance.stop();
+
+            assertThat(leadershipCallbackHandler.hasLeadership())
+                    .as("Leadership should be lost right away after stopping 
the test instance.")
+                    .isFalse();
+
+            
assertThatFuture(leadershipCallbackHandler.waitForRevokeLeaderAsync())
+                    .as("There should be a leadership lost event being 
received eventually.")
+                    .eventuallySucceeds();
+        }
+    }
 }

Reply via email to