[FLINK-4835] [cluster management] Add embedded version of the high-availability services

This includes the addition of the EmbeddedLeaderService and a shutdown() method
on all high-availability services, so that they can be shut down cleanly.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79476624
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79476624
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79476624

Branch: refs/heads/flip-6
Commit: 7947662467d102edc675fde19948b7638a044343
Parents: da16b0a
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 14 23:57:11 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 16 22:09:39 2016 +0200

----------------------------------------------------------------------
 .../StandaloneCheckpointRecoveryFactory.java    |   4 +-
 .../highavailability/EmbeddedNonHaServices.java |  62 +++
 .../HighAvailabilityServices.java               |   7 +-
 .../runtime/highavailability/NonHaServices.java |  62 +--
 .../highavailability/ZookeeperHaServices.java   |  12 +-
 .../nonha/AbstractNonHaServices.java            | 175 +++++++
 .../nonha/EmbeddedLeaderService.java            | 466 +++++++++++++++++++
 .../TestingHighAvailabilityServices.java        |   9 +
 8 files changed, 736 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index a9624fb..57785ce 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -40,8 +40,8 @@ public class StandaloneCheckpointRecoveryFactory implements 
CheckpointRecoveryFa
        public CompletedCheckpointStore createCheckpointStore(JobID jobId, 
ClassLoader userClassLoader)
                        throws Exception {
 
-               return new 
StandaloneCompletedCheckpointStore(CheckpointRecoveryFactory
-                               .NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
+               return new StandaloneCompletedCheckpointStore(
+                               
CheckpointRecoveryFactory.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
new file mode 100644
index 0000000..58da287
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
+import org.apache.flink.runtime.highavailability.nonha.EmbeddedLeaderService;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} for the non-high-availability case
+ * where all participants (ResourceManager, JobManagers, TaskManagers) run in the same process.
+ *
+ * <p>This implementation has no dependencies on any external services. Instead of using a fixed,
+ * pre-configured ResourceManager, the ResourceManager leader is elected among the contenders in
+ * this process through an {@link EmbeddedLeaderService}. Checkpoints and metadata are kept in
+ * storage without availability guarantees.
+ */
+public class EmbeddedNonHaServices extends AbstractNonHaServices implements HighAvailabilityServices {
+
+       /** Elects the ResourceManager leader and informs the ResourceManager leader listeners. */
+       private final EmbeddedLeaderService resourceManagerLeaderService;
+
+       /**
+        * Creates the embedded services. The ResourceManager leader service dispatches its
+        * notifications through the executor provided by {@link #getExecutorService()}.
+        */
+       public EmbeddedNonHaServices() {
+               super();
+               this.resourceManagerLeaderService = new EmbeddedLeaderService(getExecutorService());
+       }
+
+       // ------------------------------------------------------------------------
+
+       @Override
+       public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+               return resourceManagerLeaderService.createLeaderRetrievalService();
+       }
+
+       @Override
+       public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+               return resourceManagerLeaderService.createLeaderElectionService();
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Shuts down the ResourceManager leader service and then the services inherited from
+        * {@link AbstractNonHaServices}.
+        *
+        * <p>The leader service is stopped first, so that it cannot hand notifications to the
+        * shared executor after {@code super.shutdown()} has terminated that executor. The
+        * {@code finally} makes sure the inherited services (and their executor) are shut down
+        * even if stopping the leader service fails.
+        */
+       @Override
+       public void shutdown() throws Exception {
+               try {
+                       resourceManagerLeaderService.shutdown();
+               }
+               finally {
+                       super.shutdown();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 484cddb..f6db682 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -49,11 +49,10 @@ public interface HighAvailabilityServices {
         * Gets the leader retriever for the job JobMaster which is responsible 
for the given job
         *
         * @param jobID The identifier of the job.
-        * @param defaultAddress address under which the job manager is 
reachable
         * @return
         * @throws Exception
         */
-       LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String 
defaultAddress) throws Exception;
+       LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws 
Exception;
 
        /**
         * Gets the leader election service for the cluster's resource manager.
@@ -86,4 +85,8 @@ public interface HighAvailabilityServices {
         * Creates the BLOB store in which BLOBs are stored in a 
highly-available fashion.
         */
        BlobStore createBlobStore() throws IOException;
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Shuts down these high availability services and releases the resources they hold
+        * (for example a ZooKeeper client, or the executor and embedded leader services of the
+        * non-HA variants).
+        *
+        * <p>NOTE(review): the non-HA implementations reject most further calls after shutdown;
+        * confirm whether that is intended as a contract for all implementations.
+        *
+        * @throws Exception if shutting down the services fails
+        */
+       void shutdown() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 1c73c01..107cbd0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -18,21 +18,13 @@
 
 package org.apache.flink.runtime.highavailability;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.blob.BlobStore;
-import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
-import org.apache.flink.runtime.highavailability.nonha.NonHaRegistry;
-import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -41,35 +33,23 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * This implementation can be used for testing, and for cluster setups that do 
not
  * tolerate failures of the master processes (JobManager, ResourceManager).
  * 
- * <p>This implementation has no dependencies on any external services. It 
returns fix
- * pre-configured leaders, and stores checkpoints and metadata simply on the 
heap and therefore
- * in volatile memory.
+ * <p>This implementation has no dependencies on any external services. It returns a fixed,
+ * pre-configured ResourceManager, and stores checkpoints and metadata simply on the heap or
+ * on a local file system, and therefore in storage without guarantees.
  */
-public class NonHaServices implements HighAvailabilityServices {
+public class NonHaServices extends AbstractNonHaServices implements 
HighAvailabilityServices {
 
        /** The fix address of the ResourceManager */
        private final String resourceManagerAddress;
 
-       private final ConcurrentHashMap<JobID, String> jobMastersAddress;
-
        /**
         * Creates a new services class for the fix pre-defined leaders.
         * 
         * @param resourceManagerAddress    The fix address of the 
ResourceManager
         */
        public NonHaServices(String resourceManagerAddress) {
+               super();
                this.resourceManagerAddress = 
checkNotNull(resourceManagerAddress);
-               this.jobMastersAddress = new ConcurrentHashMap<>(16);
-       }
-
-       /**
-        * Binds address of a specified job master
-        *
-        * @param jobID            JobID for the specified job master
-        * @param jobMasterAddress address for the specified job master
-        */
-       public void bindJobMasterLeaderAddress(JobID jobID, String 
jobMasterAddress) {
-               jobMastersAddress.put(jobID, jobMasterAddress);
        }
 
        // 
------------------------------------------------------------------------
@@ -82,37 +62,7 @@ public class NonHaServices implements 
HighAvailabilityServices {
        }
 
        @Override
-       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, 
String defaultAddress) throws Exception {
-               return new StandaloneLeaderRetrievalService(defaultAddress, new 
UUID(0, 0));
-       }
-
-       @Override
        public LeaderElectionService getResourceManagerLeaderElectionService() 
throws Exception {
                return new StandaloneLeaderElectionService();
        }
-
-       @Override
-       public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) throws Exception {
-               return new StandaloneLeaderElectionService();
-       }
-
-       @Override
-       public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws 
Exception {
-               return new StandaloneCheckpointRecoveryFactory();
-       }
-
-       @Override
-       public SubmittedJobGraphStore getSubmittedJobGraphStore() throws 
Exception {
-               return new StandaloneSubmittedJobGraphStore();
-       }
-
-       @Override
-       public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
-               return new NonHaRegistry();
-       }
-
-       @Override
-       public BlobStore createBlobStore() {
-               return new VoidBlobStore();
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index bbe8ecb..eae45ab 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -85,7 +85,8 @@ public class ZookeeperHaServices implements 
HighAvailabilityServices {
        private static final String JOB_MANAGER_LEADER_PATH = 
"/job_manager_lock";
 
        // 
------------------------------------------------------------------------
-
+       
+       
        /** The ZooKeeper client to use */
        private final CuratorFramework client;
 
@@ -168,6 +169,15 @@ public class ZookeeperHaServices implements 
HighAvailabilityServices {
        }
 
        // 
------------------------------------------------------------------------
+       //  Shutdown
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Shuts down these services by closing the ZooKeeper client they use.
+        */
+       @Override
+       public void shutdown() throws Exception {
+               this.client.close();
+       }
+
+       // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
new file mode 100644
index 0000000..8c15a52
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Base for all {@link HighAvailabilityServices} that are not highly available: they are backed
+ * by storage that has no availability guarantees, and their leader election services cannot
+ * elect among multiple distributed leader contenders.
+ *
+ * <p>JobManager leader election is provided by one {@link EmbeddedLeaderService} per job. These
+ * leader services dispatch their notifications through a common executor of daemon threads,
+ * which is terminated by {@link #shutdown()}.
+ */
+public abstract class AbstractNonHaServices implements HighAvailabilityServices {
+
+       /** Lock guarding the job manager leader services and the shutdown flag. */
+       private final Object lock = new Object();
+
+       /** Executor through which the embedded leader services dispatch their notifications. */
+       private final ExecutorService executor;
+
+       /** One embedded leader service per job, created lazily on first request. */
+       @GuardedBy("lock")
+       private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
+
+       /** The single running-jobs registry handed out to all callers. */
+       private final RunningJobsRegistry runningJobsRegistry;
+
+       /** Flag marking these services as shut down. */
+       @GuardedBy("lock")
+       private boolean shutdown;
+
+       // ------------------------------------------------------------------------
+
+       public AbstractNonHaServices() {
+               this.executor = Executors.newCachedThreadPool(new ServicesThreadFactory());
+               this.jobManagerLeaderServices = new HashMap<>();
+               this.runningJobsRegistry = new NonHaRegistry();
+       }
+
+       // ------------------------------------------------------------------------
+       //  services
+       // ------------------------------------------------------------------------
+
+       /**
+        * Gets a leader retriever for the JobManager of the given job. All retrievers and election
+        * services of the same job are backed by the same {@link EmbeddedLeaderService}.
+        */
+       @Override
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+               checkNotNull(jobID);
+
+               synchronized (lock) {
+                       checkNotShutdown();
+                       EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
+                       return service.createLeaderRetrievalService();
+               }
+       }
+
+       /**
+        * Gets a leader election service for a JobManager of the given job. All retrievers and
+        * election services of the same job are backed by the same {@link EmbeddedLeaderService}.
+        */
+       @Override
+       public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+               checkNotNull(jobID);
+
+               synchronized (lock) {
+                       checkNotShutdown();
+                       EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
+                       return service.createLeaderElectionService();
+               }
+       }
+
+       /** Returns the leader service of the given job, creating it if it does not exist yet. */
+       @GuardedBy("lock")
+       private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) {
+               EmbeddedLeaderService service = jobManagerLeaderServices.get(jobID);
+               if (service == null) {
+                       service = new EmbeddedLeaderService(executor);
+                       jobManagerLeaderServices.put(jobID, service);
+               }
+               return service;
+       }
+
+       @Override
+       public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+               checkNotShutdown();
+               return new StandaloneCheckpointRecoveryFactory();
+       }
+
+       @Override
+       public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+               checkNotShutdown();
+               return new StandaloneSubmittedJobGraphStore();
+       }
+
+       @Override
+       public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+               checkNotShutdown();
+               return runningJobsRegistry;
+       }
+
+       @Override
+       public BlobStore createBlobStore() throws IOException {
+               checkNotShutdown();
+               return new VoidBlobStore();
+       }
+
+       // ------------------------------------------------------------------------
+       //  shutdown
+       // ------------------------------------------------------------------------
+
+       /**
+        * Shuts down all job manager leader services and then the notification executor.
+        * Repeated calls have no effect.
+        *
+        * <p>The leader services are stopped before the executor, so that none of them can hand
+        * a notification to an already terminated executor. The {@code finally} makes sure the
+        * executor is terminated even if stopping a leader service fails.
+        */
+       @Override
+       public void shutdown() throws Exception {
+               synchronized (lock) {
+                       if (!shutdown) {
+                               shutdown = true;
+
+                               try {
+                                       // stop all job manager leader services
+                                       for (EmbeddedLeaderService service : jobManagerLeaderServices.values()) {
+                                               service.shutdown();
+                                       }
+                                       jobManagerLeaderServices.clear();
+                               }
+                               finally {
+                                       // no further calls should be dispatched
+                                       executor.shutdownNow();
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Checks that these services have not been shut down.
+        *
+        * <p>The flag is read under the lock, so a shutdown performed by another thread is
+        * reliably observed. The lock is re-entrant, so callers that already hold it may call this.
+        *
+        * @throws IllegalStateException if the services have been shut down
+        */
+       private void checkNotShutdown() {
+               synchronized (lock) {
+                       checkState(!shutdown, "high availability services are shut down");
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  utilities
+       // ------------------------------------------------------------------------
+
+       /**
+        * Gets the executor through which the embedded leader services of these services dispatch
+        * notifications. It is terminated by {@link #shutdown()}.
+        */
+       protected ExecutorService getExecutorService() {
+               return executor;
+       }
+
+       /** Creates daemon threads with distinct, numbered names for the services' executor. */
+       private static final class ServicesThreadFactory implements ThreadFactory {
+
+               private final AtomicInteger enumerator = new AtomicInteger();
+
+               @Override
+               public Thread newThread(@Nonnull Runnable r) {
+                       Thread thread = new Thread(r, "Flink HA services thread #" + enumerator.incrementAndGet());
+                       thread.setDaemon(true);
+                       return thread;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
new file mode 100644
index 0000000..84ac551
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha;
+
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A simple leader election service, which selects a leader among contenders 
and notifies listeners.
+ * 
+ * <p>An election service for contenders can be created via {@link 
#createLeaderElectionService()},
+ * a listener service for leader observers can be created via {@link 
#createLeaderRetrievalService()}.
+ */
+public class EmbeddedLeaderService {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(EmbeddedLeaderService.class);
+
+       private final Object lock = new Object();
+
+       private final Executor notificationExecutor;
+
+       private final Set<EmbeddedLeaderElectionService> allLeaderContenders;
+
+       private final Set<EmbeddedLeaderRetrievalService> listeners;
+
+       /** proposed leader, which has been notified of leadership grant, but 
has not confirmed */
+       private EmbeddedLeaderElectionService currentLeaderProposed;
+
+       /** actual leader that has confirmed leadership and of which listeners 
have been notified */
+       private EmbeddedLeaderElectionService currentLeaderConfirmed;
+
+       /** fencing UID for the current leader (or proposed leader) */
+       private UUID currentLeaderSessionId;
+
+       /** the cached address of the current leader */
+       private String currentLeaderAddress;
+
+       /** flag marking the service as terminated */
+       private boolean shutdown;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new embedded leader service.
+        *
+        * <p>Only the {@link Executor} capability is needed, so any executor (including an
+        * {@link ExecutorService}) can be passed.
+        *
+        * @param notificationsDispatcher The executor through which leader notifications are dispatched.
+        */
+       public EmbeddedLeaderService(Executor notificationsDispatcher) {
+               this.notificationExecutor = checkNotNull(notificationsDispatcher);
+               this.allLeaderContenders = new HashSet<>();
+               this.listeners = new HashSet<>();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  shutdown and errors
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Shuts this leader service down.
+        *
+        * <p>Leadership is not revoked cleanly, and leader listeners receive no leader change
+        * notification. Instead, every registered contender and listener is told that the service
+        * is no longer available.
+        */
+       public void shutdown() {
+               final Exception cause = new Exception("Leader election service is shutting down");
+               synchronized (lock) {
+                       shutdownInternally(cause);
+               }
+       }
+
+       /**
+        * Logs the given error and shuts the service down. The error is passed on, as the cause,
+        * to all contenders and listeners.
+        */
+       private void fatalError(Throwable error) {
+               LOG.error("Embedded leader election service encountered a fatal error. Shutting down service.", error);
+
+               final Exception wrapped = new Exception("Leader election service is shutting down after a fatal error", error);
+               synchronized (lock) {
+                       shutdownInternally(wrapped);
+               }
+       }
+
+       /**
+        * Clears all leader state and fails every registered contender and listener with the
+        * given exception. Does nothing if the service has already been shut down.
+        *
+        * @param exceptionForHandlers The exception passed to the contenders and listeners.
+        */
+       @GuardedBy("lock")
+       private void shutdownInternally(Exception exceptionForHandlers) {
+               assert Thread.holdsLock(lock);
+
+               if (shutdown) {
+                       return;
+               }
+
+               // Mark the service as terminated before calling out to contenders and listeners.
+               // Should any of them call back into this service from within that notification
+               // (the lock is re-entrant), callbacks such as removeContender() see the terminated
+               // state and return early instead of modifying the sets that are iterated below.
+               shutdown = true;
+
+               // clear all leader status
+               currentLeaderProposed = null;
+               currentLeaderConfirmed = null;
+               currentLeaderSessionId = null;
+               currentLeaderAddress = null;
+
+               // fail all registered contenders
+               for (EmbeddedLeaderElectionService service : allLeaderContenders) {
+                       service.shutdown(exceptionForHandlers);
+               }
+               allLeaderContenders.clear();
+
+               // fail all registered listeners
+               for (EmbeddedLeaderRetrievalService service : listeners) {
+                       service.shutdown(exceptionForHandlers);
+               }
+               listeners.clear();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  creating contenders and listeners
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new leader election service through which a contender can take part in the
+        * leader election of this service.
+        *
+        * <p>The shutdown flag is read under the lock, because it is written under the lock by
+        * other threads.
+        *
+        * @throws IllegalStateException if this service has been shut down
+        */
+       public LeaderElectionService createLeaderElectionService() {
+               synchronized (lock) {
+                       checkState(!shutdown, "leader election service is shut down");
+               }
+               return new EmbeddedLeaderElectionService();
+       }
+
+       /**
+        * Creates a new leader retrieval service through which a listener can observe the leader
+        * of this service.
+        *
+        * <p>The shutdown flag is read under the lock, because it is written under the lock by
+        * other threads.
+        *
+        * @throws IllegalStateException if this service has been shut down
+        */
+       public LeaderRetrievalService createLeaderRetrievalService() {
+               synchronized (lock) {
+                       checkState(!shutdown, "leader election service is shut down");
+               }
+               return new EmbeddedLeaderRetrievalService();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  adding and removing contenders & listeners
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Callback from a leader election service when its contender starts it.
+        *
+        * <p>Registers the service with its contender and re-evaluates the leader. Any failure
+        * while doing so is treated as fatal and shuts down this whole leader service.
+        *
+        * @throws IllegalStateException if this service is shut down, or if the given election
+        *                               service is already running
+        */
+       void addContender(EmbeddedLeaderElectionService service, LeaderContender contender) {
+               synchronized (lock) {
+                       checkState(!shutdown, "leader election service is shut down");
+                       checkState(!service.running, "leader election service is already started");
+
+                       try {
+                               final boolean newlyRegistered = allLeaderContenders.add(service);
+                               if (!newlyRegistered) {
+                                       throw new IllegalStateException("leader election service was added to this service multiple times");
+                               }
+
+                               // attach the contender and mark the election service as running
+                               service.contender = contender;
+                               service.running = true;
+
+                               updateLeader();
+                       }
+                       catch (Throwable t) {
+                               fatalError(t);
+                       }
+               }
+       }
+
+       /**
+        * Callback from a leader election service when its contender stops it.
+        *
+        * <p>Unregisters the service, drops it as proposed or confirmed leader, and re-evaluates
+        * the leader. Any failure while doing so is treated as fatal and shuts down this whole
+        * leader service.
+        */
+       void removeContender(EmbeddedLeaderElectionService service) {
+               synchronized (lock) {
+                       // nothing to do if the service was never started, or if everything is shut down already
+                       if (shutdown || !service.running) {
+                               return;
+                       }
+
+                       try {
+                               final boolean wasRegistered = allLeaderContenders.remove(service);
+                               if (!wasRegistered) {
+                                       throw new IllegalStateException("leader election service does not belong to this service");
+                               }
+
+                               // detach the contender and reset the status of the stopped service
+                               service.contender = null;
+                               service.running = false;
+                               service.isLeader = false;
+
+                               // a removed service can neither stay the confirmed nor the proposed leader
+                               final boolean wasConfirmedLeader = currentLeaderConfirmed == service;
+                               final boolean wasProposedLeader = currentLeaderProposed == service;
+
+                               if (wasConfirmedLeader) {
+                                       currentLeaderConfirmed = null;
+                                       currentLeaderSessionId = null;
+                                       currentLeaderAddress = null;
+                               }
+                               if (wasProposedLeader) {
+                                       currentLeaderProposed = null;
+                                       currentLeaderSessionId = null;
+                               }
+
+                               updateLeader();
+                       }
+                       catch (Throwable t) {
+                               fatalError(t);
+                       }
+               }
+       }
+
+       /**
+        * Callback from leader contenders when they confirm a leader grant
+        */
+       void confirmLeader(final EmbeddedLeaderElectionService service, final 
UUID leaderSessionId) {
+               synchronized (lock) {
+                       // if the service was shut down in the meantime, ignore 
this confirmation
+                       if (!service.running || shutdown) {
+                               return;
+                       }
+
+                       try {
+                               // check if the confirmation is for the same 
grant, or whether it is a stale grant 
+                               if (service == currentLeaderProposed && 
currentLeaderSessionId.equals(leaderSessionId)) {
+                                       final String address = 
service.contender.getAddress();
+                                       LOG.info("Received confirmation of 
leadership for leader {} / session={}", address, leaderSessionId);
+
+                                       // mark leadership
+                                       currentLeaderConfirmed = service;
+                                       currentLeaderAddress = address;
+                                       currentLeaderProposed = null;
+                                       service.isLeader = true;
+
+                                       // notify all listeners
+                                       for (EmbeddedLeaderRetrievalService 
listener : listeners) {
+                                               notificationExecutor.execute(
+                                                               new 
NotifyOfLeaderCall(address, leaderSessionId, listener.listener, LOG));
+                                       }
+                               }
+                               else {
+                                       LOG.debug("Received confirmation of 
leadership for a stale leadership grant. Ignoring.");
+                               }
+                       }
+                       catch (Throwable t) {
+                               fatalError(t);
+                       }
+               }
+       }
+
+       @GuardedBy("lock")
+       private void updateLeader() {
+               // this must be called under the lock
+               assert Thread.holdsLock(lock);
+
+               if (currentLeaderConfirmed == null && currentLeaderProposed == 
null) {
+                       // we need a new leader
+                       if (allLeaderContenders.isEmpty()) {
+                               // no new leader available, tell everyone that 
there is no leader currently
+                               for (EmbeddedLeaderRetrievalService listener : 
listeners) {
+                                       notificationExecutor.execute(
+                                                       new 
NotifyOfLeaderCall(null, null, listener.listener, LOG));
+                               }
+                       }
+                       else {
+                               // propose a leader and ask it
+                               final UUID leaderSessionId = UUID.randomUUID();
+                               EmbeddedLeaderElectionService leaderService = 
allLeaderContenders.iterator().next();
+
+                               currentLeaderSessionId = leaderSessionId;
+                               currentLeaderProposed = leaderService;
+
+                               notificationExecutor.execute(
+                                               new 
GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));
+                       }
+               }
+       }
+
+       void addListener(EmbeddedLeaderRetrievalService service, 
LeaderRetrievalListener listener) {
+               synchronized (lock) {
+                       checkState(!shutdown, "leader election service is shut 
down");
+                       checkState(!service.running, "leader retrieval service 
is already started");
+
+                       try {
+                               if (!listeners.add(service)) {
+                                       throw new IllegalStateException("leader 
retrieval service was added to this service multiple times");
+                               }
+
+                               service.listener = listener;
+                               service.running = true;
+
+                               // if we already have a leader, immediately 
notify this new listener
+                               if (currentLeaderConfirmed != null) {
+                                       notificationExecutor.execute(
+                                                       new 
NotifyOfLeaderCall(currentLeaderAddress, currentLeaderSessionId, listener, 
LOG));
+                               }
+                       }
+                       catch (Throwable t) {
+                               fatalError(t);
+                       }
+               }
+       }
+
+       void removeListener(EmbeddedLeaderRetrievalService service) {
+               synchronized (lock) {
+                       // if the service was not even started, simply do 
nothing
+                       if (!service.running || shutdown) {
+                               return;
+                       }
+
+                       try {
+                               if (!listeners.remove(service)) {
+                                       throw new IllegalStateException("leader 
retrieval service does not belong to this service");
+                               }
+
+                               // stop the service
+                               service.listener = null;
+                               service.running = false;
+                       }
+                       catch (Throwable t) {
+                               fatalError(t);
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  election and retrieval service implementations 
+       // 
------------------------------------------------------------------------
+
+       private class EmbeddedLeaderElectionService implements 
LeaderElectionService {
+
+               volatile LeaderContender contender;
+
+               volatile boolean isLeader;
+
+               volatile boolean running;
+
+               @Override
+               public void start(LeaderContender contender) throws Exception {
+                       checkNotNull(contender);
+                       addContender(this, contender);
+               }
+
+               @Override
+               public void stop() throws Exception {
+                       removeContender(this);
+               }
+
+               @Override
+               public void confirmLeaderSessionID(UUID leaderSessionID) {
+                       checkNotNull(leaderSessionID);
+                       confirmLeader(this, leaderSessionID);
+               }
+
+               @Override
+               public boolean hasLeadership() {
+                       return isLeader;
+               }
+
+               void shutdown(Exception cause) {
+                       if (running) {
+                               running = false;
+                               isLeader = false;
+                               contender.handleError(cause);
+                               contender = null;
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private class EmbeddedLeaderRetrievalService implements 
LeaderRetrievalService {
+
+               volatile LeaderRetrievalListener listener;
+
+               volatile boolean running;
+
+               @Override
+               public void start(LeaderRetrievalListener listener) throws 
Exception {
+                       checkNotNull(listener);
+                       addListener(this, listener);
+               }
+
+               @Override
+               public void stop() throws Exception {
+                       removeListener(this);
+               }
+
+               public void shutdown(Exception cause) {
+                       if (running) {
+                               running = false;
+                               listener.handleError(cause);
+                               listener = null;
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  asynchronous notifications
+       // 
------------------------------------------------------------------------
+
+       private static class NotifyOfLeaderCall implements Runnable {
+
+               @Nullable
+               private final String address;       // null if leader revoked 
without new leader
+               @Nullable
+               private final UUID leaderSessionId; // null if leader revoked 
without new leader
+
+               private final LeaderRetrievalListener listener;
+               private final Logger logger;
+
+               NotifyOfLeaderCall(
+                               @Nullable String address,
+                               @Nullable UUID leaderSessionId,
+                               LeaderRetrievalListener listener,
+                               Logger logger) {
+
+                       this.address = address;
+                       this.leaderSessionId = leaderSessionId;
+                       this.listener = checkNotNull(listener);
+                       this.logger = checkNotNull(logger);
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               listener.notifyLeaderAddress(address, 
leaderSessionId);
+                       }
+                       catch (Throwable t) {
+                               logger.warn("Error notifying leader listener 
about new leader", t);
+                               listener.handleError(t instanceof Exception ? 
(Exception) t : new Exception(t));
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static class GrantLeadershipCall implements Runnable {
+
+               private final LeaderContender contender;
+               private final UUID leaderSessionId;
+               private final Logger logger;
+
+               GrantLeadershipCall(
+                               LeaderContender contender,
+                               UUID leaderSessionId,
+                               Logger logger) {
+
+                       this.contender = checkNotNull(contender);
+                       this.leaderSessionId = checkNotNull(leaderSessionId);
+                       this.logger = checkNotNull(logger);
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               contender.grantLeadership(leaderSessionId);
+                       }
+                       catch (Throwable t) {
+                               logger.warn("Error notifying leader listener 
about new leader", t);
+                               contender.handleError(t instanceof Exception ? 
(Exception) t : new Exception(t));
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 38e372d..3e88e8c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -154,4 +154,13 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
        public BlobStore createBlobStore() throws IOException {
                return new VoidBlobStore();
        }
+
+       // 
------------------------------------------------------------------------
+       //  Shutdown
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void shutdown() throws Exception {
+               // nothing to do, since this should not shut down individual 
services, but cross service parts
+       }
 }

Reply via email to