[FLINK-5254] [yarn] Implement YARN High-Availability Services

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a7dbda7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a7dbda7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a7dbda7

Branch: refs/heads/master
Commit: 2a7dbda79a00863a511fcf64b339770d1d00f805
Parents: e2922ad
Author: Stephan Ewen <[email protected]>
Authored: Mon Dec 5 01:34:32 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Dec 23 20:54:27 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/Configuration.java      |  12 +-
 .../FsNegativeRunningJobsRegistryTest.java      | 121 ++++++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java |  20 +-
 .../highavailability/EmbeddedNonHaServices.java |  15 +-
 .../FsNegativeRunningJobsRegistry.java          | 153 ++++++++
 .../HighAvailabilityServices.java               |  69 +++-
 .../runtime/highavailability/NonHaServices.java |  21 +-
 .../highavailability/ServicesThreadFactory.java |  40 ++
 .../highavailability/ZookeeperHaServices.java   |  17 +-
 .../SingleLeaderElectionService.java            | 384 +++++++++++++++++++
 .../nonha/AbstractNonHaServices.java            |  29 +-
 .../nonha/EmbeddedLeaderService.java            |   2 +-
 .../runtime/jobmanager/slots/AllocatedSlot.java |   1 -
 .../StandaloneLeaderRetrievalService.java       |   1 +
 .../flink/runtime/minicluster/MiniCluster.java  |   2 +-
 .../resourcemanager/JobLeaderIdService.java     |   2 +-
 .../resourcemanager/ResourceManagerRunner.java  |   3 +-
 .../flink/runtime/rpc/RpcServiceUtils.java      |  70 ++++
 .../FsNegativeRunningJobsRegistryTest.java      |  85 ++++
 .../TestingHighAvailabilityServices.java        |  14 +-
 .../SingleLeaderElectionServiceTest.java        | 226 +++++++++++
 flink-yarn/pom.xml                              |  21 +
 ...bstractYarnFlinkApplicationMasterRunner.java |  34 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   2 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  |  18 +-
 .../flink/yarn/YarnTaskExecutorRunner.java      |  13 +-
 .../yarn/configuration/YarnConfigOptions.java   |  49 +++
 .../AbstractYarnNonHaServices.java              | 105 +++++
 .../YarnHighAvailabilityServices.java           | 343 +++++++++++++++++
 .../YarnIntraNonHaMasterServices.java           | 188 +++++++++
 .../YarnPreConfiguredMasterNonHaServices.java   | 172 +++++++++
 .../YarnIntraNonHaMasterServicesTest.java       | 149 +++++++
 .../YarnPreConfiguredMasterHaServicesTest.java  | 234 +++++++++++
 33 files changed, 2537 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java 
b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index f15c669..8f23435 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -44,7 +44,7 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
                implements IOReadableWritable, java.io.Serializable, Cloneable {
 
        private static final long serialVersionUID = 1L;
-       
+
        private static final byte TYPE_STRING = 0;
        private static final byte TYPE_INT = 1;
        private static final byte TYPE_LONG = 2;
@@ -52,14 +52,14 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
        private static final byte TYPE_FLOAT = 4;
        private static final byte TYPE_DOUBLE = 5;
        private static final byte TYPE_BYTES = 6;
-       
+
        /** The log object used for debugging. */
        private static final Logger LOG = 
LoggerFactory.getLogger(Configuration.class);
-       
+
 
        /** Stores the concrete key/value pairs of this configuration object. */
        protected final HashMap<String, Object> confData;
-       
+
        // 
--------------------------------------------------------------------------------------------
 
        /**
@@ -639,12 +639,16 @@ public class Configuration extends 
ExecutionConfig.GlobalJobParameters
                Object o = getRawValue(configOption.key());
 
                if (o != null) {
+                       // found a value for the current proper key
                        return o;
                }
                else if (configOption.hasDeprecatedKeys()) {
+                       // try the deprecated keys
                        for (String deprecatedKey : 
configOption.deprecatedKeys()) {
                                Object oo = getRawValue(deprecatedKey);
                                if (oo != null) {
+                                       LOG.warn("Config uses deprecated 
configuration key '{}' instead of proper key '{}'", 
+                                                       deprecatedKey, 
configOption.key());
                                        return oo;
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
new file mode 100644
index 0000000..40d75e8
--- /dev/null
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FsNegativeRunningJobsRegistryTest {
+
+       @ClassRule
+       public static final TemporaryFolder TEMP_DIR = new TemporaryFolder();
+
+       private static MiniDFSCluster HDFS_CLUSTER;
+
+       private static Path HDFS_ROOT_PATH;
+
+       // 
------------------------------------------------------------------------
+       //  startup / shutdown
+       // 
------------------------------------------------------------------------
+
+       @BeforeClass
+       public static void createHDFS() throws Exception {
+               final File tempDir = TEMP_DIR.newFolder();
+
+               Configuration hdConf = new Configuration();
+               hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
tempDir.getAbsolutePath());
+
+               MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
+               HDFS_CLUSTER = builder.build();
+
+               HDFS_ROOT_PATH = new Path("hdfs://" + 
HDFS_CLUSTER.getURI().getHost() + ":"
+                               + HDFS_CLUSTER.getNameNodePort() + "/");
+       }
+
+       @AfterClass
+       public static void destroyHDFS() {
+               if (HDFS_CLUSTER != null) {
+                       HDFS_CLUSTER.shutdown();
+               }
+               HDFS_CLUSTER = null;
+               HDFS_ROOT_PATH = null;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Tests
+       // 
------------------------------------------------------------------------
+       
+       @Test
+       public void testCreateAndSetFinished() throws Exception {
+               final Path workDir = new Path(HDFS_ROOT_PATH, "test-work-dir");
+               final JobID jid = new JobID();
+
+               FsNegativeRunningJobsRegistry registry = new 
FsNegativeRunningJobsRegistry(workDir);
+
+               // initially, without any call, the job is considered running
+               assertTrue(registry.isJobRunning(jid));
+
+               // repeated setting should not affect the status
+               registry.setJobRunning(jid);
+               assertTrue(registry.isJobRunning(jid));
+
+               // set the job to finished and validate
+               registry.setJobFinished(jid);
+               assertFalse(registry.isJobRunning(jid));
+
+               // another registry should pick this up
+               FsNegativeRunningJobsRegistry otherRegistry = new 
FsNegativeRunningJobsRegistry(workDir);
+               assertFalse(otherRegistry.isJobRunning(jid));
+       }
+
+       @Test
+       public void testSetFinishedAndRunning() throws Exception {
+               final Path workDir = new Path(HDFS_ROOT_PATH, 
"änother_wörk_directörü");
+               final JobID jid = new JobID();
+
+               FsNegativeRunningJobsRegistry registry = new 
FsNegativeRunningJobsRegistry(workDir);
+
+               // set the job to finished and validate
+               registry.setJobFinished(jid);
+               assertFalse(registry.isJobRunning(jid));
+
+               // set the job back to running and validate
+               registry.setJobRunning(jid);
+               assertTrue(registry.isJobRunning(jid));
+
+               // another registry should pick this up
+               FsNegativeRunningJobsRegistry otherRegistry = new 
FsNegativeRunningJobsRegistry(workDir);
+               assertTrue(otherRegistry.isJobRunning(jid));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 0eab032..36dfa55 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -35,6 +35,8 @@ import java.lang.reflect.Method;
 import java.net.URI;
 import java.net.UnknownHostException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Concrete implementation of the {@link FileSystem} base class for the Hadoop 
File System. The
  * class is a wrapper class which encapsulated the original Hadoop HDFS API.
@@ -60,7 +62,8 @@ public final class HadoopFileSystem extends FileSystem 
implements HadoopFileSyst
 
 
        /**
-        * Creates a new DistributedFileSystem object to access HDFS
+        * Creates a new DistributedFileSystem object to access HDFS, based on 
a class name
+        * and picking up the configuration from the class path or the Flink 
configuration.
         * 
         * @throws IOException
         *         throw if the required HDFS classes cannot be instantiated
@@ -76,6 +79,21 @@ public final class HadoopFileSystem extends FileSystem 
implements HadoopFileSyst
                this.fs = instantiateFileSystem(fsClass);
        }
 
+       /**
+        * Creates a new DistributedFileSystem that uses the given Hadoop
+        * {@link org.apache.hadoop.fs.FileSystem} under the hood.
+        *
+        * @param hadoopConfig The Hadoop configuration that the FileSystem is 
based on.
+        * @param hadoopFileSystem The Hadoop FileSystem that will be used 
under the hood.
+        */
+       public HadoopFileSystem(
+                       org.apache.hadoop.conf.Configuration hadoopConfig,
+                       org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
+
+               this.conf = checkNotNull(hadoopConfig, "hadoopConfig");
+               this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
+       }
+
        private Class<? extends org.apache.hadoop.fs.FileSystem> 
getDefaultHDFSClass() throws IOException {
                Class<? extends org.apache.hadoop.fs.FileSystem> fsClass = null;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
index b91cec1..a417599 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -43,6 +43,12 @@ public class EmbeddedNonHaServices extends 
AbstractNonHaServices implements High
        // 
------------------------------------------------------------------------
 
        @Override
+       public String getResourceManagerEndpointName() {
+               // dynamic actor name
+               return null;
+       }
+
+       @Override
        public LeaderRetrievalService getResourceManagerLeaderRetriever() {
                return 
resourceManagerLeaderService.createLeaderRetrievalService();
        }
@@ -55,11 +61,16 @@ public class EmbeddedNonHaServices extends 
AbstractNonHaServices implements High
        // 
------------------------------------------------------------------------
 
        @Override
-       public void shutdown() throws Exception {
+       public void close() throws Exception {
                try {
-                       super.shutdown();
+                       super.close();
                } finally {
                        resourceManagerLeaderService.shutdown();
                }
        }
+
+       @Override
+       public void closeAndCleanupAllData() throws Exception {
+               close();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
new file mode 100644
index 0000000..9d8b226
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This {@link RunningJobsRegistry} tracks the status of jobs via marker files,
+ * where the presence of a job's marker file indicates that the job has finished.
+ * 
+ * <p>The general contract is the following:
+ * <ul>
+ *     <li>Initially, a marker file does not exist (no one created it, yet), 
which means
+ *         the specific job is assumed to be running</li>
+ *     <li>The JobManager that finishes calls this service to create the 
marker file,
+ *         which marks the job as finished.</li>
+ *     <li>If a JobManager gains leadership at some point when shutdown is in 
progress,
+ *         it will see the marker file and realize that the job is 
finished.</li>
+ *     <li>The application framework is expected to clean the file once the 
application
+ *         is completely shut down. At that point, no JobManager will attempt 
to
+ *         start the job, even if it gains leadership.</li>
+ * </ul>
+ * 
+ * <p>It is especially tailored towards deployment modes like for example
+ * YARN, where HDFS is available as a persistent file system, and the YARN
+ * application's working directories on HDFS are automatically cleaned
+ * up after the application completed. 
+ */
+public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry {
+
+       private static final String PREFIX = ".job_complete_";
+
+       private final FileSystem fileSystem;
+
+       private final Path basePath;
+
+       /**
+        * Creates a new registry that writes to the FileSystem and working 
directory
+        * denoted by the given path.
+        * 
+        * <p>The initialization will attempt to write to the given working 
directory, in
+        * order to catch setup/configuration errors early.
+        *
+        * @param workingDirectory The working directory for files to track the 
job status.
+        *
+        * @throws IOException Thrown, if the specified directory cannot be 
accessed.
+        */
+       public FsNegativeRunningJobsRegistry(Path workingDirectory) throws 
IOException {
+               this(workingDirectory.getFileSystem(), workingDirectory);
+       }
+
+       /**
+        * Creates a new registry that writes its files to the given FileSystem 
at
+        * the given working directory path.
+        * 
+        * <p>The initialization will attempt to write to the given working 
directory, in
+        * order to catch setup/configuration errors early.
+        *
+        * @param fileSystem The FileSystem to use for the marker files.
+        * @param workingDirectory The working directory for files to track the 
job status.
+        *
+        * @throws IOException Thrown, if the specified directory cannot be 
accessed.
+        */
+       public FsNegativeRunningJobsRegistry(FileSystem fileSystem, Path 
workingDirectory) throws IOException {
+               this.fileSystem = checkNotNull(fileSystem, "fileSystem");
+               this.basePath = checkNotNull(workingDirectory, 
"workingDirectory");
+
+               // to be safe, attempt to write to the working directory, to
+               // catch problems early
+               final Path testFile = new Path(workingDirectory, 
".registry_test");
+               try (FSDataOutputStream out = fileSystem.create(testFile, 
false)) {
+                       out.write(42);
+               }
+               catch (IOException e) {
+                       throw new IOException("Unable to write to working 
directory: " + workingDirectory, e);
+               }
+               finally {
+                       fileSystem.delete(testFile, false);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void setJobRunning(JobID jobID) throws IOException {
+               checkNotNull(jobID, "jobID");
+               final Path filePath = createMarkerFilePath(jobID);
+
+               // delete the marker file, if it exists
+               try {
+                       fileSystem.delete(filePath, false);
+               }
+               catch (FileNotFoundException e) {
+                       // apparently job was already considered running
+               }
+       }
+
+       @Override
+       public void setJobFinished(JobID jobID) throws IOException {
+               checkNotNull(jobID, "jobID");
+               final Path filePath = createMarkerFilePath(jobID);
+
+               // create the file
+               // to avoid an exception if the marker file already exists, set 
overwrite=true
+               try (FSDataOutputStream out = fileSystem.create(filePath, 
true)) {
+                       out.write(42);
+               }
+       }
+
+       @Override
+       public boolean isJobRunning(JobID jobID) throws IOException {
+               checkNotNull(jobID, "jobID");
+
+               // check for the existence of the file
+               try {
+                       fileSystem.getFileStatus(createMarkerFilePath(jobID));
+                       // file was found --> job is terminated
+                       return false;
+               }
+               catch (FileNotFoundException e) {
+                       // file does not exist, job is still running
+                       return true;
+               }
+       }
+
+       private Path createMarkerFilePath(JobID jobId) {
+               return new Path(basePath, PREFIX + jobId.toString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 360de7b..4169204 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -26,19 +26,45 @@ import 
org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
 import java.io.IOException;
+import java.util.UUID;
 
 /**
- * This class gives access to all services needed for
- *
+ * The HighAvailabilityServices give access to all services needed for a 
highly-available
+ * setup. In particular, the services provide access to highly available 
storage and
+ * registries, as well as distributed counters and leader election.
+ * 
  * <ul>
  *     <li>ResourceManager leader election and leader retrieval</li>
  *     <li>JobManager leader election and leader retrieval</li>
  *     <li>Persistence for checkpoint metadata</li>
  *     <li>Registering the latest completed checkpoint(s)</li>
- *     <li>Persistence for submitted job graph</li>
+ *     <li>Persistence for the BLOB store</li>
+ *     <li>Registry that marks a job's status</li>
+ *     <li>Naming of RPC endpoints</li>
  * </ul>
  */
-public interface HighAvailabilityServices {
+public interface HighAvailabilityServices extends AutoCloseable {
+
+       // 
------------------------------------------------------------------------
+       //  Constants
+       // 
------------------------------------------------------------------------
+
+       /**
+        * This UUID should be used when no proper leader election happens, but 
a simple
+        * pre-configured leader is used. That is for example the case in 
non-highly-available
+        * standalone setups.
+        */
+       UUID DEFAULT_LEADER_ID = new UUID(0, 0);
+
+       // 
------------------------------------------------------------------------
+       //  Endpoint Naming
+       // 
------------------------------------------------------------------------
+
+       String getResourceManagerEndpointName();
+
+       // 
------------------------------------------------------------------------
+       //  Services
+       // 
------------------------------------------------------------------------
 
        /**
         * Gets the leader retriever for the cluster's resource manager.
@@ -88,7 +114,7 @@ public interface HighAvailabilityServices {
         *
         * @return Running job registry to retrieve running jobs
         */
-       RunningJobsRegistry getRunningJobsRegistry();
+       RunningJobsRegistry getRunningJobsRegistry() throws Exception;
 
        /**
         * Creates the BLOB store in which BLOBs are stored in a 
highly-available fashion.
@@ -99,11 +125,38 @@ public interface HighAvailabilityServices {
        BlobStore createBlobStore() throws IOException;
 
        // 
------------------------------------------------------------------------
+       //  Shutdown and Cleanup
+       // 
------------------------------------------------------------------------
 
        /**
-        * Shut the high availability service down.
+        * Closes the high availability services, releasing all resources.
+        * 
+        * <p>This method <b>does not delete or clean up</b> any data stored in 
external stores
+        * (file systems, ZooKeeper, etc). Another instance of the high 
availability
+        * services will be able to recover the job.
+        * 
+        * <p>If an exception occurs during closing services, this method will 
attempt to
+        * continue closing other services and report exceptions only after all 
services
+        * have been attempted to be closed.
         *
-        * @throws Exception if the shut down fails
+        * @throws Exception Thrown, if an exception occurred while closing 
these services.
+        */
+       @Override
+       void close() throws Exception;
+
+       /**
+        * Closes the high availability services (releasing all resources) and 
deletes
+        * all data stored by these services in external stores.
+        * 
+        * <p>After this method was called, any job or session that was 
managed by
+        * these high availability services will be unrecoverable.
+        * 
+        * <p>If an exception occurs during cleanup, this method will attempt to
+        * continue the cleanup and report exceptions only after all cleanup 
steps have
+        * been attempted.
+        * 
+        * @throws Exception Thrown, if an exception occurred while closing 
these services
+        *                   or cleaning up data stored by them.
         */
-       void shutdown() throws Exception;
+       void closeAndCleanupAllData() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 75f44ed..d644fb9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -18,14 +18,12 @@
 
 package org.apache.flink.runtime.highavailability;
 
+import 
org.apache.flink.runtime.highavailability.leaderelection.SingleLeaderElectionService;
 import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 
-import java.util.UUID;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -39,6 +37,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class NonHaServices extends AbstractNonHaServices implements 
HighAvailabilityServices {
 
+       /** The constant name of the ResourceManager RPC endpoint */
+       private static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = 
"resource_manager";
+
        /** The fix address of the ResourceManager */
        private final String resourceManagerAddress;
 
@@ -53,16 +54,26 @@ public class NonHaServices extends AbstractNonHaServices 
implements HighAvailabi
        }
 
        // 
------------------------------------------------------------------------
+       //  Names
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public String getResourceManagerEndpointName() {
+               return RESOURCE_MANAGER_RPC_ENDPOINT_NAME;
+       }
+
+
+       // 
------------------------------------------------------------------------
        //  Services
        // 
------------------------------------------------------------------------
 
        @Override
        public LeaderRetrievalService getResourceManagerLeaderRetriever() {
-               return new 
StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0));
+               return new 
StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);
        }
 
        @Override
        public LeaderElectionService getResourceManagerLeaderElectionService() {
-               return new StandaloneLeaderElectionService();
+               return new SingleLeaderElectionService(getExecutorService(), 
DEFAULT_LEADER_ID);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java
new file mode 100644
index 0000000..24667e4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import javax.annotation.Nonnull;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Thread factory for the threads that run the high-availability services.
+ *
+ * <p>Every thread is named {@code "Flink HA Services Thread #<n>"} with a counter that
+ * starts at 1, runs at {@link Thread#MAX_PRIORITY}, and is a daemon thread.
+ */
+public class ServicesThreadFactory implements ThreadFactory {
+
+       /** Counter for the thread names; final so that it can never be swapped after construction. */
+       private final AtomicInteger enumerator = new AtomicInteger();
+
+       /**
+        * Creates a new, not yet started thread for the given runnable.
+        *
+        * @param r The runnable that the thread executes.
+        * @return The configured daemon thread.
+        */
+       @Override
+       public Thread newThread(@Nonnull Runnable r) {
+               Thread thread = new Thread(r, "Flink HA Services Thread #" + enumerator.incrementAndGet());
+
+               // HA threads should have a very high priority, but not
+               // keep the JVM running by themselves
+               thread.setPriority(Thread.MAX_PRIORITY);
+               thread.setDaemon(true);
+
+               return thread;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index 3e909e8..25d21ef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -108,6 +108,16 @@ public class ZookeeperHaServices implements 
HighAvailabilityServices {
        // 
------------------------------------------------------------------------
 
        @Override
+       public String getResourceManagerEndpointName() {
+               // since the resource manager name must be dynamic, we return 
null here
+               return null;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Services
+       // 
------------------------------------------------------------------------
+
+       @Override
        public LeaderRetrievalService getResourceManagerLeaderRetriever() {
                return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, RESOURCE_MANAGER_LEADER_PATH);
        }
@@ -174,10 +184,15 @@ public class ZookeeperHaServices implements 
HighAvailabilityServices {
        // 
------------------------------------------------------------------------
 
        @Override
-       public void shutdown() throws Exception {
+       public void close() throws Exception {
                client.close();
        }
 
+       /**
+        * Currently identical to {@link #close()}: it only closes the client.
+        *
+        * <p>NOTE(review): nothing is removed here despite the method name - confirm
+        * whether the data written by these services should be deleted as well.
+        */
+       @Override
+       public void closeAndCleanupAllData() throws Exception {
+               close();
+       }
+
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
new file mode 100644
index 0000000..26e3cbf
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.leaderelection;
+
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import java.util.HashSet;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of the {@link LeaderElectionService} interface that 
handles a single
+ * leader contender. When started, this service immediately grants the 
contender the leadership.
+ * 
+ * <p>The implementation accepts a single static leader session ID and is 
hence compatible with
+ * pre-configured single leader (no leader failover) setups.
+ * 
+ * <p>This implementation supports a series of leader listeners that receive 
notifications about
+ * the leader contender.
+ */
+public class SingleLeaderElectionService implements LeaderElectionService {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(SingleLeaderElectionService.class);
+
+       // 
------------------------------------------------------------------------
+
+       /** lock for all operations on this instance */
+       private final Object lock = new Object();
+
+       /** The executor service that dispatches notifications */
+       private final Executor notificationExecutor;
+
+       /** The leader ID assigned to the immediate leader */
+       private final UUID leaderId;
+
+       @GuardedBy("lock")
+       private final HashSet<EmbeddedLeaderRetrievalService> listeners;
+
+       /** The currently proposed leader */
+       @GuardedBy("lock")
+       private volatile LeaderContender proposedLeader;
+
+       /** The confirmed leader */
+       @GuardedBy("lock")
+       private volatile LeaderContender leader;
+
+       /** The address of the confirmed leader */
+       @GuardedBy("lock")
+       private volatile String leaderAddress;
+
+       /** Flag marking this service as shutdown, meaning it cannot be started 
again */
+       @GuardedBy("lock")
+       private volatile boolean shutdown;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new leader election service. The service assigns the given leader ID
+        * to the leader contender.
+        *
+        * @param notificationsDispatcher The executor on which the leadership grant and the
+        *                                listener notifications are run. Must not be null.
+        * @param leaderId The constant leader ID assigned to the leader. Must not be null.
+        */
+       public SingleLeaderElectionService(Executor notificationsDispatcher, 
UUID leaderId) {
+               this.notificationExecutor = 
checkNotNull(notificationsDispatcher);
+               this.leaderId = checkNotNull(leaderId);
+               this.listeners = new HashSet<>();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  leader election service
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Registers the given contender as the proposed leader and schedules the leadership
+        * grant (with the constant leader ID) on the notification executor.
+        *
+        * <p>The contender only becomes the confirmed leader once
+        * {@code confirmLeaderSessionID} is called.
+        *
+        * @param contender The contender that is granted leadership; must not be null.
+        */
+       @Override
+       public void start(LeaderContender contender) throws Exception {
+               checkNotNull(contender, "contender");
+
+               synchronized (lock) {
+                       // the service is single-use: it can be started neither after shutdown
+                       // nor while another contender is still proposed
+                       checkState(!shutdown, "service is shut down");
+                       checkState(proposedLeader == null, "service already 
started");
+
+                       // directly grant leadership to the given contender
+                       proposedLeader = contender;
+                       notificationExecutor.execute(new 
GrantLeadershipCall(contender, leaderId));
+               }
+       }
+
+       /**
+        * Stops the election: schedules a "no leader" notification (null address and null
+        * session ID) for every registered listener, revokes the leadership of the confirmed
+        * leader if there is one, and resets the leader state.
+        *
+        * <p>The revocation is invoked synchronously while holding the lock; an error thrown
+        * by it is forwarded to the leader's own {@code handleError}. A contender that was
+        * only proposed but never confirmed is dropped without a revocation call.
+        */
+       @Override
+       public void stop() {
+               synchronized (lock) {
+                       // notify all listeners that there is no leader
+                       for (EmbeddedLeaderRetrievalService listener : 
listeners) {
+                               notificationExecutor.execute(
+                                               new NotifyOfLeaderCall(null, 
null, listener.listener, LOG));
+                       }
+
+                       // if there was a leader, revoke its leadership
+                       if (leader != null) {
+                               try {
+                                       leader.revokeLeadership();
+                               } catch (Throwable t) {
+                                       leader.handleError(t instanceof 
Exception ? (Exception) t : new Exception(t));
+                               }
+                       }
+
+                       // reset the state; the shutdown flag is not touched here
+                       proposedLeader = null;
+                       leader = null;
+                       leaderAddress = null;
+               }
+       }
+
+       /**
+        * Confirms the leadership of the proposed contender. The contender becomes the
+        * confirmed leader, and a notification with its address and the leader ID is
+        * scheduled for every registered listener.
+        *
+        * @param leaderSessionID The session ID being confirmed; must not be null and must
+        *                        equal the constant leader ID of this service.
+        */
+       @Override
+       public void confirmLeaderSessionID(UUID leaderSessionID) {
+               checkNotNull(leaderSessionID, "leaderSessionID");
+               checkArgument(leaderSessionID.equals(leaderId), "confirmed 
wrong leader session id");
+
+               synchronized (lock) {
+                       // a confirmation is only valid between start() and the first confirmation
+                       checkState(!shutdown, "service is shut down");
+                       checkState(proposedLeader != null, "no leader proposed 
yet");
+                       checkState(leader == null, "leader already confirmed");
+
+                       // accept the confirmation
+                       final String address = proposedLeader.getAddress();
+                       leaderAddress = address;
+                       leader = proposedLeader;
+
+                       // notify all listeners
+                       for (EmbeddedLeaderRetrievalService listener : 
listeners) {
+                               notificationExecutor.execute(
+                                               new NotifyOfLeaderCall(address, 
leaderId, listener.listener, LOG));
+                       }
+               }
+       }
+
+       /**
+        * Checks whether a leader has been confirmed.
+        *
+        * @return True, if a confirmed leader is currently set, false otherwise.
+        */
+       @Override
+       public boolean hasLeadership() {
+               final boolean confirmed;
+               synchronized (lock) {
+                       confirmed = (leader != null);
+               }
+               return confirmed;
+       }
+
+       /**
+        * Handles a failure of the leadership grant: logs the error, forwards it to the
+        * contender, and resets the leader state if the contender is still the proposed leader.
+        *
+        * @param contender The contender whose {@code grantLeadership} call failed.
+        * @param error The error thrown by the call.
+        */
+       void errorOnGrantLeadership(LeaderContender contender, Throwable error) {
+               LOG.warn("Error granting leadership to contender", error);
+               contender.handleError(error instanceof Exception ? (Exception) error : new Exception(error));
+
+               synchronized (lock) {
+                       // only reset if no other contender has been proposed in the meantime
+                       if (proposedLeader == contender) {
+                               proposedLeader = null;
+                               leader = null;
+                               // keep the address in sync with the leader, as stop() and shutdown do
+                               leaderAddress = null;
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  shutdown
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Returns the shutdown flag. The volatile flag is read without taking the lock.
+        *
+        * @return True, if this service has been shut down, false otherwise.
+        */
+       public boolean isShutdown() {
+               return shutdown;
+       }
+
+       /**
+        * Shuts this service down. The confirmed leader (if any) and all registered listeners
+        * are failed with an exception stating that the leader service is shutting down.
+        * Calling this method again has no further effect.
+        */
+       public void shutdown() {
+               shutdownInternally(new Exception("The leader service is 
shutting down"));
+       }
+
+       /**
+        * Marks the service as shut down, passes the given exception to the confirmed leader
+        * (if there is one) and to all registered listeners, and clears all state. Does
+        * nothing if the service is already shut down.
+        *
+        * @param exceptionForHandlers The exception handed to the error handlers of the
+        *                             leader and of the listeners.
+        */
+       private void shutdownInternally(Exception exceptionForHandlers) {
+               synchronized (lock) {
+                       if (shutdown) {
+                               return;
+                       }
+
+                       shutdown = true;
+
+                       // fail the leader (if there is one)
+                       if (leader != null) {
+                               try {
+                                       
leader.handleError(exceptionForHandlers);
+                               } catch (Throwable ignored) {}
+                               // anything thrown by the leader's handler is swallowed,
+                               // so the remaining shutdown steps still run
+                       }
+
+                       // clear all leader status
+                       leader = null;
+                       proposedLeader = null;
+                       leaderAddress = null;
+
+                       // fail all registered listeners
+                       for (EmbeddedLeaderRetrievalService service : 
listeners) {
+                               service.shutdown(exceptionForHandlers);
+                       }
+                       listeners.clear();
+               }
+       }
+
+       /**
+        * Logs the given error and shuts this service down, handing the error (wrapped in an
+        * exception) to the leader and to all registered listeners.
+        *
+        * @param error The error that made the service unusable.
+        */
+       private void fatalError(Throwable error) {
+               LOG.error("Single leader election service encountered a fatal error. Shutting down service.", error);
+
+               shutdownInternally(new Exception("Leader election service is shutting down after a fatal error", error));
+       }
+
+       // 
------------------------------------------------------------------------
+       //  leader listeners
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new leader retrieval service that is backed by this election service.
+        * The returned service is registered as a listener only once it is started.
+        *
+        * @return A new, not yet started leader retrieval service.
+        */
+       public LeaderRetrievalService createLeaderRetrievalService() {
+               // the volatile flag is checked here without taking the lock
+               checkState(!shutdown, "leader election service is shut down");
+               return new EmbeddedLeaderRetrievalService();
+       }
+
+       /**
+        * Registers a leader retrieval service together with its listener and marks the
+        * retrieval service as running. If a leader is already confirmed, a notification for
+        * the new listener is scheduled right away. An error during registration is treated
+        * as fatal and shuts this election service down.
+        *
+        * @param service The retrieval service being started; must not be running yet.
+        * @param listener The listener to notify about the leader.
+        */
+       void addListener(EmbeddedLeaderRetrievalService service, 
LeaderRetrievalListener listener) {
+               synchronized (lock) {
+                       // these two checks sit outside the try block, so their failure is
+                       // thrown to the caller instead of being treated as a fatal error
+                       checkState(!shutdown, "leader election service is shut 
down");
+                       checkState(!service.running, "leader retrieval service 
is already started");
+
+                       try {
+                               if (!listeners.add(service)) {
+                                       throw new IllegalStateException("leader 
retrieval service was added to this service multiple times");
+                               }
+
+                               service.listener = listener;
+                               service.running = true;
+
+                               // if we already have a leader, immediately notify this new listener
+                               if (leader != null) {
+                                       notificationExecutor.execute(
+                                                       new 
NotifyOfLeaderCall(leaderAddress, leaderId, listener, LOG));
+                               }
+                       }
+                       catch (Throwable t) {
+                               fatalError(t);
+                       }
+               }
+       }
+
+       /**
+        * Unregisters a leader retrieval service and marks it as stopped. Does nothing if the
+        * retrieval service is not running or if this election service is already shut down.
+        * An error during removal is treated as fatal and shuts this election service down.
+        *
+        * @param service The retrieval service being stopped.
+        */
+       void removeListener(EmbeddedLeaderRetrievalService service) {
+               synchronized (lock) {
+                       // if the service was not even started, simply do nothing
+                       if (!service.running || shutdown) {
+                               return;
+                       }
+
+                       try {
+                               if (!listeners.remove(service)) {
+                                       throw new IllegalStateException("leader 
retrieval service does not belong to this service");
+                               }
+
+                               // stop the service
+                               service.listener = null;
+                               service.running = false;
+                       }
+                       catch (Throwable t) {
+                               fatalError(t);
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Leader retrieval service handed out by {@code createLeaderRetrievalService()}.
+        * Starting and stopping it adds it to, or removes it from, the listeners of the
+        * enclosing election service.
+        */
+       private class EmbeddedLeaderRetrievalService implements 
LeaderRetrievalService {
+
+               /** The listener to notify; set by addListener, cleared on removal and shutdown. */
+               volatile LeaderRetrievalListener listener;
+
+               /** True while this service is registered with the enclosing election service. */
+               volatile boolean running;
+
+               /**
+                * Registers the given listener with the enclosing election service.
+                *
+                * @param listener The listener to notify about the leader; must not be null.
+                */
+               @Override
+               public void start(LeaderRetrievalListener listener) throws 
Exception {
+                       checkNotNull(listener);
+                       addListener(this, listener);
+               }
+
+               /**
+                * Removes this service from the enclosing election service.
+                */
+               @Override
+               public void stop() throws Exception {
+                       removeListener(this);
+               }
+
+               /**
+                * Stops this service (if it is running) and passes the cause to its listener.
+                * Anything thrown by the listener's error handler is ignored.
+                *
+                * @param cause The exception handed to the listener's error handler.
+                */
+               void shutdown(Exception cause) {
+                       if (running) {
+                               // keep a reference, because the field is cleared before the call
+                               final LeaderRetrievalListener lst = listener;
+                               running = false;
+                               listener = null;
+
+                               try {
+                                       lst.handleError(cause);
+                               } catch (Throwable ignored) {}
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  asynchronous notifications
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Runnable that hands the leadership to a contender and reports a failure of that
+        * call back to the enclosing election service.
+        */
+       private class GrantLeadershipCall implements Runnable {
+
+               private final LeaderContender grantee;
+               private final UUID sessionId;
+
+               GrantLeadershipCall(LeaderContender contender, UUID leaderSessionId) {
+                       checkNotNull(contender);
+                       checkNotNull(leaderSessionId);
+
+                       this.grantee = contender;
+                       this.sessionId = leaderSessionId;
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               grantee.grantLeadership(sessionId);
+                       }
+                       catch (Throwable failure) {
+                               errorOnGrantLeadership(grantee, failure);
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Runnable that delivers a leader address and leader session ID to a single
+        * leader retrieval listener.
+        */
+       private static class NotifyOfLeaderCall implements Runnable {
+
+               /** Address to report; null if the leader was revoked without a new leader. */
+               @Nullable
+               private final String addressToReport;
+
+               /** Session ID to report; null if the leader was revoked without a new leader. */
+               @Nullable
+               private final UUID sessionId;
+
+               private final LeaderRetrievalListener target;
+               private final Logger log;
+
+               NotifyOfLeaderCall(
+                               @Nullable String address,
+                               @Nullable UUID leaderSessionId,
+                               LeaderRetrievalListener listener,
+                               Logger logger) {
+
+                       checkNotNull(listener);
+                       checkNotNull(logger);
+
+                       this.addressToReport = address;
+                       this.sessionId = leaderSessionId;
+                       this.target = listener;
+                       this.log = logger;
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               target.notifyLeaderAddress(addressToReport, sessionId);
+                       }
+                       catch (Throwable failure) {
+                               log.warn("Error notifying leader listener about new leader", failure);
+
+                               final Exception reported = failure instanceof Exception
+                                               ? (Exception) failure
+                                               : new Exception(failure);
+                               target.handleError(reported);
+                       }
+               }
+       }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 474faa8..b10e414 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -25,19 +25,17 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.ServicesThreadFactory;
 import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
-import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -132,7 +130,7 @@ public abstract class AbstractNonHaServices implements 
HighAvailabilityServices
        // 
------------------------------------------------------------------------
 
        @Override
-       public void shutdown() throws Exception {
+       public void close() throws Exception {
                synchronized (lock) {
                        if (!shutdown) {
                                shutdown = true;
@@ -149,6 +147,12 @@ public abstract class AbstractNonHaServices implements 
HighAvailabilityServices
                }
        }
 
+       /**
+        * Closes these services by delegating to {@link #close()}.
+        */
+       @Override
+       public void closeAndCleanupAllData() throws Exception {
+               // this stores no data, so this method is the same as 'close()'
+               close();
+       }
+
        private void checkNotShutdown() {
                checkState(!shutdown, "high availability services are shut 
down");
        }
@@ -160,21 +164,4 @@ public abstract class AbstractNonHaServices implements 
HighAvailabilityServices
        protected ExecutorService getExecutorService() {
                return executor;
        }
-
-       private static final class ServicesThreadFactory implements 
ThreadFactory {
-
-               private AtomicInteger enumerator = new AtomicInteger();
-
-               @Override
-               public Thread newThread(@Nonnull Runnable r) {
-                       Thread thread = new Thread(r, "Flink HA Services Thread 
#" + enumerator.incrementAndGet());
-
-                       // HA threads should have a very high priority, but not
-                       // keep the JVM running by themselves
-                       thread.setPriority(Thread.MAX_PRIORITY);
-                       thread.setDaemon(true);
-
-                       return thread;
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
index 9fad9be..d4eba26 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java
@@ -461,7 +461,7 @@ public class EmbeddedLeaderService {
                                contender.grantLeadership(leaderSessionId);
                        }
                        catch (Throwable t) {
-                               logger.warn("Error notifying leader listener 
about new leader", t);
+                               logger.warn("Error granting leadership to 
contender", t);
                                contender.handleError(t instanceof Exception ? 
(Exception) t : new Exception(t));
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
index a0c608d..269a8f3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
index 16b163c..4ad4646 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
@@ -51,6 +51,7 @@ public class StandaloneLeaderRetrievalService implements 
LeaderRetrievalService
         *
         * @param leaderAddress The leader's pre-configured address
         */
+       @Deprecated
        public StandaloneLeaderRetrievalService(String leaderAddress) {
                this.leaderAddress = checkNotNull(leaderAddress);
                this.leaderId = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 29a6e59..1933554 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -345,7 +345,7 @@ public class MiniCluster {
                // shut down high-availability services
                if (haServices != null) {
                        try {
-                               haServices.shutdown();
+                               haServices.closeAndCleanupAllData();
                        } catch (Exception e) {
                                exception = firstOrSuppressed(e, exception);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 56e72c0..6c7e249 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -59,7 +59,7 @@ public class JobLeaderIdService {
        /** Actions to call when the job leader changes */
        private JobLeaderIdActions jobLeaderIdActions;
 
-       public JobLeaderIdService(HighAvailabilityServices 
highAvailabilityServices) {
+       public JobLeaderIdService(HighAvailabilityServices 
highAvailabilityServices) throws Exception {
                this.highAvailabilityServices = 
Preconditions.checkNotNull(highAvailabilityServices);
 
                this.runningJobsRegistry = 
highAvailabilityServices.getRunningJobsRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 959b727..e0dee0b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import 
org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -46,7 +45,7 @@ public class ResourceManagerRunner implements 
FatalErrorHandler {
                        final Configuration configuration,
                        final RpcService rpcService,
                        final HighAvailabilityServices highAvailabilityServices,
-                       final MetricRegistry metricRegistry) throws 
ConfigurationException {
+                       final MetricRegistry metricRegistry) throws Exception {
 
                Preconditions.checkNotNull(configuration);
                Preconditions.checkNotNull(rpcService);

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
index 1ac54ac..018c3ed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java
@@ -21,19 +21,36 @@ package org.apache.flink.runtime.rpc;
 import akka.actor.ActorSystem;
 import com.typesafe.config.Config;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.util.NetUtils;
+
 import org.jboss.netty.channel.ChannelException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.UnknownHostException;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * These RPC utilities contain helper methods around RPC use, such as starting 
an RPC service,
+ * or constructing RPC addresses.
+ */
 public class RpcServiceUtils {
+
        private static final Logger LOG = 
LoggerFactory.getLogger(RpcServiceUtils.class);
 
+       // 
------------------------------------------------------------------------
+       //  RPC instantiation
+       // 
------------------------------------------------------------------------
+
        /**
         * Utility method to create RPC service from configuration and 
hostname, port.
         *
@@ -78,4 +95,57 @@ public class RpcServiceUtils {
                final Time timeout = 
Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis());
                return new AkkaRpcService(actorSystem, timeout);
        }
+
+       // 
------------------------------------------------------------------------
+       //  RPC endpoint addressing
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Builds the RPC URL of an endpoint; whether SSL is used is deduced from the configuration.
+        * @param hostname     The hostname or address where the target RPC 
service is listening.
+        * @param port         The port where the target RPC service is 
listening.
+        * @param endpointName The name of the RPC endpoint.
+        * @param config       The configuration from which to deduce further 
settings.
+        *
+        * @return The RPC URL of the specified RPC endpoint.
+        */
+       public static String getRpcUrl(String hostname, int port, String 
endpointName, Configuration config)
+                       throws UnknownHostException {
+
+               checkNotNull(config, "config is null");
+
+               final boolean sslEnabled = config.getBoolean(
+                                       ConfigConstants.AKKA_SSL_ENABLED,
+                                       
ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) &&
+                               SSLUtils.getSSLEnabled(config);
+
+               return getRpcUrl(hostname, port, endpointName, sslEnabled);
+       }
+
+       /**
+        * Builds the RPC URL of an endpoint from host, port, and endpoint name.
+        * @param hostname     The hostname or address where the target RPC 
service is listening.
+        * @param port         The port where the target RPC service is 
listening.
+        * @param endpointName The name of the RPC endpoint.
+        * @param secure       True, if security/encryption is enabled, false 
otherwise.
+        * 
+        * @return The RPC URL of the specified RPC endpoint.
+        */
+       public static String getRpcUrl(String hostname, int port, String 
endpointName, boolean secure)
+                       throws UnknownHostException {
+
+               checkNotNull(hostname, "hostname is null");
+               checkNotNull(endpointName, "endpointName is null");
+               checkArgument(port > 0 && port <= 65535, "port must be in [1, 
65535]");
+
+               final String protocol = secure ? "akka.ssl.tcp" : "akka.tcp";
+               final String hostPort = 
NetUtils.hostAndPortToUrlString(hostname, port);
+
+               return String.format("%s://flink@%s/user/%s", protocol, 
hostPort, endpointName);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /** This class is not meant to be instantiated */
+       private RpcServiceUtils() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
new file mode 100644
index 0000000..f1ece0e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FsNegativeRunningJobsRegistryTest extends TestLogger {
+
+       @Rule
+       public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void testCreateAndSetFinished() throws Exception {
+               final File folder = tempFolder.newFolder();
+               final String uri = folder.toURI().toString();
+
+               final JobID jid = new JobID();
+
+               FsNegativeRunningJobsRegistry registry = new 
FsNegativeRunningJobsRegistry(new Path(uri));
+
+               // initially, without any call, the job is considered running
+               assertTrue(registry.isJobRunning(jid));
+
+               // repeated setting should not affect the status
+               registry.setJobRunning(jid);
+               assertTrue(registry.isJobRunning(jid));
+
+               // set the job to finished and validate
+               registry.setJobFinished(jid);
+               assertFalse(registry.isJobRunning(jid));
+
+               // another registry should pick this up
+               FsNegativeRunningJobsRegistry otherRegistry = new 
FsNegativeRunningJobsRegistry(new Path(uri));
+               assertFalse(otherRegistry.isJobRunning(jid));
+       }
+
+       @Test
+       public void testSetFinishedAndRunning() throws Exception {
+               final File folder = tempFolder.newFolder();
+               final String uri = folder.toURI().toString();
+
+               final JobID jid = new JobID();
+
+               FsNegativeRunningJobsRegistry registry = new 
FsNegativeRunningJobsRegistry(new Path(uri));
+
+               // set the job to finished and validate
+               registry.setJobFinished(jid);
+               assertFalse(registry.isJobRunning(jid));
+
+               // set the job back to running and validate
+               registry.setJobRunning(jid);
+               assertTrue(registry.isJobRunning(jid));
+
+               // another registry should pick this up
+               FsNegativeRunningJobsRegistry otherRegistry = new 
FsNegativeRunningJobsRegistry(new Path(uri));
+               assertTrue(otherRegistry.isJobRunning(jid));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index e0f71ee..3f9865c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -155,12 +155,22 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
                return new VoidBlobStore();
        }
 
+       @Override
+       public String getResourceManagerEndpointName() {
+               throw new UnsupportedOperationException();
+       }
+
        // 
------------------------------------------------------------------------
        //  Shutdown
        // 
------------------------------------------------------------------------
 
        @Override
-       public void shutdown() throws Exception {
-               // nothing to do, since this should not shut down individual 
services, but cross service parts
+       public void close() throws Exception {
+               // nothing to do
+       }
+
+       @Override
+       public void closeAndCleanupAllData() throws Exception {
+               // nothing to do
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
new file mode 100644
index 0000000..a9805a1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.leaderelection;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the {@link SingleLeaderElectionService}.
+ */
+public class SingleLeaderElectionServiceTest {
+
+       private static final Random RND = new Random();
+
+       private final Executor executor = Executors.directExecutor();
+
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testStartStopAssignLeadership() throws Exception {
+               final UUID uuid = UUID.randomUUID();
+               final SingleLeaderElectionService service = new 
SingleLeaderElectionService(executor, uuid);
+
+               final LeaderContender contender = mockContender(service);
+               final LeaderContender otherContender = mockContender(service);
+
+               service.start(contender);
+               verify(contender, times(1)).grantLeadership(uuid);
+
+               service.stop();
+               verify(contender, times(1)).revokeLeadership();
+
+               // start with a new contender - the old contender must not gain 
another leadership
+               service.start(otherContender);
+               verify(otherContender, times(1)).grantLeadership(uuid);
+
+               verify(contender, times(1)).grantLeadership(uuid);
+               verify(contender, times(1)).revokeLeadership();
+       }
+
+       @Test
+       public void testStopBeforeConfirmingLeadership() throws Exception {
+               final UUID uuid = UUID.randomUUID();
+               final SingleLeaderElectionService service = new 
SingleLeaderElectionService(executor, uuid);
+
+               final LeaderContender contender = mock(LeaderContender.class);
+
+               service.start(contender);
+               verify(contender, times(1)).grantLeadership(uuid);
+
+               service.stop();
+
+               // because the leadership was never confirmed, there is no 
"revoke" call
+               verifyNoMoreInteractions(contender);
+       }
+
+       @Test
+       public void testStartOnlyOnce() throws Exception {
+               final UUID uuid = UUID.randomUUID();
+               final SingleLeaderElectionService service = new 
SingleLeaderElectionService(executor, uuid);
+
+               final LeaderContender contender = mock(LeaderContender.class);
+               final LeaderContender otherContender = 
mock(LeaderContender.class);
+
+               service.start(contender);
+               verify(contender, times(1)).grantLeadership(uuid);
+
+               // should not be possible to start this again with another 
contender
+               try {
+                       service.start(otherContender);
+                       fail("should fail with an exception");
+               } catch (IllegalStateException e) {
+                       // expected
+               }
+
+               // should not be possible to start this again with the same 
contender
+               try {
+                       service.start(contender);
+                       fail("should fail with an exception");
+               } catch (IllegalStateException e) {
+                       // expected
+               }
+       }
+
+       @Test
+       public void testShutdown() throws Exception {
+               final UUID uuid = UUID.randomUUID();
+               final SingleLeaderElectionService service = new 
SingleLeaderElectionService(executor, uuid);
+
+               // create a leader contender and let it grab leadership
+               final LeaderContender contender = mockContender(service);
+               service.start(contender);
+               verify(contender, times(1)).grantLeadership(uuid);
+
+               // some leader listeners
+               final LeaderRetrievalListener listener1 = 
mock(LeaderRetrievalListener.class);
+               final LeaderRetrievalListener listener2 = 
mock(LeaderRetrievalListener.class);
+
+               LeaderRetrievalService listenerService1 = 
service.createLeaderRetrievalService();
+               LeaderRetrievalService listenerService2 = 
service.createLeaderRetrievalService();
+
+               listenerService1.start(listener1);
+               listenerService2.start(listener2);
+
+               // one listener stops
+               listenerService1.stop();
+
+               // shut down the service
+               service.shutdown();
+
+               // the leader contender and running listener should get error 
notifications
+               verify(contender, times(1)).handleError(any(Exception.class));
+               verify(listener2, times(1)).handleError(any(Exception.class));
+
+               // the stopped listener gets no notification
+               verify(listener1, times(0)).handleError(any(Exception.class));
+
+               // should not be possible to start again after shutdown
+               try {
+                       service.start(contender);
+                       fail("should fail with an exception");
+               } catch (IllegalStateException e) {
+                       // expected
+               }
+
+               // no additional leadership grant
+               verify(contender, times(1)).grantLeadership(any(UUID.class));
+       }
+
+       @Test
+       public void testImmediateShutdown() throws Exception {
+               final UUID uuid = UUID.randomUUID();
+               final SingleLeaderElectionService service = new 
SingleLeaderElectionService(executor, uuid);
+               service.shutdown();
+
+               final LeaderContender contender = mock(LeaderContender.class);
+               
+               // should not be possible to start
+               try {
+                       service.start(contender);
+                       fail("should fail with an exception");
+               } catch (IllegalStateException e) {
+                       // expected
+               }
+
+               // no additional leadership grant
+               verify(contender, times(0)).grantLeadership(any(UUID.class));
+       }
+
+//     @Test
+//     public void testNotifyListenersWhenLeaderElected() throws Exception {
+//             final UUID uuid = UUID.randomUUID();
+//             final SingleLeaderElectionService service = new 
SingleLeaderElectionService(executor, uuid);
+//
+//             final LeaderRetrievalListener listener1 = 
mock(LeaderRetrievalListener.class);
+//             final LeaderRetrievalListener listener2 = 
mock(LeaderRetrievalListener.class);
+//
+//             LeaderRetrievalService listenerService1 = 
service.createLeaderRetrievalService();
+//             LeaderRetrievalService listenerService2 = 
service.createLeaderRetrievalService();
+//
+//             listenerService1.start(listener1);
+//             listenerService1.start(listener2);
+//
+//             final LeaderContender contender = mockContender(service);
+//             service.start(contender);
+//
+//             veri
+//     }
+
+       // 
------------------------------------------------------------------------
+       //  utilities
+       // 
------------------------------------------------------------------------
+
+       private static LeaderContender mockContender(final 
LeaderElectionService service) {
+               String address = StringUtils.getRandomString(RND, 5, 10, 'a', 
'z');
+               return mockContender(service, address);
+       }
+
+       private static LeaderContender mockContender(final 
LeaderElectionService service, final String address) {
+               LeaderContender mockContender = mock(LeaderContender.class);
+
+               when(mockContender.getAddress()).thenReturn(address);
+
+               doAnswer(new Answer<Void>() {
+                               @Override
+                               public Void answer(InvocationOnMock invocation) 
throws Throwable {
+                                       final UUID uuid = (UUID) 
invocation.getArguments()[0];
+                                       service.confirmLeaderSessionID(uuid);
+                                       return null;
+                               }
+               }).when(mockContender).grantLeadership(any(UUID.class));
+
+               return mockContender;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml
index 18ffe13..f69e0e4 100644
--- a/flink-yarn/pom.xml
+++ b/flink-yarn/pom.xml
@@ -32,6 +32,9 @@ under the License.
        <packaging>jar</packaging>
 
        <dependencies>
+
+               <!-- core dependencies -->
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-runtime_2.10</artifactId>
@@ -91,6 +94,8 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <!-- test dependencies -->
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-runtime_2.10</artifactId>
@@ -98,6 +103,22 @@ under the License.
                        <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-hdfs</artifactId>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+                       <version>${hadoop.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-common</artifactId>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+                       <version>${hadoop.version}</version>
+               </dependency>
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
index 923694e..1c8bad7 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
@@ -23,17 +23,19 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 /**
  * This class is the executable entry point for the YARN application master.
@@ -95,7 +97,18 @@ public abstract class 
AbstractYarnFlinkApplicationMasterRunner {
                        LOG.info("YARN daemon is running as: {} Yarn client 
user obtainer: {}",
                                        currentUser.getShortUserName(), 
yarnClientUsername );
 
-                       SecurityContext.SecurityConfiguration sc = new 
SecurityContext.SecurityConfiguration();
+                       // Flink configuration
+                       final Map<String, String> dynamicProperties =
+                                       
FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+                       LOG.debug("YARN dynamic properties: {}", 
dynamicProperties);
+
+                       final Configuration flinkConfig = 
createConfiguration(currDir, dynamicProperties);
+                       if(keytabPath != null && remoteKeytabPrincipal != null) 
{
+                               
flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
+                               
flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, 
remoteKeytabPrincipal);
+                       }
+
+                       SecurityUtils.SecurityConfiguration sc = new 
SecurityUtils.SecurityConfiguration(flinkConfig);
 
                        //To support Yarn Secure Integration Test Scenario
                        File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
@@ -108,18 +121,7 @@ public abstract class 
AbstractYarnFlinkApplicationMasterRunner {
                                sc.setHadoopConfiguration(conf);
                        }
 
-                       // Flink configuration
-                       final Map<String, String> dynamicProperties =
-                                       
FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
-                       LOG.debug("YARN dynamic properties: {}", 
dynamicProperties);
-
-                       final Configuration flinkConfig = 
createConfiguration(currDir, dynamicProperties);
-                       if(keytabPath != null && remoteKeytabPrincipal != null) 
{
-                               
flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath);
-                               
flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, 
remoteKeytabPrincipal);
-                       }
-
-                       
SecurityContext.install(sc.setFlinkConfiguration(flinkConfig));
+                       SecurityUtils.install(sc);
 
                        // Note that we use the "appMasterHostname" given by 
YARN here, to make sure
                        // we use the hostnames given by YARN consistently 
throughout akka.
@@ -129,9 +131,9 @@ public abstract class 
AbstractYarnFlinkApplicationMasterRunner {
                                        "ApplicationMaster hostname variable %s 
not set", Environment.NM_HOST.key());
                        LOG.info("YARN assigned hostname for application 
master: {}", appMasterHostname);
 
-                       return SecurityContext.getInstalled().runSecured(new 
SecurityContext.FlinkSecuredRunner<Integer>() {
+                       return 
SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
                                @Override
-                               public Integer run() {
+                               public Integer call() throws Exception {
                                        return 
runApplicationMaster(flinkConfig);
                                }
                        });

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 1826d43..d1ef553 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -115,7 +115,7 @@ public class YarnApplicationMasterRunner {
         * @param args The command line arguments.
         */
        public static void main(String[] args) {
-               EnvironmentInformation.logEnvironmentInfo(LOG, "YARN 
ApplicationMaster / JobManager", args);
+               EnvironmentInformation.logEnvironmentInfo(LOG, "YARN 
ApplicationMaster / ResourceManager / JobManager", args);
                SignalHandler.register(LOG);
                JvmShutdownSafeguard.installAsShutdownHook(LOG);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index e58c77e..188d9ef 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -37,7 +37,6 @@ import 
org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
-import 
org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -57,7 +56,12 @@ import java.io.FileInputStream;
 import java.io.ObjectInputStream;
 
 /**
- * This class is the executable entry point for the YARN application master.
+ * This class is the executable entry point for the YARN Application Master 
that
+ * executes a single Flink job and then shuts the YARN application down.
+ * 
+ * <p>The lifetime of the YARN application is bound to that of the Flink job. 
Other
+ * YARN Application Master implementations are for example the YARN session.
+ * 
  * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmaster.JobManagerRunner}
  * and {@link org.apache.flink.yarn.YarnResourceManager}.
  *
@@ -74,6 +78,8 @@ public class YarnFlinkApplicationMasterRunner extends 
AbstractYarnFlinkApplicati
        /** The job graph file path */
        private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
 
+       // 
------------------------------------------------------------------------
+
        /** The lock to guard startup / shutdown / manipulation methods */
        private final Object lock = new Object();
 
@@ -105,7 +111,7 @@ public class YarnFlinkApplicationMasterRunner extends 
AbstractYarnFlinkApplicati
         * @param args The command line arguments.
         */
        public static void main(String[] args) {
-               EnvironmentInformation.logEnvironmentInfo(LOG, "YARN 
ApplicationMaster runner", args);
+               EnvironmentInformation.logEnvironmentInfo(LOG, "YARN 
ApplicationMaster / ResourceManager / JobManager", args);
                SignalHandler.register(LOG);
                JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
@@ -127,7 +133,9 @@ public class YarnFlinkApplicationMasterRunner extends 
AbstractYarnFlinkApplicati
                                        
ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
 
                        synchronized (lock) {
+                               LOG.info("Starting High Availability Services");
                                haServices = 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+                               
                                metricRegistry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
                                commonRpcService = createRpcService(config, 
appMasterHostname, amPortRange);
 
@@ -176,7 +184,7 @@ public class YarnFlinkApplicationMasterRunner extends 
AbstractYarnFlinkApplicati
                return new AkkaRpcService(actorSystem, 
Time.of(duration.length(), duration.unit()));
        }
 
-       private ResourceManager createResourceManager(Configuration config) 
throws ConfigurationException {
+       private ResourceManager<?> createResourceManager(Configuration config) 
throws Exception {
                final ResourceManagerConfiguration resourceManagerConfiguration 
= ResourceManagerConfiguration.fromConfiguration(config);
                final SlotManagerFactory slotManagerFactory = new 
DefaultSlotManager.Factory();
                final JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(haServices);
@@ -242,7 +250,7 @@ public class YarnFlinkApplicationMasterRunner extends 
AbstractYarnFlinkApplicati
                        }
                        if (haServices != null) {
                                try {
-                                       haServices.shutdown();
+                                       haServices.close();
                                } catch (Throwable tt) {
                                        LOG.warn("Failed to stop the HA 
service", tt);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index d9912eb..dc8c604 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.security.SecurityContext;
+import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 /**
  * This class is the executable entry point for running a TaskExecutor in a 
YARN container.
@@ -138,7 +139,7 @@ public class YarnTaskExecutorRunner {
                        LOG.info("YARN daemon is running as: {} Yarn client 
user obtainer: {}",
                                        currentUser.getShortUserName(), 
yarnClientUsername);
 
-                       SecurityContext.SecurityConfiguration sc = new 
SecurityContext.SecurityConfiguration();
+                       SecurityUtils.SecurityConfiguration sc = new 
SecurityUtils.SecurityConfiguration(configuration);
 
                        //To support Yarn Secure Integration Test Scenario
                        File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
@@ -156,11 +157,11 @@ public class YarnTaskExecutorRunner {
                                
configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, 
remoteKeytabPrincipal);
                        }
 
-                       
SecurityContext.install(sc.setFlinkConfiguration(configuration));
+                       SecurityUtils.install(sc);
 
-                       return SecurityContext.getInstalled().runSecured(new 
SecurityContext.FlinkSecuredRunner<Integer>() {
+                       return 
SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
                                @Override
-                               public Integer run() {
+                               public Integer call() throws Exception {
                                        return runTaskExecutor(configuration);
                                }
                        });
@@ -240,7 +241,7 @@ public class YarnTaskExecutorRunner {
                        }
                        if (haServices != null) {
                                try {
-                                       haServices.shutdown();
+                                       haServices.close();
                                } catch (Throwable tt) {
                                        LOG.warn("Failed to stop the HA 
service", tt);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
new file mode 100644
index 0000000..c3902d3
--- /dev/null
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by Flink's YARN runners.
+ * These options are not expected to be ever configured by users explicitly. 
+ */
+public class YarnConfigOptions {
+
+       /**
+        * The hostname or address where the application master RPC system is listening.
+        */
+       public static final ConfigOption<String> APP_MASTER_RPC_ADDRESS =
+                       key("yarn.appmaster.rpc.address")
+                       .noDefaultValue();
+
+       /**
+        * The port where the application master RPC system is listening (default: -1).
+        */
+       public static final ConfigOption<Integer> APP_MASTER_RPC_PORT =
+                       key("yarn.appmaster.rpc.port")
+                       .defaultValue(-1);
+
+       // ------------------------------------------------------------------------
+
+       /** This class is not meant to be instantiated */
+       private YarnConfigOptions() {}
+}

Reply via email to