[FLINK-4835] [cluster management] Add embedded version of the high-availability services
This includes the addition of the EmbeddedLeaderService and a clean shutdown hook for all high availability services. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9615f15b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9615f15b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9615f15b Branch: refs/heads/master Commit: 9615f15beca37d6393f6ea78ec35f712536c8f64 Parents: 208324d Author: Stephan Ewen <[email protected]> Authored: Fri Oct 14 23:57:11 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Dec 23 20:54:23 2016 +0100 ---------------------------------------------------------------------- .../StandaloneCheckpointRecoveryFactory.java | 4 +- .../highavailability/EmbeddedNonHaServices.java | 62 +++ .../HighAvailabilityServices.java | 7 +- .../runtime/highavailability/NonHaServices.java | 62 +-- .../highavailability/ZookeeperHaServices.java | 12 +- .../nonha/AbstractNonHaServices.java | 175 +++++++ .../nonha/EmbeddedLeaderService.java | 466 +++++++++++++++++++ .../TestingHighAvailabilityServices.java | 9 + 8 files changed, 736 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java index a9624fb..57785ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java @@ -40,8 +40,8 @@ public class 
StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader) throws Exception { - return new StandaloneCompletedCheckpointStore(CheckpointRecoveryFactory - .NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN); + return new StandaloneCompletedCheckpointStore( + CheckpointRecoveryFactory.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java new file mode 100644 index 0000000..58da287 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices; +import org.apache.flink.runtime.highavailability.nonha.EmbeddedLeaderService; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; + +/** + * An implementation of the {@link HighAvailabilityServices} for the non-high-availability case + * where all participants (ResourceManager, JobManagers, TaskManagers) run in the same process. + * + * <p>This implementation has no dependencies on any external services. The ResourceManager leader + * is elected in-process among the local contenders by an {@link EmbeddedLeaderService}; checkpoints and metadata are stored simply on the heap or + * on a local file system and therefore in a storage without guarantees. + */ +public class EmbeddedNonHaServices extends AbstractNonHaServices implements HighAvailabilityServices { + + private final EmbeddedLeaderService resourceManagerLeaderService; + + public EmbeddedNonHaServices() { + super(); + this.resourceManagerLeaderService = new EmbeddedLeaderService(getExecutorService()); + } + + // ------------------------------------------------------------------------ + + @Override + public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { + return resourceManagerLeaderService.createLeaderRetrievalService(); + } + + @Override + public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { + return resourceManagerLeaderService.createLeaderElectionService(); + } + + // ------------------------------------------------------------------------ + + @Override + public void shutdown() throws Exception { + super.shutdown(); // shuts down the shared executor and all JobManager leader services first + resourceManagerLeaderService.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index 484cddb..f6db682 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -49,11 +49,10 @@ public interface HighAvailabilityServices { * Gets the leader retriever for the job JobMaster which is responsible for the given job * * @param jobID The identifier of the job. - * @param defaultAddress address under which the job manager is reachable * @return * @throws Exception */ - LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception; + LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception; /** * Gets the leader election service for the cluster's resource manager. @@ -86,4 +85,8 @@ public interface HighAvailabilityServices { * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
*/ BlobStore createBlobStore() throws IOException; + + // ------------------------------------------------------------------------ + /** Shuts down these services and releases the resources they hold (e.g. threads or client connections). */ + void shutdown() throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java index 1c73c01..107cbd0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java @@ -18,21 +18,13 @@ package org.apache.flink.runtime.highavailability; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.blob.BlobStore; -import org.apache.flink.runtime.blob.VoidBlobStore; -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; -import org.apache.flink.runtime.highavailability.nonha.NonHaRegistry; -import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore; -import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; +import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -41,35 +33,23 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull; * This
implementation can be used for testing, and for cluster setups that do not * tolerate failures of the master processes (JobManager, ResourceManager). * - * <p>This implementation has no dependencies on any external services. It returns fix - * pre-configured leaders, and stores checkpoints and metadata simply on the heap and therefore - * in volatile memory. + * <p>This implementation has no dependencies on any external services. It returns a fixed, + * pre-configured ResourceManager address, and stores checkpoints and metadata simply on the heap or + * on a local file system and therefore in a storage without guarantees. */ -public class NonHaServices implements HighAvailabilityServices { +public class NonHaServices extends AbstractNonHaServices implements HighAvailabilityServices { /** The fix address of the ResourceManager */ private final String resourceManagerAddress; - private final ConcurrentHashMap<JobID, String> jobMastersAddress; - /** * Creates a new services class for the fix pre-defined leaders.
* * @param resourceManagerAddress The fix address of the ResourceManager */ public NonHaServices(String resourceManagerAddress) { + super(); this.resourceManagerAddress = checkNotNull(resourceManagerAddress); - this.jobMastersAddress = new ConcurrentHashMap<>(16); - } - - /** - * Binds address of a specified job master - * - * @param jobID JobID for the specified job master - * @param jobMasterAddress address for the specified job master - */ - public void bindJobMasterLeaderAddress(JobID jobID, String jobMasterAddress) { - jobMastersAddress.put(jobID, jobMasterAddress); } // ------------------------------------------------------------------------ @@ -82,37 +62,7 @@ public class NonHaServices implements HighAvailabilityServices { } @Override - public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception { - return new StandaloneLeaderRetrievalService(defaultAddress, new UUID(0, 0)); - } - - @Override public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { return new StandaloneLeaderElectionService(); } - - @Override - public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception { - return new StandaloneLeaderElectionService(); - } - - @Override - public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { - return new StandaloneCheckpointRecoveryFactory(); - } - - @Override - public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { - return new StandaloneSubmittedJobGraphStore(); - } - - @Override - public RunningJobsRegistry getRunningJobsRegistry() throws Exception { - return new NonHaRegistry(); - } - - @Override - public BlobStore createBlobStore() { - return new VoidBlobStore(); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java index 98b2890..be19c60 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java @@ -86,7 +86,8 @@ public class ZookeeperHaServices implements HighAvailabilityServices { private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock"; // ------------------------------------------------------------------------ - + + /** The ZooKeeper client to use */ private final CuratorFramework client; @@ -169,6 +170,15 @@ public class ZookeeperHaServices implements HighAvailabilityServices { } // ------------------------------------------------------------------------ + // Shutdown + // ------------------------------------------------------------------------ + + @Override + public void shutdown() throws Exception { + client.close(); + } + + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java new file mode 100644 index 0000000..8c15a52 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java @@ -0,0 +1,175 @@ +/* + * Licensed to
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability.nonha; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.blob.BlobStore; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.RunningJobsRegistry; +import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static
org.apache.flink.util.Preconditions.checkState; + +/** + * Base for all {@link HighAvailabilityServices} that are not highly available, but are backed + * by storage that has no availability guarantees and leader election services that cannot + * elect among multiple distributed leader contenders. JobManager leaders are elected in-process, by one {@link EmbeddedLeaderService} per job. + */ +public abstract class AbstractNonHaServices implements HighAvailabilityServices { + + private final Object lock = new Object(); // guards jobManagerLeaderServices and the shutdown flag + + private final ExecutorService executor; + + private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices; // created lazily per job in getOrCreateJobManagerService() + + private final RunningJobsRegistry runningJobsRegistry; + + private boolean shutdown; // NOTE(review): written under 'lock', but checkNotShutdown() also reads it without the lock (e.g. from createBlobStore()) and the field is not volatile + + // ------------------------------------------------------------------------ + + public AbstractNonHaServices() { + this.executor = Executors.newCachedThreadPool(new ServicesThreadFactory()); + this.jobManagerLeaderServices = new HashMap<>(); + this.runningJobsRegistry = new NonHaRegistry(); + } + + // ------------------------------------------------------------------------ + // services + // ------------------------------------------------------------------------ + + @Override + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { + checkNotNull(jobID); + + synchronized (lock) { + checkNotShutdown(); + EmbeddedLeaderService service = getOrCreateJobManagerService(jobID); + return service.createLeaderRetrievalService(); + } + } + + @Override + public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception { + checkNotNull(jobID); + + synchronized (lock) { + checkNotShutdown(); + EmbeddedLeaderService service = 
getOrCreateJobManagerService(jobID); + return service.createLeaderElectionService(); + } + } + + @GuardedBy("lock") + private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) { + EmbeddedLeaderService service = jobManagerLeaderServices.get(jobID); + if (service == null) { + service = new EmbeddedLeaderService(executor); +
jobManagerLeaderServices.put(jobID, service); + } + return service; + } + + @Override + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { + checkNotShutdown(); + return new StandaloneCheckpointRecoveryFactory(); + } + + @Override + public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { + checkNotShutdown(); + return new StandaloneSubmittedJobGraphStore(); + } + + @Override + public RunningJobsRegistry getRunningJobsRegistry() throws Exception { + checkNotShutdown(); + return runningJobsRegistry; + } + + @Override + public BlobStore createBlobStore() throws IOException { + checkNotShutdown(); + return new VoidBlobStore(); + } + + // ------------------------------------------------------------------------ + // shutdown + // ------------------------------------------------------------------------ + + @Override + public void shutdown() throws Exception { + synchronized (lock) { + if (!shutdown) { + shutdown = true; + + // no further calls should be dispatched; shutdownNow() also drops notifications that are still queued + executor.shutdownNow(); + + // stop all job manager leader services (this fails their registered contenders and listeners) + for (EmbeddedLeaderService service : jobManagerLeaderServices.values()) { + service.shutdown(); + } + jobManagerLeaderServices.clear(); + } + } + } + + private void checkNotShutdown() { + checkState(!shutdown, "high availability services are shut down"); + } + + // ------------------------------------------------------------------------ + // utilities + // ------------------------------------------------------------------------ + + protected ExecutorService getExecutorService() { + return executor; + } + + private static final class ServicesThreadFactory implements ThreadFactory { + + private AtomicInteger enumerator = new AtomicInteger(); // NOTE(review): never reassigned, could be final + + @Override + public Thread newThread(@Nonnull Runnable r) { + Thread thread = new Thread(r, "Flink HA 
services thread #" + enumerator.incrementAndGet()); + thread.setDaemon(true); + return thread; + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java new file mode 100644 index 0000000..84ac551 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.highavailability.nonha; + +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A simple leader election service, which selects a leader among contenders and notifies listeners. + * + * <p>An election service for contenders can be created via {@link #createLeaderElectionService()}, + * a listener service for leader observers can be created via {@link #createLeaderRetrievalService()}. Leadership grants and leader notifications are dispatched asynchronously on the given executor; only the error callbacks issued on shutdown are invoked directly.
+ */ +public class EmbeddedLeaderService { + + private static final Logger LOG = LoggerFactory.getLogger(EmbeddedLeaderService.class); + + private final Object lock = new Object(); + + private final Executor notificationExecutor; + + private final Set<EmbeddedLeaderElectionService> allLeaderContenders; + + private final Set<EmbeddedLeaderRetrievalService> listeners; + + /** proposed leader, which has been notified of leadership grant, but has not confirmed */ + private EmbeddedLeaderElectionService currentLeaderProposed; + + /** actual leader that has confirmed leadership and of which listeners have been notified */ + private EmbeddedLeaderElectionService currentLeaderConfirmed; + + /** fencing UID for the current leader (or proposed leader) */ + private UUID currentLeaderSessionId; + + /** the cached address of the current leader */ + private String currentLeaderAddress; + + /** flag marking the service as terminated; written under 'lock', but read without it in createLeaderElectionService() and createLeaderRetrievalService() */ + private boolean shutdown; + + // ------------------------------------------------------------------------ + + public EmbeddedLeaderService(ExecutorService notificationsDispatcher) { + this.notificationExecutor = checkNotNull(notificationsDispatcher); + this.allLeaderContenders = new HashSet<>(); + this.listeners = new HashSet<>(); + } + + // ------------------------------------------------------------------------ + // shutdown and errors + // ------------------------------------------------------------------------ + + /** + * Shuts down this leader election service. + * + * <p>This method does not perform a clean revocation of the leader status and + * no notification to any leader listeners. It simply notifies all contenders + * and listeners that the service is no longer available, by calling their handleError() synchronously while holding the lock. 
+ */ + public void shutdown() { + synchronized (lock) { + shutdownInternally(new Exception("Leader election service is shutting down")); + } + } + + private void fatalError(Throwable error) { + LOG.error("Embedded leader election service encountered a fatal error.
Shutting down service.", error); + + synchronized (lock) { + shutdownInternally(new Exception("Leader election service is shutting down after a fatal error", error)); + } + } + + @GuardedBy("lock") + private void shutdownInternally(Exception exceptionForHandlers) { + assert Thread.holdsLock(lock); + + if (!shutdown) { + // clear all leader status + currentLeaderProposed = null; + currentLeaderConfirmed = null; + currentLeaderSessionId = null; + currentLeaderAddress = null; + + // fail all registered contenders + for (EmbeddedLeaderElectionService service : allLeaderContenders) { + service.shutdown(exceptionForHandlers); + } + allLeaderContenders.clear(); + + // fail all registered listeners + for (EmbeddedLeaderRetrievalService service : listeners) { + service.shutdown(exceptionForHandlers); + } + listeners.clear(); + + shutdown = true; // NOTE(review): not reached if one of the handleError() callbacks above throws + } + } + + // ------------------------------------------------------------------------ + // creating contenders and listeners + // ------------------------------------------------------------------------ + + public LeaderElectionService createLeaderElectionService() { + checkState(!shutdown, "leader election service is shut down"); + return new EmbeddedLeaderElectionService(); + } + + public LeaderRetrievalService createLeaderRetrievalService() { + checkState(!shutdown, "leader election service is shut down"); + return new EmbeddedLeaderRetrievalService(); + } + + // ------------------------------------------------------------------------ + // adding and removing contenders & listeners + // ------------------------------------------------------------------------ + + /** + * Callback from leader contenders when they start their service.
+ */ + void addContender(EmbeddedLeaderElectionService service, LeaderContender contender) { + synchronized (lock) { + checkState(!shutdown, "leader election service is shut down"); + checkState(!service.running, "leader election service is already started"); + + try { + if (!allLeaderContenders.add(service)) { + throw new IllegalStateException("leader election service was added to this service multiple times"); + } + + service.contender = contender; + service.running = true; + + updateLeader(); + } + catch (Throwable t) { + fatalError(t); + } + } + } + + /** + * Callback from leader contenders when they stop their service. + */ + void removeContender(EmbeddedLeaderElectionService service) { + synchronized (lock) { + // if the service was not even started (or this leader service is shut down), simply do nothing + if (!service.running || shutdown) { + return; + } + + try { + if (!allLeaderContenders.remove(service)) { + throw new IllegalStateException("leader election service does not belong to this service"); + } + + // stop the service + service.contender = null; + service.running = false; + service.isLeader = false; + + // if that was the current (or proposed) leader, unset its status; updateLeader() then picks a new one + if (currentLeaderConfirmed == service) { + currentLeaderConfirmed = null; + currentLeaderSessionId = null; + currentLeaderAddress = null; + } + if (currentLeaderProposed == service) { + currentLeaderProposed = null; + currentLeaderSessionId = null; + } + + updateLeader(); + } + catch (Throwable t) { + fatalError(t); + } + } + } + + /** + * Callback from leader contenders when they confirm a leader grant. Confirmations for a stale grant are ignored. 
+ */ + void confirmLeader(final EmbeddedLeaderElectionService service, final UUID leaderSessionId) { + synchronized (lock) { + // if the service was shut down in the meantime, ignore this confirmation + if (!service.running || shutdown) { + return; + } + + try { + // check if the confirmation is for the same grant, or whether it is a stale grant + if (service == currentLeaderProposed &&
currentLeaderSessionId.equals(leaderSessionId)) { + final String address = service.contender.getAddress(); + LOG.info("Received confirmation of leadership for leader {} / session={}", address, leaderSessionId); + + // mark leadership + currentLeaderConfirmed = service; + currentLeaderAddress = address; + currentLeaderProposed = null; + service.isLeader = true; + + // notify all listeners + for (EmbeddedLeaderRetrievalService listener : listeners) { + notificationExecutor.execute( + new NotifyOfLeaderCall(address, leaderSessionId, listener.listener, LOG)); + } + } + else { + LOG.debug("Received confirmation of leadership for a stale leadership grant. Ignoring."); + } + } + catch (Throwable t) { + fatalError(t); + } + } + } + + @GuardedBy("lock") + private void updateLeader() { + // this must be called under the lock + assert Thread.holdsLock(lock); + + if (currentLeaderConfirmed == null && currentLeaderProposed == null) { + // we need a new leader + if (allLeaderContenders.isEmpty()) { + // no new leader available, tell everyone that there is no leader currently + for (EmbeddedLeaderRetrievalService listener : listeners) { + notificationExecutor.execute( + new NotifyOfLeaderCall(null, null, listener.listener, LOG)); + } + } + else { + // propose an arbitrary contender (first in HashSet iteration order) and ask it to confirm + final UUID leaderSessionId = UUID.randomUUID(); + EmbeddedLeaderElectionService leaderService = allLeaderContenders.iterator().next(); + + currentLeaderSessionId = leaderSessionId; + currentLeaderProposed = leaderService; + + notificationExecutor.execute( + new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG)); + } + } + } + + void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) { + synchronized (lock) { + checkState(!shutdown, "leader election service is shut down"); + checkState(!service.running, "leader retrieval service is already started"); + + try { + if (!listeners.add(service)) { + throw new 
IllegalStateException("leader retrieval service
was added to this service multiple times"); + } + + service.listener = listener; + service.running = true; + + // if we already have a leader, immediately notify this new listener + if (currentLeaderConfirmed != null) { + notificationExecutor.execute( + new NotifyOfLeaderCall(currentLeaderAddress, currentLeaderSessionId, listener, LOG)); + } + } + catch (Throwable t) { + fatalError(t); + } + } + } + + void removeListener(EmbeddedLeaderRetrievalService service) { + synchronized (lock) { + // if the service was not even started (or this leader service is shut down), simply do nothing + if (!service.running || shutdown) { + return; + } + + try { + if (!listeners.remove(service)) { + throw new IllegalStateException("leader retrieval service does not belong to this service"); + } + + // stop the service + service.listener = null; + service.running = false; + } + catch (Throwable t) { + fatalError(t); + } + } + } + + // ------------------------------------------------------------------------ + // election and retrieval service implementations + // ------------------------------------------------------------------------ + + private class EmbeddedLeaderElectionService implements LeaderElectionService { + + volatile LeaderContender contender; + + volatile boolean isLeader; + + volatile boolean running; + + @Override + public void start(LeaderContender contender) throws Exception { + checkNotNull(contender); + addContender(this, contender); + } + + @Override + public void stop() throws Exception { + removeContender(this); + } + + @Override + public void confirmLeaderSessionID(UUID leaderSessionID) { + checkNotNull(leaderSessionID); + confirmLeader(this, leaderSessionID); + } + + @Override + public boolean hasLeadership() { + return isLeader; + } + + void shutdown(Exception cause) { // invoked by shutdownInternally() while the enclosing service's lock is held + if (running) { + running = false; + isLeader = false; + contender.handleError(cause); + contender = null; + } + } + } + + // 
------------------------------------------------------------------------ + + private class
EmbeddedLeaderRetrievalService implements LeaderRetrievalService { + + volatile LeaderRetrievalListener listener; + + volatile boolean running; + + @Override + public void start(LeaderRetrievalListener listener) throws Exception { + checkNotNull(listener); + addListener(this, listener); + } + + @Override + public void stop() throws Exception { + removeListener(this); + } + + public void shutdown(Exception cause) { // invoked by shutdownInternally() while the enclosing service's lock is held + if (running) { + running = false; + listener.handleError(cause); + listener = null; + } + } + } + + // ------------------------------------------------------------------------ + // asynchronous notifications + // ------------------------------------------------------------------------ + + private static class NotifyOfLeaderCall implements Runnable { + + @Nullable + private final String address; // null if leader revoked without new leader + @Nullable + private final UUID leaderSessionId; // null if leader revoked without new leader + + private final LeaderRetrievalListener listener; + private final Logger logger; + + NotifyOfLeaderCall( + @Nullable String address, + @Nullable UUID leaderSessionId, + LeaderRetrievalListener listener, + Logger logger) { + + this.address = address; + this.leaderSessionId = leaderSessionId; + this.listener = checkNotNull(listener); + this.logger = checkNotNull(logger); + } + + @Override + public void run() { + try { + listener.notifyLeaderAddress(address, leaderSessionId); + } + catch (Throwable t) { + logger.warn("Error notifying leader listener about new leader", t); + listener.handleError(t instanceof Exception ?
(Exception) t : new Exception(t)); + } + } + } + + // ------------------------------------------------------------------------ + + private static class GrantLeadershipCall implements Runnable { + + private final LeaderContender contender; + private final UUID leaderSessionId; + private final Logger logger; + + GrantLeadershipCall( + LeaderContender contender, + UUID leaderSessionId, + Logger logger) { + + this.contender = checkNotNull(contender); + this.leaderSessionId = checkNotNull(leaderSessionId); + this.logger = checkNotNull(logger); + } + + @Override + public void run() { + try { + contender.grantLeadership(leaderSessionId); + } + catch (Throwable t) { + logger.warn("Error notifying leader listener about new leader", t); // NOTE(review): message copied from NotifyOfLeaderCall; this path is a failed grantLeadership() call on a contender + contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9615f15b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index 38e372d..3e88e8c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -154,4 +154,13 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices public BlobStore createBlobStore() throws IOException { return new VoidBlobStore(); } + + // ------------------------------------------------------------------------ + // Shutdown + // 
------------------------------------------------------------------------ + + @Override + public void shutdown() throws Exception { + // nothing to do,
since this should not shut down individual services, but cross-service parts + } }
