This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2799038b964e88129545bf4e6a5128c03e3d2f2b
Author: David Moravek <[email protected]>
AuthorDate: Fri May 5 12:32:55 2023 +0200

    [FLINK-32010][kubernetes] Properly handle KubernetesLeaderRetrievalDriver.ConfigMapCallbackHandlerImpl#onAdded events in case the leader is already known.
---
 .../KubernetesLeaderRetrievalDriver.java           |  11 ++-
 ...KubernetesLeaderElectionAndRetrievalITCase.java | 102 +++++++++++++--------
 2 files changed, 74 insertions(+), 39 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
index f08e4622165..035057c7441 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
@@ -98,9 +98,14 @@ public class KubernetesLeaderRetrievalDriver implements LeaderRetrievalDriver {
 
         @Override
         public void onAdded(List<KubernetesConfigMap> configMaps) {
-            // The ConfigMap is created by KubernetesLeaderElectionDriver with 
empty data. We do not
-            // process this
-            // useless event.
+            // The ConfigMap is created by KubernetesLeaderElectionDriver with 
empty data. We don't
+            // really need to process anything unless the retriever was 
started after the leader
+            // election has already succeeded.
+            final KubernetesConfigMap configMap = getOnlyConfigMap(configMaps, 
configMapName);
+            final LeaderInformation leaderInformation = 
leaderInformationExtractor.apply(configMap);
+            if (!leaderInformation.isEmpty()) {
+                
leaderRetrievalEventHandler.notifyLeaderAddress(leaderInformation);
+            }
         }
 
         @Override
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java
index 4d922cbda06..d101d19b805 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.kubernetes.highavailability;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.KubernetesExtension;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import 
org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions;
 import 
org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
 import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
@@ -28,15 +29,18 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.leaderelection.LeaderInformation;
 import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionEventHandler;
 import 
org.apache.flink.runtime.leaderretrieval.TestingLeaderRetrievalEventHandler;
-import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -54,73 +58,99 @@ class KubernetesLeaderElectionAndRetrievalITCase {
             "akka.tcp://[email protected]:6123/user/rpc/dispatcher";
 
     @RegisterExtension
-    private static final KubernetesExtension kubernetesExtension = new 
KubernetesExtension();
+    private static final KubernetesExtension KUBERNETES_EXTENSION = new 
KubernetesExtension();
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ExecutorService> 
EXECUTOR_EXTENSION =
+            new TestExecutorExtension<>(Executors::newCachedThreadPool);
 
     @Test
     void testLeaderElectionAndRetrieval() throws Exception {
-        final String configMapName = LEADER_CONFIGMAP_NAME + 
System.currentTimeMillis();
-        KubernetesLeaderElectionDriver leaderElectionDriver = null;
-        KubernetesLeaderRetrievalDriver leaderRetrievalDriver = null;
-
-        final FlinkKubeClient flinkKubeClient = 
kubernetesExtension.getFlinkKubeClient();
-        final Configuration configuration = 
kubernetesExtension.getConfiguration();
+        final String configMapName = LEADER_CONFIGMAP_NAME + UUID.randomUUID();
+        final FlinkKubeClient flinkKubeClient = 
KUBERNETES_EXTENSION.getFlinkKubeClient();
+        final Configuration configuration = 
KUBERNETES_EXTENSION.getConfiguration();
 
         final String clusterId = 
configuration.getString(KubernetesConfigOptions.CLUSTER_ID);
+
+        // This will make the leader election retrieval time out if we won't 
process already
+        // existing leader information when starting it up.
+        configuration.set(
+                KubernetesHighAvailabilityOptions.KUBERNETES_LEASE_DURATION, 
Duration.ofHours(1));
+        configuration.set(
+                KubernetesHighAvailabilityOptions.KUBERNETES_RETRY_PERIOD, 
Duration.ofHours(1));
+        configuration.set(
+                KubernetesHighAvailabilityOptions.KUBERNETES_RENEW_DEADLINE, 
Duration.ofHours(1));
+
+        final List<AutoCloseable> closeables = new ArrayList<>();
+
         final KubernetesConfigMapSharedWatcher configMapSharedWatcher =
                 flinkKubeClient.createConfigMapSharedWatcher(
                         KubernetesUtils.getConfigMapLabels(
                                 clusterId, 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY));
-        final ExecutorService watchExecutorService = 
Executors.newCachedThreadPool();
+        closeables.add(configMapSharedWatcher);
 
         final TestingLeaderElectionEventHandler electionEventHandler =
                 new TestingLeaderElectionEventHandler(LEADER_ADDRESS);
+        closeables.add(electionEventHandler);
 
         try {
-            leaderElectionDriver =
+            final KubernetesLeaderElectionDriver leaderElectionDriver =
                     new KubernetesLeaderElectionDriver(
                             flinkKubeClient,
                             configMapSharedWatcher,
-                            watchExecutorService,
+                            EXECUTOR_EXTENSION.getExecutor(),
                             new KubernetesLeaderElectionConfiguration(
                                     configMapName, 
UUID.randomUUID().toString(), configuration),
                             electionEventHandler,
                             electionEventHandler::handleError);
+            closeables.add(leaderElectionDriver);
+
             electionEventHandler.init(leaderElectionDriver);
 
-            final TestingLeaderRetrievalEventHandler retrievalEventHandler =
+            final Function<TestingLeaderRetrievalEventHandler, AutoCloseable>
+                    leaderRetrievalDriverFactory =
+                            leaderRetrievalEventHandler ->
+                                    new KubernetesLeaderRetrievalDriver(
+                                            configMapSharedWatcher,
+                                            EXECUTOR_EXTENSION.getExecutor(),
+                                            configMapName,
+                                            leaderRetrievalEventHandler,
+                                            
KubernetesUtils::getLeaderInformationFromConfigMap,
+                                            
leaderRetrievalEventHandler::handleError);
+
+            final TestingLeaderRetrievalEventHandler 
firstLeaderRetrievalEventHandler =
                     new TestingLeaderRetrievalEventHandler();
-            leaderRetrievalDriver =
-                    new KubernetesLeaderRetrievalDriver(
-                            configMapSharedWatcher,
-                            watchExecutorService,
-                            configMapName,
-                            retrievalEventHandler,
-                            KubernetesUtils::getLeaderInformationFromConfigMap,
-                            retrievalEventHandler::handleError);
+            
closeables.add(leaderRetrievalDriverFactory.apply(firstLeaderRetrievalEventHandler));
 
+            // Wait for the driver to obtain leadership.
             electionEventHandler.waitForLeader();
-            // Check the new leader is confirmed
             final LeaderInformation confirmedLeaderInformation =
                     electionEventHandler.getConfirmedLeaderInformation();
             
assertThat(confirmedLeaderInformation.getLeaderAddress()).isEqualTo(LEADER_ADDRESS);
 
-            // Check the leader retrieval driver should be notified the leader 
address
-            retrievalEventHandler.waitForNewLeader();
-            assertThat(retrievalEventHandler.getLeaderSessionID())
-                    
.isEqualByComparingTo(confirmedLeaderInformation.getLeaderSessionID());
-            assertThat(retrievalEventHandler.getAddress())
-                    .isEqualTo(confirmedLeaderInformation.getLeaderAddress());
+            // Check if the leader retrieval driver is notified about the 
leader address
+            awaitLeadership(firstLeaderRetrievalEventHandler, 
confirmedLeaderInformation);
+
+            // Start a second leader retrieval that should be notified 
immediately because we
+            // already know who the leader is.
+            final TestingLeaderRetrievalEventHandler 
secondRetrievalEventHandler =
+                    new TestingLeaderRetrievalEventHandler();
+            
closeables.add(leaderRetrievalDriverFactory.apply(secondRetrievalEventHandler));
+            awaitLeadership(secondRetrievalEventHandler, 
confirmedLeaderInformation);
         } finally {
-            electionEventHandler.close();
-            if (leaderElectionDriver != null) {
-                leaderElectionDriver.close();
-            }
-            if (leaderRetrievalDriver != null) {
-                leaderRetrievalDriver.close();
+            for (AutoCloseable closeable : closeables) {
+                closeable.close();
             }
             flinkKubeClient.deleteConfigMap(configMapName).get();
-            configMapSharedWatcher.close();
-            ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, 
watchExecutorService);
         }
     }
+
+    private static void awaitLeadership(
+            TestingLeaderRetrievalEventHandler handler, LeaderInformation 
leaderInformation)
+            throws Exception {
+        handler.waitForNewLeader();
+        assertThat(handler.getLeaderSessionID())
+                .isEqualByComparingTo(leaderInformation.getLeaderSessionID());
+        
assertThat(handler.getAddress()).isEqualTo(leaderInformation.getLeaderAddress());
+    }
 }

Reply via email to