This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2799038b964e88129545bf4e6a5128c03e3d2f2b Author: David Moravek <[email protected]> AuthorDate: Fri May 5 12:32:55 2023 +0200 [FLINK-32010][kubernetes] Properly handle KubernetesLeaderRetrievalDriver.ConfigMapCallbackHandlerImpl#onAdded events in case the leader is already known. --- .../KubernetesLeaderRetrievalDriver.java | 11 ++- ...KubernetesLeaderElectionAndRetrievalITCase.java | 102 +++++++++++++-------- 2 files changed, 74 insertions(+), 39 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java index f08e4622165..035057c7441 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java @@ -98,9 +98,14 @@ public class KubernetesLeaderRetrievalDriver implements LeaderRetrievalDriver { @Override public void onAdded(List<KubernetesConfigMap> configMaps) { - // The ConfigMap is created by KubernetesLeaderElectionDriver with empty data. We do not - // process this - // useless event. + // The ConfigMap is created by KubernetesLeaderElectionDriver with empty data. We don't + // really need to process anything unless the retriever was started after the leader + // election has already succeeded. + final KubernetesConfigMap configMap = getOnlyConfigMap(configMaps, configMapName); + final LeaderInformation leaderInformation = leaderInformationExtractor.apply(configMap); + if (!leaderInformation.isEmpty()) { + leaderRetrievalEventHandler.notifyLeaderAddress(leaderInformation); + } } @Override diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java index 4d922cbda06..d101d19b805 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java @@ -21,6 +21,7 @@ package org.apache.flink.kubernetes.highavailability; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.KubernetesExtension; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions; import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration; import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher; @@ -28,15 +29,18 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils; import org.apache.flink.runtime.leaderelection.LeaderInformation; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionEventHandler; import org.apache.flink.runtime.leaderretrieval.TestingLeaderRetrievalEventHandler; -import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.testutils.executor.TestExecutorExtension; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.function.Function; import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY; import static org.assertj.core.api.Assertions.assertThat; @@ -54,73 +58,99 @@ class KubernetesLeaderElectionAndRetrievalITCase { "akka.tcp://[email protected]:6123/user/rpc/dispatcher"; @RegisterExtension - private static final KubernetesExtension kubernetesExtension = new KubernetesExtension(); + private static final KubernetesExtension KUBERNETES_EXTENSION = new KubernetesExtension(); + + @RegisterExtension + private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION = + new TestExecutorExtension<>(Executors::newCachedThreadPool); @Test void testLeaderElectionAndRetrieval() throws Exception { - final String configMapName = LEADER_CONFIGMAP_NAME + System.currentTimeMillis(); - KubernetesLeaderElectionDriver leaderElectionDriver = null; - KubernetesLeaderRetrievalDriver leaderRetrievalDriver = null; - - final FlinkKubeClient flinkKubeClient = kubernetesExtension.getFlinkKubeClient(); - final Configuration configuration = kubernetesExtension.getConfiguration(); + final String configMapName = LEADER_CONFIGMAP_NAME + UUID.randomUUID(); + final FlinkKubeClient flinkKubeClient = KUBERNETES_EXTENSION.getFlinkKubeClient(); + final Configuration configuration = KUBERNETES_EXTENSION.getConfiguration(); final String clusterId = configuration.getString(KubernetesConfigOptions.CLUSTER_ID); + + // This will make the leader election retrieval time out if we won't process already + // existing leader information when starting it up. + configuration.set( + KubernetesHighAvailabilityOptions.KUBERNETES_LEASE_DURATION, Duration.ofHours(1)); + configuration.set( + KubernetesHighAvailabilityOptions.KUBERNETES_RETRY_PERIOD, Duration.ofHours(1)); + configuration.set( + KubernetesHighAvailabilityOptions.KUBERNETES_RENEW_DEADLINE, Duration.ofHours(1)); + + final List<AutoCloseable> closeables = new ArrayList<>(); + final KubernetesConfigMapSharedWatcher configMapSharedWatcher = flinkKubeClient.createConfigMapSharedWatcher( KubernetesUtils.getConfigMapLabels( clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY)); - final ExecutorService watchExecutorService = Executors.newCachedThreadPool(); + closeables.add(configMapSharedWatcher); final TestingLeaderElectionEventHandler electionEventHandler = new TestingLeaderElectionEventHandler(LEADER_ADDRESS); + closeables.add(electionEventHandler); try { - leaderElectionDriver = + final KubernetesLeaderElectionDriver leaderElectionDriver = new KubernetesLeaderElectionDriver( flinkKubeClient, configMapSharedWatcher, - watchExecutorService, + EXECUTOR_EXTENSION.getExecutor(), new KubernetesLeaderElectionConfiguration( configMapName, UUID.randomUUID().toString(), configuration), electionEventHandler, electionEventHandler::handleError); + closeables.add(leaderElectionDriver); + electionEventHandler.init(leaderElectionDriver); - final TestingLeaderRetrievalEventHandler retrievalEventHandler = + final Function<TestingLeaderRetrievalEventHandler, AutoCloseable> + leaderRetrievalDriverFactory = + leaderRetrievalEventHandler -> + new KubernetesLeaderRetrievalDriver( + configMapSharedWatcher, + EXECUTOR_EXTENSION.getExecutor(), + configMapName, + leaderRetrievalEventHandler, + KubernetesUtils::getLeaderInformationFromConfigMap, + leaderRetrievalEventHandler::handleError); + + final TestingLeaderRetrievalEventHandler firstLeaderRetrievalEventHandler = new TestingLeaderRetrievalEventHandler(); - leaderRetrievalDriver = - new KubernetesLeaderRetrievalDriver( - configMapSharedWatcher, - watchExecutorService, - configMapName, - retrievalEventHandler, - KubernetesUtils::getLeaderInformationFromConfigMap, - retrievalEventHandler::handleError); + closeables.add(leaderRetrievalDriverFactory.apply(firstLeaderRetrievalEventHandler)); + // Wait for the driver to obtain leadership. electionEventHandler.waitForLeader(); - // Check the new leader is confirmed final LeaderInformation confirmedLeaderInformation = electionEventHandler.getConfirmedLeaderInformation(); assertThat(confirmedLeaderInformation.getLeaderAddress()).isEqualTo(LEADER_ADDRESS); - // Check the leader retrieval driver should be notified the leader address - retrievalEventHandler.waitForNewLeader(); - assertThat(retrievalEventHandler.getLeaderSessionID()) - .isEqualByComparingTo(confirmedLeaderInformation.getLeaderSessionID()); - assertThat(retrievalEventHandler.getAddress()) - .isEqualTo(confirmedLeaderInformation.getLeaderAddress()); + // Check if the leader retrieval driver is notified about the leader address + awaitLeadership(firstLeaderRetrievalEventHandler, confirmedLeaderInformation); + + // Start a second leader retrieval that should be notified immediately because we + // already know who the leader is. + final TestingLeaderRetrievalEventHandler secondRetrievalEventHandler = + new TestingLeaderRetrievalEventHandler(); + closeables.add(leaderRetrievalDriverFactory.apply(secondRetrievalEventHandler)); + awaitLeadership(secondRetrievalEventHandler, confirmedLeaderInformation); } finally { - electionEventHandler.close(); - if (leaderElectionDriver != null) { - leaderElectionDriver.close(); - } - if (leaderRetrievalDriver != null) { - leaderRetrievalDriver.close(); + for (AutoCloseable closeable : closeables) { + closeable.close(); } flinkKubeClient.deleteConfigMap(configMapName).get(); - configMapSharedWatcher.close(); - ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, watchExecutorService); } } + + private static void awaitLeadership( + TestingLeaderRetrievalEventHandler handler, LeaderInformation leaderInformation) + throws Exception { + handler.waitForNewLeader(); + assertThat(handler.getLeaderSessionID()) + .isEqualByComparingTo(leaderInformation.getLeaderSessionID()); + assertThat(handler.getAddress()).isEqualTo(leaderInformation.getLeaderAddress()); + } }
