This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4576e4384ff36623712043564039f654c3b44a30 Author: Matthias Pohl <[email protected]> AuthorDate: Fri Feb 3 16:49:51 2023 +0100 [FLINK-31773][runtime] Splits up DefaultLeaderElectionService.start(LeaderContender) This change adds a clear separation of the driver lifecycle and the LeaderContender registration to DefaultLeaderElectionService. Signed-off-by: Matthias Pohl <[email protected]> --- .../highavailability/AbstractHaServices.java | 32 +- .../highavailability/HighAvailabilityServices.java | 8 +- .../DefaultLeaderElectionService.java | 386 ++++++++++++++------- .../leaderelection/LeaderElectionService.java | 9 +- .../leaderelection/LeaderElectionUtils.java | 45 +++ .../ResourceManagerServiceImpl.java | 6 +- .../apache/flink/runtime/util/ZooKeeperUtils.java | 12 +- .../JobMasterServiceLeadershipRunnerTest.java | 13 +- .../DefaultLeaderElectionServiceTest.java | 239 ++++++++++++- .../runtime/leaderelection/LeaderElectionTest.java | 2 +- .../TestingGenericLeaderContender.java | 5 +- .../runtime/leaderelection/TestingLeaderBase.java | 22 ++ .../TestingLeaderElectionDriver.java | 23 +- ...KeeperLeaderElectionConnectionHandlingTest.java | 2 + .../ZooKeeperLeaderElectionTest.java | 5 + .../ZooKeeperLeaderRetrievalTest.java | 30 +- 16 files changed, 660 insertions(+), 179 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java index feb8c2f0e87..2fd1e271c53 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.AutoCloseableRegistry; import org.apache.flink.runtime.blob.BlobStore; import org.apache.flink.runtime.blob.BlobStoreService; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; @@ -68,6 +69,8 @@ public abstract class AbstractHaServices implements HighAvailabilityServices { private final JobResultStore jobResultStore; + private final AutoCloseableRegistry closeableRegistry = new AutoCloseableRegistry(); + protected AbstractHaServices( Configuration config, Executor ioExecutor, @@ -107,22 +110,22 @@ public abstract class AbstractHaServices implements HighAvailabilityServices { } @Override - public LeaderElectionService getResourceManagerLeaderElectionService() { + public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { return createLeaderElectionService(getLeaderPathForResourceManager()); } @Override - public LeaderElectionService getDispatcherLeaderElectionService() { + public LeaderElectionService getDispatcherLeaderElectionService() throws Exception { return createLeaderElectionService(getLeaderPathForDispatcher()); } @Override - public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) { + public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception { return createLeaderElectionService(getLeaderPathForJobManager(jobID)); } @Override - public LeaderElectionService getClusterRestEndpointLeaderElectionService() { + public LeaderElectionService getClusterRestEndpointLeaderElectionService() throws Exception { return createLeaderElectionService(getLeaderPathForRestServer()); } @@ -156,6 +159,12 @@ public abstract class AbstractHaServices implements HighAvailabilityServices { exception = t; } + try { + closeableRegistry.close(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + try { internalClose(); } catch (Throwable t) { @@ -183,6 +192,12 @@ public abstract class AbstractHaServices implements HighAvailabilityServices { exception = t; } + try { + closeableRegistry.close(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + try { internalClose(); } catch (Throwable t) { @@ -225,8 +240,13 @@ public abstract class AbstractHaServices implements HighAvailabilityServices { executor); } - private LeaderElectionService createLeaderElectionService(String leaderName) { - return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(leaderName)); + private LeaderElectionService createLeaderElectionService(String leaderName) throws Exception { + final DefaultLeaderElectionService leaderElectionService = + new DefaultLeaderElectionService(createLeaderElectionDriverFactory(leaderName)); + leaderElectionService.startLeaderElectionBackend(); + + closeableRegistry.registerCloseable(leaderElectionService); + return leaderElectionService; } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index 3e83c2b6f42..4d491315a43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -125,14 +125,14 @@ public interface HighAvailabilityServices * * @return Leader election service for the resource manager leader election */ - LeaderElectionService getResourceManagerLeaderElectionService(); + LeaderElectionService getResourceManagerLeaderElectionService() throws Exception; /** * Gets the leader election service for the cluster's dispatcher. * * @return Leader election service for the dispatcher leader election */ - LeaderElectionService getDispatcherLeaderElectionService(); + LeaderElectionService getDispatcherLeaderElectionService() throws Exception; /** * Gets the leader election service for the given job. @@ -140,7 +140,7 @@ public interface HighAvailabilityServices * @param jobID The identifier of the job running the election. * @return Leader election service for the job manager leader election */ - LeaderElectionService getJobManagerLeaderElectionService(JobID jobID); + LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception; /** * Gets the leader election service for the cluster's rest endpoint. @@ -194,7 +194,7 @@ public interface HighAvailabilityServices * * @return the leader election service used by the cluster's rest endpoint */ - default LeaderElectionService getClusterRestEndpointLeaderElectionService() { + default LeaderElectionService getClusterRestEndpointLeaderElectionService() throws Exception { // for backwards compatibility we delegate to getWebMonitorLeaderElectionService // all implementations of this interface should override // getClusterRestEndpointLeaderElectionService, though diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java index f75896a6967..b124127022e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.rpc.FatalErrorHandler; -import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.concurrent.FutureUtils; @@ -32,11 +31,11 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -44,9 +43,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * Default implementation for leader election service. Composed with different {@link * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the * leader information to various storage. + * + * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}. */ public class DefaultLeaderElectionService - implements LeaderElectionService, LeaderElectionEventHandler { + implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class); @@ -54,26 +55,49 @@ public class DefaultLeaderElectionService private final LeaderElectionDriverFactory leaderElectionDriverFactory; - /** The leader contender which applies for leadership. */ + /** + * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is + * registered that participates in the leader election, yet. See {@link #start(LeaderContender)} + * and {@link #stop()} for lifecycle management. + * + * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class + * in a supporting IDE. + */ @GuardedBy("lock") - // @Nullable is commented-out to avoid having multiple warnings spread over this class - // this.running=true ensures that leaderContender != null - private volatile LeaderContender leaderContender; + private LeaderContender leaderContender; + /** + * Saves the session ID which was issued by the {@link LeaderElectionDriver} if and only if the + * leadership is acquired by this service. {@code issuedLeaderSessionID} being {@code null} + * indicates that this service isn't the leader right now (i.e. {@link + * #onGrantLeadership(UUID)}) wasn't called, yet (independently of what {@code + * leaderElectionDriver#hasLeadership()} returns). + */ @GuardedBy("lock") @Nullable - private volatile UUID issuedLeaderSessionID; + private UUID issuedLeaderSessionID; + /** + * Saves the leader information for a registered {@link LeaderContender} after this contender + * confirmed the leadership. + */ @GuardedBy("lock") - private volatile LeaderInformation confirmedLeaderInformation; + private LeaderInformation confirmedLeaderInformation; + /** + * {@code leaderElectionDriver} being {@code null} indicates that the connection to the + * LeaderElection backend isn't established, yet. See {@link #startLeaderElectionBackend()} and + * {@link #close()} for lifecycle management. The lifecycle of the driver should have been + * established before registering a {@link LeaderContender} and stopped after the contender has + * been removed. + * + * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class + * in a supporting IDE. + */ @GuardedBy("lock") - private volatile boolean running; - - // @Nullable is commented-out to avoid having multiple warnings spread over this class - // this.running=true ensures that leaderContender != null private LeaderElectionDriver leaderElectionDriver; + @GuardedBy("lock") private final ExecutorService leadershipOperationExecutor; public DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDriverFactory) { @@ -98,23 +122,52 @@ public class DefaultLeaderElectionService this.confirmedLeaderInformation = LeaderInformation.empty(); - this.running = false; + this.leadershipOperationExecutor = Preconditions.checkNotNull(leadershipOperationExecutor); + } + + /** + * Starts the leader election process. This method has to be called before registering a {@link + * LeaderContender}. This method could be moved into the {@code DefaultLeaderElectionService}'s + * constructor with FLINK-31837. + */ + public void startLeaderElectionBackend() throws Exception { + synchronized (lock) { + Preconditions.checkState( + leaderContender == null, + "No LeaderContender should have been registered, yet."); + + leaderElectionDriver = + leaderElectionDriverFactory.createLeaderElectionDriver( + this, new LeaderElectionFatalErrorHandler()); - this.leadershipOperationExecutor = leadershipOperationExecutor; + LOG.info("Instantiating DefaultLeaderElectionService with {}.", leaderElectionDriver); + } } @Override public final void start(LeaderContender contender) throws Exception { checkNotNull(contender, "Contender must not be null."); - Preconditions.checkState(leaderContender == null, "Contender was already set."); synchronized (lock) { - running = true; + Preconditions.checkState( + leaderContender == null, + "Only one LeaderContender is allowed to be registered to this service."); + Preconditions.checkState( + leaderElectionDriver != null, + "The DefaultLeaderElectionService should have established a connection to the backend before it's started."); + leaderContender = contender; - leaderElectionDriver = - leaderElectionDriverFactory.createLeaderElectionDriver( - this, new LeaderElectionFatalErrorHandler()); - LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver); + + LOG.info( + "LeaderContender {} has been registered for {}.", + contender.getDescription(), + leaderElectionDriver); + + if (issuedLeaderSessionID != null) { + // notifying the LeaderContender shouldn't happen in the contender's main thread + runInLeaderEventThread( + () -> notifyLeaderContenderOfLeadership(issuedLeaderSessionID)); + } } } @@ -123,27 +176,64 @@ public class DefaultLeaderElectionService LOG.info("Stopping DefaultLeaderElectionService."); synchronized (lock) { - if (!running) { + if (leaderContender == null) { LOG.debug( "The stop procedure was called on an already stopped DefaultLeaderElectionService instance. No action necessary."); return; } - running = false; - if (leaderElectionDriver.hasLeadership()) { - handleLeadershipLoss(); - leaderElectionDriver.writeLeaderInformation(LeaderInformation.empty()); + if (issuedLeaderSessionID != null) { + notifyLeaderContenderOfLeadershipLoss(); + LOG.debug( + "DefaultLeaderElectionService is stopping while having the leadership acquired. The revoke event is forwarded to the LeaderContender."); + + if (leaderElectionDriver.hasLeadership()) { + leaderElectionDriver.writeLeaderInformation(LeaderInformation.empty()); + LOG.debug("Leader information is cleaned up while stopping."); + } } else { + Preconditions.checkState( + confirmedLeaderInformation.isEmpty(), + "The confirmed leader information should have been cleared."); + LOG.debug( "DefaultLeaderElectionService is stopping while not having the leadership acquired. No cleanup necessary."); } - } - leaderElectionDriver.close(); + leaderContender = null; + } + } - // graceful shutdown needs to happen outside the lock to enable any outstanding - // grant/revoke events to be processed without the lock being acquired by the service - ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, leadershipOperationExecutor); + @Override + public void close() throws Exception { + synchronized (lock) { + Preconditions.checkState( + leaderContender == null, + "The DefaultLeaderElectionService should have been stopped before closing the instance."); + + issuedLeaderSessionID = null; + + if (leaderElectionDriver != null) { + leaderElectionDriver.close(); + leaderElectionDriver = null; + + // The shutdown of the thread pool needs to be done forcefully because we want its + // lifecycle being coupled to the driver (which require it to be shut down within + // the lock) to allow null checks in runInLeaderEventThread method. The outstanding + // event handling callbacks are going to be ignored, anyway. + final List<Runnable> outstandingEventHandlingCalls = + Preconditions.checkNotNull(leadershipOperationExecutor).shutdownNow(); + if (!outstandingEventHandlingCalls.isEmpty()) { + LOG.debug( + "The DefaultLeaderElectionService was closed with {} still not being processed. No further action necessary.", + outstandingEventHandlingCalls.size() == 1 + ? "one event" + : (outstandingEventHandlingCalls.size() + " events")); + } + } else { + LOG.debug("The HA backend connection isn't established. No actions taken."); + } + } } @Override @@ -154,15 +244,14 @@ public class DefaultLeaderElectionService synchronized (lock) { if (hasLeadership(leaderSessionID)) { - if (running) { - confirmLeaderInformation(leaderSessionID, leaderAddress); - } else { - LOG.debug( - "Ignoring the leader session Id {} confirmation, since the LeaderElectionService has already been stopped.", - leaderSessionID); - } + Preconditions.checkState( + confirmedLeaderInformation.isEmpty(), + "No confirmation should have happened, yet."); + + confirmedLeaderInformation = + LeaderInformation.known(leaderSessionID, leaderAddress); + leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation); } else { - // Received an old confirmation call if (!leaderSessionID.equals(this.issuedLeaderSessionID)) { LOG.debug( "Receive an old confirmation call of leader session ID {}, current issued session ID is {}", @@ -171,7 +260,7 @@ public class DefaultLeaderElectionService } else { LOG.warn( "The leader session ID {} was confirmed even though the " - + "corresponding JobManager was not elected as the leader.", + + "corresponding service was not elected as the leader or has been stopped already.", leaderSessionID); } } @@ -181,31 +270,29 @@ public class DefaultLeaderElectionService @Override public boolean hasLeadership(@Nonnull UUID leaderSessionId) { synchronized (lock) { - if (running) { - return leaderElectionDriver.hasLeadership() - && leaderSessionId.equals(issuedLeaderSessionID); + if (leaderElectionDriver != null) { + if (leaderContender != null) { + return leaderElectionDriver.hasLeadership() + && leaderSessionId.equals(issuedLeaderSessionID); + } else { + LOG.debug( + "hasLeadership is called after the service is stopped, returning false."); + return false; + } } else { - LOG.debug("hasLeadership is called after the service is stopped, returning false."); + LOG.debug("hasLeadership is called after the service is closed, returning false."); return false; } } } - /** - * Returns the current leader session ID or null, if the contender is not the leader. - * - * @return The last leader session ID or null, if the contender is not the leader - */ + /** Returns the current leader session ID or {@code null}, if the session wasn't confirmed. */ @VisibleForTesting @Nullable public UUID getLeaderSessionID() { - return confirmedLeaderInformation.getLeaderSessionID(); - } - - @GuardedBy("lock") - private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) { - confirmedLeaderInformation = LeaderInformation.known(leaderSessionID, leaderAddress); - leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation); + synchronized (lock) { + return confirmedLeaderInformation.getLeaderSessionID(); + } } @Override @@ -213,24 +300,45 @@ public class DefaultLeaderElectionService runInLeaderEventThread(() -> onGrantLeadershipInternal(newLeaderSessionId)); } + @GuardedBy("lock") private void onGrantLeadershipInternal(UUID newLeaderSessionId) { - synchronized (lock) { - if (running) { - issuedLeaderSessionID = newLeaderSessionId; - confirmedLeaderInformation = LeaderInformation.empty(); + Preconditions.checkNotNull(newLeaderSessionId); - LOG.debug( - "Grant leadership to contender {} with session ID {}.", - leaderContender.getDescription(), - issuedLeaderSessionID); + Preconditions.checkState( + issuedLeaderSessionID == null, + "The leadership should have been granted while not having the leadership acquired."); - leaderContender.grantLeadership(issuedLeaderSessionID); - } else { - LOG.debug( - "Ignoring the grant leadership notification since the {} has already been closed.", - leaderElectionDriver); - } + issuedLeaderSessionID = newLeaderSessionId; + + notifyLeaderContenderOfLeadership(issuedLeaderSessionID); + } + + @GuardedBy("lock") + private void notifyLeaderContenderOfLeadership(UUID sessionID) { + if (leaderContender == null) { + LOG.debug( + "The grant leadership notification for session ID {} is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.", + sessionID, + leaderElectionDriver); + return; + } else if (!sessionID.equals(issuedLeaderSessionID)) { + LOG.debug( + "An out-dated leadership-acquired event with session ID {} was triggered. The current leader session ID is {}. The event will be ignored.", + sessionID, + issuedLeaderSessionID); + return; } + + Preconditions.checkState( + confirmedLeaderInformation.isEmpty(), + "The leadership should have been granted while not having the leadership acquired."); + + LOG.debug( + "Granting leadership to contender {} with session ID {}.", + leaderContender.getDescription(), + issuedLeaderSessionID); + + leaderContender.grantLeadership(issuedLeaderSessionID); } @Override @@ -238,30 +346,42 @@ public class DefaultLeaderElectionService runInLeaderEventThread(this::onRevokeLeadershipInternal); } + @GuardedBy("lock") private void onRevokeLeadershipInternal() { - synchronized (lock) { - if (running) { - handleLeadershipLoss(); - } else { - LOG.debug( - "Ignoring the revoke leadership notification since the {} " - + "has already been closed.", - leaderElectionDriver); - } + // TODO: FLINK-31814 covers adding this Precondition + // Preconditions.checkState(issuedLeaderSessionID != null,"The leadership should have + // been revoked while having the leadership acquired."); + + if (leaderContender != null) { + notifyLeaderContenderOfLeadershipLoss(); + } else { + LOG.debug( + "The revoke leadership for session {} notification is not forwarded because the DefaultLeaderElectionService({}) has no contender registered.", + issuedLeaderSessionID, + leaderElectionDriver); } + + issuedLeaderSessionID = null; } @GuardedBy("lock") - private void handleLeadershipLoss() { - LOG.debug( - "Revoke leadership of {} ({}@{}).", - leaderContender.getDescription(), - confirmedLeaderInformation.getLeaderSessionID(), - confirmedLeaderInformation.getLeaderAddress()); + private void notifyLeaderContenderOfLeadershipLoss() { + Preconditions.checkState( + leaderContender != null, + "The LeaderContender should be always set when calling this method."); + + if (confirmedLeaderInformation.isEmpty()) { + LOG.debug( + "Revoking leadership to contender {} while a previous leadership grant wasn't confirmed, yet.", + leaderContender.getDescription()); + } else { + LOG.debug( + "Revoking leadership to contender {} for {}.", + leaderContender.getDescription(), + LeaderElectionUtils.convertToString(confirmedLeaderInformation)); + } - issuedLeaderSessionID = null; confirmedLeaderInformation = LeaderInformation.empty(); - leaderContender.revokeLeadership(); } @@ -270,50 +390,67 @@ public class DefaultLeaderElectionService runInLeaderEventThread(() -> onLeaderInformationChangeInternal(leaderInformation)); } + @GuardedBy("lock") private void onLeaderInformationChangeInternal(LeaderInformation leaderInformation) { - synchronized (lock) { - if (running) { - LOG.trace( - "Leader node changed while {} is the leader with session ID {}. New leader information {}.", - leaderContender.getDescription(), - confirmedLeaderInformation.getLeaderSessionID(), - leaderInformation); - if (!confirmedLeaderInformation.isEmpty()) { - final LeaderInformation confirmedLeaderInfo = this.confirmedLeaderInformation; - if (leaderInformation.isEmpty()) { - LOG.debug( - "Writing leader information by {} since the external storage is empty.", - leaderContender.getDescription()); - leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo); - } else if (!leaderInformation.equals(confirmedLeaderInfo)) { - // the data field does not correspond to the expected leader information - LOG.debug( - "Correcting leader information by {}.", - leaderContender.getDescription()); - leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo); - } + if (leaderContender != null) { + LOG.trace( + "Leader node changed while {} is the leader with {}. New leader information {}.", + leaderContender.getDescription(), + LeaderElectionUtils.convertToString(confirmedLeaderInformation), + LeaderElectionUtils.convertToString(leaderInformation)); + if (!confirmedLeaderInformation.isEmpty()) { + final LeaderInformation confirmedLeaderInfo = this.confirmedLeaderInformation; + if (leaderInformation.isEmpty()) { + LOG.debug( + "Writing leader information by {} since the external storage is empty.", + leaderContender.getDescription()); + leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo); + } else if (!leaderInformation.equals(confirmedLeaderInfo)) { + // the data field does not correspond to the expected leader information + LOG.debug( + "Correcting leader information by {}.", + leaderContender.getDescription()); + leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo); } - } else { - LOG.debug( - "Ignoring change notification since the {} has already been closed.", - leaderElectionDriver); } + } else { + LOG.debug( + "Ignoring change notification since the {} has already been stopped.", + leaderElectionDriver); } } private void runInLeaderEventThread(Runnable callback) { - if (running) { - FutureUtils.handleUncaughtException( - CompletableFuture.runAsync(callback, leadershipOperationExecutor), - (thread, error) -> forwardErrorToLeaderContender(error)); + synchronized (lock) { + if (!leadershipOperationExecutor.isShutdown()) { + FutureUtils.handleUncaughtException( + CompletableFuture.runAsync( + () -> { + synchronized (lock) { + callback.run(); + } + }, + leadershipOperationExecutor), + (thread, error) -> forwardErrorToLeaderContender(error)); + } else { + LOG.debug( + "Leader event handling was triggered after the DefaultLeaderElectionService is closed. The event will be ignored."); + } } } private void forwardErrorToLeaderContender(Throwable t) { - if (t instanceof LeaderElectionException) { - leaderContender.handleError((LeaderElectionException) t); - } else { - leaderContender.handleError(new LeaderElectionException(t)); + synchronized (lock) { + if (leaderContender == null) { + LOG.debug("Ignoring error notification since there's no contender registered."); + return; + } + + if (t instanceof LeaderElectionException) { + leaderContender.handleError((LeaderElectionException) t); + } else { + leaderContender.handleError(new LeaderElectionException(t)); + } } } @@ -321,14 +458,7 @@ public class DefaultLeaderElectionService @Override public void onFatalError(Throwable throwable) { - synchronized (lock) { - if (!running) { - LOG.debug("Ignoring error notification since the service has been stopped."); - return; - } - - forwardErrorToLeaderContender(throwable); - } + forwardErrorToLeaderContender(throwable); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java index a0c1c5968ed..97f1ad1966a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java @@ -37,16 +37,17 @@ import java.util.UUID; public interface LeaderElectionService { /** - * Starts the leader election service. This method can only be called once. + * Registers the passed {@link LeaderContender} with this {@code LeaderElectionService}. This + * method can only be called once. * * @param contender LeaderContender which applies for the leadership - * @throws Exception */ void start(LeaderContender contender) throws Exception; /** - * Stops the leader election service. Stopping the {@code LeaderElectionService} will trigger - * {@link LeaderContender#revokeLeadership()} if the service still holds the leadership. + * Ends the participation of the registered {@link LeaderContender} in the leader election + * process. This will trigger {@link LeaderContender#revokeLeadership()} if the service still + * holds the leadership. * * @throws Exception if an error occurs while stopping the {@code LeaderElectionService}. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionUtils.java new file mode 100644 index 00000000000..688a95c4155 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionUtils.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderelection; + +import org.apache.flink.util.Preconditions; + +import java.util.UUID; + +/** {@code LeaderElectionUtils} collects helper methods to handle LeaderElection-related issues. */ +public class LeaderElectionUtils { + + /** + * Converts the passed {@link LeaderInformation} into a human-readable representation that can + * be used in log messages. + */ + public static String convertToString(LeaderInformation leaderInformation) { + return leaderInformation.isEmpty() + ? "<no leader>" + : convertToString( + leaderInformation.getLeaderSessionID(), + leaderInformation.getLeaderAddress()); + } + + public static String convertToString(UUID sessionId, String address) { + return String.format( + "%s@%s", + Preconditions.checkNotNull(sessionId), Preconditions.checkNotNull(address)); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java index e50b1b9c669..f3f75bd31b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java @@ -31,7 +31,6 @@ import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.security.token.DelegationTokenManager; -import org.apache.flink.util.ConfigurationException; import org.apache.flink.util.FlinkException; import org.apache.flink.util.concurrent.FutureUtils; @@ -82,7 +81,8 @@ public class ResourceManagerServiceImpl implements ResourceManagerService, Leade private ResourceManagerServiceImpl( ResourceManagerFactory<?> resourceManagerFactory, - ResourceManagerProcessContext rmProcessContext) { + ResourceManagerProcessContext rmProcessContext) + throws Exception { this.resourceManagerFactory = checkNotNull(resourceManagerFactory); this.rmProcessContext = checkNotNull(rmProcessContext); @@ -354,7 +354,7 @@ public class ResourceManagerServiceImpl implements ResourceManagerService, Leade MetricRegistry metricRegistry, String hostname, Executor ioExecutor) - throws ConfigurationException { + throws Exception { return new ResourceManagerServiceImpl( resourceManagerFactory, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index 26fc1a83384..d1416f8356b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -403,8 +403,8 @@ public class ZooKeeperUtils { * @param client The {@link CuratorFramework} ZooKeeper client to use * @return {@link DefaultLeaderElectionService} instance. */ - public static DefaultLeaderElectionService createLeaderElectionService( - CuratorFramework client) { + public static DefaultLeaderElectionService createLeaderElectionService(CuratorFramework client) + throws Exception { return createLeaderElectionService(client, ""); } @@ -418,8 +418,12 @@ public class ZooKeeperUtils { * @return {@link DefaultLeaderElectionService} instance. */ public static DefaultLeaderElectionService createLeaderElectionService( - final CuratorFramework client, final String path) { - return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path)); + final CuratorFramework client, final String path) throws Exception { + final DefaultLeaderElectionService leaderElectionService = + new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path)); + leaderElectionService.startLeaderElectionBackend(); + + return leaderElectionService; } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java index 6e4d9cb0fa6..6519817877c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java @@ -680,8 +680,9 @@ class JobMasterServiceLeadershipRunnerTest { // we need to use DefaultLeaderElectionService here because JobMasterServiceLeadershipRunner // in connection with the DefaultLeaderElectionService generates the nested locking - final LeaderElectionService defaultLeaderElectionService = + final DefaultLeaderElectionService defaultLeaderElectionService = new DefaultLeaderElectionService(testingLeaderElectionDriverFactory); + defaultLeaderElectionService.startLeaderElectionBackend(); // latch to detect when we reached the first synchronized section having a lock on the // JobMasterServiceProcess#stop side @@ -772,7 +773,15 @@ class JobMasterServiceLeadershipRunnerTest { closeAsyncCalledTrigger.await(); final CheckedThread grantLeadershipThread = - createCheckedThread(currentLeaderDriver::isLeader); + createCheckedThread( + () -> { + // DefaultLeaderElectionService enforces a proper event handling + // order (i.e. no two grant or revoke events should appear after + // each other). This requires the leadership to be revoked before + // regaining leadership in this test. + currentLeaderDriver.notLeader(); + currentLeaderDriver.isLeader(); + }); grantLeadershipThread.start(); // finalize ClassloaderLease release to trigger DefaultLeaderElectionService#stop diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java index 17bc0787d22..8025063b67a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java @@ -35,6 +35,7 @@ import java.util.function.Consumer; import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link DefaultLeaderElectionService}. */ class DefaultLeaderElectionServiceTest { @@ -77,6 +78,170 @@ class DefaultLeaderElectionServiceTest { }; } + /** + * With {@link MultipleComponentLeaderElectionDriverAdapter} and {@link + * DefaultMultipleComponentLeaderElectionService} it happens that {@link + * LeaderElectionEventHandler#onGrantLeadership(UUID)} happens while instantiating the {@link + * LeaderElectionDriver} (i.e. {@code MultipleComponentLeaderElectionDriverAdapter}). This test + * verifies that the grant event is handled properly. + */ + @Test + void testGrantCallWhileInstantiatingDriver() throws Exception { + final UUID expectedLeaderSessionID = UUID.randomUUID(); + try (final TestingGenericLeaderElectionDriver driver = + TestingGenericLeaderElectionDriver.newBuilder().build(); + final DefaultLeaderElectionService testInstance = + new DefaultLeaderElectionService( + (eventHandler, errorHandler) -> { + eventHandler.onGrantLeadership(expectedLeaderSessionID); + return driver; + }, + Executors.newDirectExecutorService())) { + testInstance.startLeaderElectionBackend(); + + final TestingContender testingContender = + new TestingContender("unused-address", testInstance); + testInstance.start(testingContender); + + assertThat(testingContender.getLeaderSessionID()).isEqualTo(expectedLeaderSessionID); + + testInstance.stop(); + } + } + + @Test + void testDelayedGrantCallAfterContenderRegistration() throws Exception { + new Context() { + { + runTestWithManuallyTriggeredEvents( + executorService -> { + // we need to stop to deregister the contender that was already + // registered to the service + leaderElectionService.stop(); + + final UUID expectedSessionID = UUID.randomUUID(); + testingLeaderElectionDriver.isLeader(expectedSessionID); + + leaderElectionService.start(testingContender); + + assertThat(testingContender.getLeaderSessionID()) + .as("Leadership grant was not forwarded to the contender, yet.") + .isNull(); + + executorService.trigger(); + + assertThat(testingContender.getLeaderSessionID()) + .as("Leadership grant is actually forwarded to the service.") + .isEqualTo(expectedSessionID); + + testingContender.waitForLeader(); + }); + } + }; + } + + @Test + void testDelayedGrantCallAfterContenderBeingDeregisteredAgain() throws Exception { + new Context() { + { + runTestWithManuallyTriggeredEvents( + executorService -> { + // we need to stop to deregister the contender that was already + // registered to the service + leaderElectionService.stop(); + + final UUID expectedSessionID = UUID.randomUUID(); + testingLeaderElectionDriver.isLeader(expectedSessionID); + executorService.trigger(); + + leaderElectionService.start(testingContender); + + leaderElectionService.stop(); + + executorService.trigger(); + }); + } + }; + } + + /** + * Test to cover the issue described in FLINK-31814. This test could be removed after + * FLINK-31814 is resolved. + */ + @Test + void testOnRevokeCallWhileClosingService() throws Exception { + final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory = + new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory( + LeaderElectionEventHandler::onRevokeLeadership); + + try (final DefaultLeaderElectionService testInstance = + new DefaultLeaderElectionService(driverFactory)) { + testInstance.startLeaderElectionBackend(); + + final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver(); + assertThat(driver).isNotNull(); + + driver.isLeader(); + + final TestingContender contender = new TestingContender("unused-address", testInstance); + testInstance.start(contender); + + contender.waitForLeader(); + + testInstance.stop(); + + contender.throwErrorIfPresent(); + } + } + + @Test + void testStopWhileHavingLeadership() throws Exception { + final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory = + new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(); + + try (final DefaultLeaderElectionService testInstance = + new DefaultLeaderElectionService(driverFactory)) { + testInstance.startLeaderElectionBackend(); + + final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver(); + assertThat(driver).isNotNull(); + + driver.isLeader(); + + testInstance.start(TestingGenericLeaderContender.newBuilder().build()); + + testInstance.stop(); + } + } + + @Test + void testContenderRegistrationWithoutDriverBeingInstantiatedFails() throws Exception { + try (final DefaultLeaderElectionService leaderElectionService = + new DefaultLeaderElectionService( + new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory())) { + assertThatThrownBy( + () -> + leaderElectionService.start( + new TestingContender( + "unused-address", leaderElectionService))) + .isInstanceOf(IllegalStateException.class); + } + } + + @Test + void testDriverShutdownFailsWithContenderStillBeingRegistered() throws Exception { + new Context() { + { + runTestWithSynchronousEventHandling( + () -> + assertThatThrownBy(leaderElectionService::close) + .as( + "The LeaderContender needs to be deregistered before closing the driver.") + .isInstanceOf(IllegalStateException.class)); + } + }; + } + @Test void testProperCleanupOnStopWhenHoldingTheLeadership() throws Exception { new Context() { @@ -325,7 +490,7 @@ class DefaultLeaderElectionServiceTest { } @Test - void testOldConfirmLeaderInformation() throws Exception { + void testOldConfirmLeaderInformationWhileHavingNewLeadership() throws Exception { new Context() { { runTestWithSynchronousEventHandling( @@ -344,6 +509,29 @@ class DefaultLeaderElectionServiceTest { }; } + @Test + void testOldConfirmationWhileHavingLeadershipLost() throws Exception { + new Context() { + { + runTestWithSynchronousEventHandling( + () -> { + testingLeaderElectionDriver.isLeader(); + final UUID currentLeaderSessionId = + leaderElectionService.getLeaderSessionID(); + assertThat(currentLeaderSessionId).isNotNull(); + + testingLeaderElectionDriver.notLeader(); + + // Old confirm call should be ignored. + leaderElectionService.confirmLeadership( + currentLeaderSessionId, TEST_URL); + + assertThat(leaderElectionService.getLeaderSessionID()).isNull(); + }); + } + }; + } + @Test void testErrorForwarding() throws Exception { new Context() { @@ -358,6 +546,8 @@ class DefaultLeaderElectionServiceTest { assertThat(testingContender.getError()) .isNotNull() .hasCause(testException); + + testingContender.clearError(); }); } }; @@ -394,7 +584,9 @@ class DefaultLeaderElectionServiceTest { final TestingContender testingContender = new TestingContender(TEST_URL, leaderElectionService); + leaderElectionService.startLeaderElectionBackend(); leaderElectionService.start(testingContender); + final TestingLeaderElectionDriver currentLeaderDriver = Preconditions.checkNotNull( testingLeaderElectionDriverFactory.getCurrentLeaderDriver()); @@ -402,6 +594,8 @@ class DefaultLeaderElectionServiceTest { currentLeaderDriver.isLeader(); leaderElectionService.stop(); + + testingContender.throwErrorIfPresent(); } @Test @@ -427,6 +621,7 @@ class DefaultLeaderElectionServiceTest { final DefaultLeaderElectionService testInstance = new DefaultLeaderElectionService( (leaderElectionEventHandler, errorHandler) -> driver); + testInstance.startLeaderElectionBackend(); final String address = "leader-address"; testInstance.start( @@ -450,6 +645,7 @@ class DefaultLeaderElectionServiceTest { latch.trigger(); testInstance.stop(); + testInstance.close(); } @Test @@ -489,6 +685,7 @@ class DefaultLeaderElectionServiceTest { final DefaultLeaderElectionService testInstance = new DefaultLeaderElectionService(driverFactory); + testInstance.startLeaderElectionBackend(); testInstance.start(contender); final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver(); @@ -499,6 +696,7 @@ class DefaultLeaderElectionServiceTest { latch.trigger(); testInstance.stop(); + testInstance.close(); } private static class Context { @@ -523,19 +721,32 @@ class DefaultLeaderElectionServiceTest { void runTest(RunnableWithException testMethod, ExecutorService leaderEventOperationExecutor) throws Exception { - final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory = - new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(); - leaderElectionService = - new DefaultLeaderElectionService(driverFactory, leaderEventOperationExecutor); - testingContender = new TestingContender(TEST_URL, leaderElectionService); - - leaderElectionService.start(testingContender); - testingLeaderElectionDriver = driverFactory.getCurrentLeaderDriver(); - - assertThat(testingLeaderElectionDriver).isNotNull(); - testMethod.run(); - - leaderElectionService.stop(); + try { + final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory = + new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(); + leaderElectionService = + new DefaultLeaderElectionService( + driverFactory, leaderEventOperationExecutor); + leaderElectionService.startLeaderElectionBackend(); + testingContender = new TestingContender(TEST_URL, leaderElectionService); + + leaderElectionService.start(testingContender); + testingLeaderElectionDriver = driverFactory.getCurrentLeaderDriver(); + + assertThat(testingLeaderElectionDriver).isNotNull(); + testMethod.run(); + + leaderElectionService.stop(); + leaderElectionService.close(); + } finally { + if (testingContender != null) { + testingContender.throwErrorIfPresent(); + } + + if (testingLeaderElectionDriver != null) { + testingLeaderElectionDriver.close(); + } + } } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java index c5a09be8bc1..31fd0f6f7da 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java @@ -192,7 +192,7 @@ public class LeaderElectionTest { } @Override - public LeaderElectionService createLeaderElectionService() { + public LeaderElectionService createLeaderElectionService() throws Exception { return ZooKeeperUtils.createLeaderElectionService( curatorFrameworkWrapper.asCuratorFramework()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java index c385cb20c2a..35a7e82dcbd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java @@ -80,7 +80,10 @@ public class TestingGenericLeaderContender implements LeaderContender { public static class Builder { private Consumer<UUID> grantLeadershipConsumer = ignoredSessionID -> {}; private Runnable revokeLeadershipRunnable = () -> {}; - private Consumer<Exception> handleErrorConsumer = ignoredError -> {}; + private Consumer<Exception> handleErrorConsumer = + error -> { + throw new AssertionError(error); + }; private Supplier<String> getDescriptionSupplier = () -> "testing contender"; private Builder() {} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderBase.java index f11cd585722..c504e3eda2e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderBase.java @@ -66,6 +66,10 @@ public class TestingLeaderBase { error = errorQueue.take(); } + public void clearError() { + error = null; + } + public void handleError(Throwable ex) { errorQueue.offer(ex); } @@ -80,6 +84,24 @@ public class TestingLeaderBase { return error == null ? errorQueue.poll() : error; } + /** + * Method for exposing errors that were caught during the test execution and need to be exposed + * within the test. + * + * @throws AssertionError with the actual unhandled error as the cause if such an error was + * caught during the test code execution. + */ + public void throwErrorIfPresent() { + final String assertionErrorMessage = "An unhandled error was caught during test execution."; + if (error != null) { + throw new AssertionError(assertionErrorMessage, error); + } + + if (!errorQueue.isEmpty()) { + throw new AssertionError(assertionErrorMessage, errorQueue.poll()); + } + } + public boolean isLeader() { return isLeader; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java index 9f344b30aa3..f3b4b9fd2c8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.util.function.ThrowingConsumer; import javax.annotation.Nullable; @@ -38,14 +39,18 @@ public class TestingLeaderElectionDriver implements LeaderElectionDriver { private final LeaderElectionEventHandler leaderElectionEventHandler; private final FatalErrorHandler fatalErrorHandler; + private final ThrowingConsumer<LeaderElectionEventHandler, Exception> closeRunnable; + // Leader information on external storage private LeaderInformation leaderInformation = LeaderInformation.empty(); private TestingLeaderElectionDriver( LeaderElectionEventHandler leaderElectionEventHandler, - FatalErrorHandler fatalErrorHandler) { + FatalErrorHandler fatalErrorHandler, + ThrowingConsumer<LeaderElectionEventHandler, Exception> closeRunnable) { this.leaderElectionEventHandler = leaderElectionEventHandler; this.fatalErrorHandler = fatalErrorHandler; + this.closeRunnable = closeRunnable; } @Override @@ -61,7 +66,7 @@ public class TestingLeaderElectionDriver implements LeaderElectionDriver { @Override public void close() throws Exception { synchronized (lock) { - // noop + closeRunnable.accept(leaderElectionEventHandler); } } @@ -101,12 +106,24 @@ public class TestingLeaderElectionDriver implements LeaderElectionDriver { private TestingLeaderElectionDriver currentLeaderDriver; + private final ThrowingConsumer<LeaderElectionEventHandler, Exception> closeRunnable; + + public TestingLeaderElectionDriverFactory() { + this(ignoredLeaderElectionEventHandler -> {}); + } + + public TestingLeaderElectionDriverFactory( + ThrowingConsumer<LeaderElectionEventHandler, Exception> closeRunnable) { + this.closeRunnable = closeRunnable; + } + @Override public LeaderElectionDriver createLeaderElectionDriver( LeaderElectionEventHandler leaderEventHandler, FatalErrorHandler fatalErrorHandler) { currentLeaderDriver = - new TestingLeaderElectionDriver(leaderEventHandler, fatalErrorHandler); + new TestingLeaderElectionDriver( + leaderEventHandler, fatalErrorHandler, closeRunnable); return currentLeaderDriver; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java index 67a6387c25b..99ba84cf29a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java @@ -139,6 +139,7 @@ class ZooKeeperLeaderElectionConnectionHandlingTest { new ZooKeeperLeaderElectionDriverFactory(client, PATH); DefaultLeaderElectionService leaderElectionService = new DefaultLeaderElectionService(leaderElectionDriverFactory); + leaderElectionService.startLeaderElectionBackend(); try { final TestingConnectionStateListener connectionStateListener = @@ -165,6 +166,7 @@ class ZooKeeperLeaderElectionConnectionHandlingTest { validationLogic.accept(connectionStateListener, contender); } finally { leaderElectionService.stop(); + leaderElectionService.close(); curatorFrameworkWrapper.close(); if (problem == Problem.LOST_CONNECTION) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java index 53fc1a1feab..7b9a67797d3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java @@ -220,6 +220,7 @@ class ZooKeeperLeaderElectionTest { "Stop leader election service of contender #{}.", numberSeenLeaders); leaderElectionService[index].stop(); + leaderElectionService[index].close(); leaderElectionService[index] = null; numberSeenLeaders++; @@ -241,6 +242,7 @@ class ZooKeeperLeaderElectionTest { for (DefaultLeaderElectionService electionService : leaderElectionService) { if (electionService != null) { electionService.stop(); + electionService.close(); } } } @@ -302,6 +304,8 @@ class ZooKeeperLeaderElectionTest { // stop leader election service = revoke leadership leaderElectionService[index].stop(); + leaderElectionService[index].close(); + // create new leader election service which takes part in the leader election leaderElectionService[index] = ZooKeeperUtils.createLeaderElectionService(createZooKeeperClient()); @@ -324,6 +328,7 @@ class ZooKeeperLeaderElectionTest { for (DefaultLeaderElectionService electionService : leaderElectionService) { if (electionService != null) { electionService.stop(); + electionService.close(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java index 533081cfca4..c2f3168efc3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java @@ -26,7 +26,10 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperMultipleComponentLeaderElectionHaServices; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderelection.LeaderInformation; import org.apache.flink.runtime.leaderelection.TestingContender; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionListener; +import org.apache.flink.runtime.leaderelection.ZooKeeperMultipleComponentLeaderElectionDriver; import org.apache.flink.runtime.rpc.AddressResolution; import org.apache.flink.runtime.rpc.RpcSystem; import org.apache.flink.runtime.util.LeaderRetrievalUtils; @@ -49,6 +52,7 @@ import java.net.Socket; import java.net.SocketAddress; import java.net.UnknownHostException; import java.time.Duration; +import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import static org.assertj.core.api.Assertions.assertThat; @@ -116,7 +120,6 @@ class ZooKeeperLeaderRetrievalTest { long sleepingTime = 1000; LeaderElectionService leaderElectionService = null; - LeaderElectionService faultyLeaderElectionService; Thread thread; @@ -150,13 +153,21 @@ class ZooKeeperLeaderRetrievalTest { AddressResolution.NO_ADDRESS_RESOLUTION, config); - faultyLeaderElectionService = - highAvailabilityServices.getJobManagerLeaderElectionService( - HighAvailabilityServices.DEFAULT_JOB_ID); - TestingContender wrongLeaderAddressContender = - new TestingContender(wrongAddress, faultyLeaderElectionService); - - faultyLeaderElectionService.start(wrongLeaderAddressContender); + // create driver to simulate a separate Flink process having leadership that writes + // its leader information to the ZooKeeper backend and gets lost afterward + final ZooKeeperMultipleComponentLeaderElectionDriver externalProcessDriver = + new ZooKeeperMultipleComponentLeaderElectionDriver( + ZooKeeperUtils.useNamespaceAndEnsurePath( + zooKeeperExtension.getZooKeeperClient( + testingFatalErrorHandlerResource + .getTestingFatalErrorHandler()), + ZooKeeperUtils.generateLeaderLatchPath("")), + new TestingLeaderElectionListener()); + externalProcessDriver.isLeader(); + + externalProcessDriver.publishLeaderInformation( + HighAvailabilityServices.DEFAULT_JOB_ID.toString(), + LeaderInformation.known(UUID.randomUUID(), wrongAddress)); FindConnectingAddress findConnectingAddress = new FindConnectingAddress( @@ -177,7 +188,8 @@ class ZooKeeperLeaderRetrievalTest { Thread.sleep(sleepingTime); - faultyLeaderElectionService.stop(); + externalProcessDriver.notLeader(); + externalProcessDriver.close(); leaderElectionService.start(correctLeaderAddressContender);
