http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
new file mode 100644
index 0000000..7aa481f
--- /dev/null
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+
+import java.io.IOException;
+
+/**
+ * Abstract base class for the high availability services for Flink YARN 
applications that support
+ * no master fail over.
+ *
+ * <p>Internally, these services put their recovery data into YARN's working 
directory,
+ * except for checkpoints, which are in the configured checkpoint directory. 
That way,
+ * checkpoints can be resumed with a new job/application, even if the complete 
YARN application
+ * is killed and cleaned up.
+ */
+public abstract class AbstractYarnNonHaServices extends 
YarnHighAvailabilityServices {
+
+       /** The constant name of the ResourceManager RPC endpoint */
+       protected static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = 
"resource_manager";
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates new YARN high-availability services, configuring the file system and recovery
+        * data directory based on the working directory in the given Hadoop configuration.
+        *
+        * <p>This class requires that the default Hadoop file system configured in the given
+        * Hadoop configuration is an HDFS.
+        *
+        * <p>All initialization is delegated to the {@link YarnHighAvailabilityServices} constructor.
+        *
+        * @param config     The Flink configuration of this component / process.
+        * @param hadoopConf The Hadoop configuration for the YARN cluster.
+        *
+        * @throws IOException Thrown, if the initialization of the Hadoop file system used by YARN fails.
+        */
+       protected AbstractYarnNonHaServices(
+                       Configuration config,
+                       org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+               super(config, hadoopConf);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Names
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Returns the fixed name of the ResourceManager RPC endpoint used by these non-HA services.
+        *
+        * @return The constant {@link #RESOURCE_MANAGER_RPC_ENDPOINT_NAME}.
+        */
+       @Override
+       public String getResourceManagerEndpointName() {
+               final String endpointName = RESOURCE_MANAGER_RPC_ENDPOINT_NAME;
+               return endpointName;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Services
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a file-system based registry of running jobs, rooted in the working directory
+        * of the YARN application.
+        *
+        * @return A new {@link FsNegativeRunningJobsRegistry}.
+        *
+        * @throws IOException May be thrown while creating the registry.
+        * @throws IllegalStateException Thrown, if these services are already closed.
+        */
+       @Override
+       public RunningJobsRegistry getRunningJobsRegistry() throws IOException {
+               enter();
+               try {
+                       // IMPORTANT: The registry must NOT place its data in a directory that is
+                       // cleaned up by these services.
+                       return new FsNegativeRunningJobsRegistry(flinkFileSystem, workingDirectory);
+               }
+               finally {
+                       exit();
+               }
+       }
+
+       /**
+        * Creates the checkpoint recovery factory for these non-HA services.
+        *
+        * @return A new {@link StandaloneCheckpointRecoveryFactory}.
+        *
+        * @throws IllegalStateException Thrown, if these services are already closed.
+        */
+       @Override
+       public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+               enter();
+               try {
+                       final CheckpointRecoveryFactory factory = new StandaloneCheckpointRecoveryFactory();
+                       return factory;
+               }
+               finally {
+                       exit();
+               }
+       }
+
+       @Override
+       public SubmittedJobGraphStore getSubmittedJobGraphStore() throws 
Exception {
+               throw new UnsupportedOperationException("These 
High-Availability Services do not support storing job graphs");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
new file mode 100644
index 0000000..4c78726
--- /dev/null
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.FileSystemBlobStore;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The basis of {@link HighAvailabilityServices} for YARN setups.
+ * These high-availability services auto-configure YARN's HDFS and the YARN 
application's
+ * working directory to be used to store job recovery data.
+ * 
+ * <p>Note for implementers: This class locks access to and creation of 
services,
+ * to make sure all services are properly shut down when shutting down this 
class.
+ * To participate in the checks, overriding methods should frame method body 
with
+ * calls to {@code enter()} and {@code exit()} as shown in the following 
pattern:
+ * 
+ * <pre>{@code
+ * public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+ *     enter();
+ *     try {
+ *         CuratorClient client = getCuratorClient();
+ *         return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, RESOURCE_MANAGER_LEADER_PATH);
+ *     } finally {
+ *         exit();
+ *     }
+ * }
+ * }</pre>
+ */
+public abstract class YarnHighAvailabilityServices implements 
HighAvailabilityServices {
+
+       /** The name of the sub directory in which Flink stores the recovery 
data */
+       public static final String FLINK_RECOVERY_DATA_DIR = 
"flink_recovery_data";
+
+       /** Logger for these services, shared with subclasses */
+       protected static final Logger LOG = 
LoggerFactory.getLogger(YarnHighAvailabilityServices.class);
+
+       // 
------------------------------------------------------------------------
+
+       /** The lock that guards all accesses to methods in this class */
+       private final ReentrantLock lock;
+
+       /** The Flink FileSystem object that represents the HDFS used by YARN */
+       protected final FileSystem flinkFileSystem;
+
+       /** The Hadoop FileSystem object that represents the HDFS used by YARN */
+       protected final org.apache.hadoop.fs.FileSystem hadoopFileSystem;
+
+       /** The working directory of this YARN application.
+        * This MUST NOT be deleted when the HA services clean up */
+       protected final Path workingDirectory;
+
+       /** The directory for HA persistent data. This should be deleted when 
the
+        * HA services clean up */
+       protected final Path haDataDirectory;
+
+       /** Flag marking this instance as shut down */
+       private volatile boolean closed;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates new YARN high-availability services, configuring the file system and recovery
+        * data directory based on the working directory in the given Hadoop configuration.
+        *
+        * <p>This class requires that the default Hadoop file system configured in the given
+        * Hadoop configuration is an HDFS.
+        *
+        * <p>If the recovery data directory cannot be created, the Hadoop file system instance
+        * opened by this constructor is closed again before the exception is thrown.
+        *
+        * @param config     The Flink configuration of this component / process.
+        * @param hadoopConf The Hadoop configuration for the YARN cluster.
+        *
+        * @throws IOException Thrown, if the initialization of the Hadoop file system used by YARN fails.
+        */
+       protected YarnHighAvailabilityServices(
+                       Configuration config,
+                       org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+
+               checkNotNull(config);
+               checkNotNull(hadoopConf);
+
+               this.lock = new ReentrantLock();
+
+               // get and verify the YARN HDFS URI
+               final URI fsUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConf);
+
+               // equalsIgnoreCase is null-safe and does not depend on the default locale
+               if (!"hdfs".equalsIgnoreCase(fsUri.getScheme())) {
+                       throw new IOException("Invalid file system found for YarnHighAvailabilityServices: " +
+                                       "Expected 'hdfs', but found '" + fsUri.getScheme() + "'.");
+               }
+
+               // initialize the Hadoop File System
+               // we go through this special code path here to make sure we get no shared cached
+               // instance of the FileSystem
+               try {
+                       final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass =
+                                       org.apache.hadoop.fs.FileSystem.getFileSystemClass(fsUri.getScheme(), hadoopConf);
+
+                       this.hadoopFileSystem = InstantiationUtil.instantiate(fsClass);
+                       this.hadoopFileSystem.initialize(fsUri, hadoopConf);
+               }
+               catch (Exception e) {
+                       throw new IOException("Cannot instantiate YARN's Hadoop file system for " + fsUri, e);
+               }
+
+               this.flinkFileSystem = new HadoopFileSystem(hadoopConf, hadoopFileSystem);
+
+               this.workingDirectory = new Path(hadoopFileSystem.getWorkingDirectory().toUri());
+               this.haDataDirectory = new Path(workingDirectory, FLINK_RECOVERY_DATA_DIR);
+
+               // test the file system, to make sure we fail fast if access does not work
+               try {
+                       flinkFileSystem.mkdirs(haDataDirectory);
+               }
+               catch (Exception e) {
+                       final IOException failure = new IOException("Could not create the directory for recovery data in YARN's file system at '"
+                                       + haDataDirectory + "'.", e);
+
+                       // the file system instance is private to this object (not a shared cached one), and
+                       // nobody can call close() on a partially constructed instance, so release it here
+                       try {
+                               hadoopFileSystem.close();
+                       }
+                       catch (Throwable t) {
+                               failure.addSuppressed(t);
+                       }
+
+                       throw failure;
+               }
+
+               LOG.info("Flink YARN application will store recovery data at {}", haDataDirectory);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  high availability services
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a blob store that keeps its data in the HA data directory of these services.
+        *
+        * @return A new {@link FileSystemBlobStore} on the file system used by YARN.
+        *
+        * @throws IOException May be thrown while creating the blob store.
+        * @throws IllegalStateException Thrown, if these services are already closed.
+        */
+       @Override
+       public BlobStore createBlobStore() throws IOException {
+               enter();
+               try {
+                       return new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString());
+               } finally {
+                       exit();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Shutdown
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Checks whether these services have been shut down.
+        *
+        * <p>This reads the volatile {@code closed} flag without acquiring the lock.
+        *
+        * @return True, if this instance has been shut down, false if it is still operational.
+        */
+       public boolean isClosed() {
+               return closed;
+       }
+
+       /**
+        * Shuts down these services and closes the Hadoop file system. No stored data is deleted.
+        *
+        * <p>Only the first call has an effect; later calls return immediately. Errors from closing
+        * the file system are logged and not propagated.
+        */
+       @Override
+       public void close() throws Exception {
+               lock.lock();
+               try {
+                       // close only once
+                       if (closed) {
+                               return;
+                       }
+                       closed = true;
+
+                       // we do not propagate exceptions here, but only log them
+                       try {
+                               hadoopFileSystem.close();
+                       } catch (Throwable t) {
+                               LOG.warn("Error closing Hadoop FileSystem", t);
+                       }
+               }
+               finally {
+                       lock.unlock();
+               }
+       }
+
+       /**
+        * Recursively deletes the HA data directory and then closes these services.
+        *
+        * <p>A failure while deleting does not prevent the services from being closed. The first
+        * failure is re-thrown at the end, with any later failure attached to it.
+        *
+        * @throws IllegalStateException Thrown, if these services are already closed.
+        * @throws Exception Thrown, if deleting the data or closing the services failed.
+        */
+       @Override
+       public void closeAndCleanupAllData() throws Exception {
+               lock.lock();
+               try {
+                       checkState(!closed, "YarnHighAvailabilityServices are already closed");
+
+                       // we remember exceptions only, then continue cleanup, and re-throw at the end
+                       Throwable exception = null;
+
+                       // first, we delete all data in Flink's data directory
+                       try {
+                               flinkFileSystem.delete(haDataDirectory, true);
+                       }
+                       catch (Throwable t) {
+                               exception = t;
+                       }
+
+                       // now we actually close the services
+                       try {
+                               close();
+                       }
+                       catch (Throwable t) {
+                               exception = firstOrSuppressed(t, exception);
+                       }
+
+                       // if some exception occurred, rethrow
+                       if (exception != null) {
+                               ExceptionUtils.rethrowException(exception, exception.getMessage());
+                       }
+               }
+               finally {
+                       lock.unlock();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * To be called at the beginning of every method that creates an HA service. Acquires the lock
+        * and checks whether this HighAvailabilityServices instance is shut down.
+        *
+        * <p>When this method returns normally, the lock is held and must be released via {@link #exit()}.
+        *
+        * @throws IllegalStateException Thrown, if these services are already closed. The lock is not
+        *                               held in that case.
+        */
+       void enter() {
+               if (!enterUnlessClosed()) {
+                       throw new IllegalStateException("closed");
+               }
+       }
+
+       /**
+        * Acquires the lock and checks whether the services are already closed. If they are
+        * already closed, the method releases the lock and returns {@code false}.
+        *
+        * <p>When this method returns {@code true}, the lock is still held and the caller must
+        * release it via {@link #exit()}.
+        *
+        * @return True, if the lock was acquired and the services are not closed, false if the services are closed.
+        */
+       boolean enterUnlessClosed() {
+               lock.lock();
+               if (!closed) {
+                       return true; // lock stays held for the caller
+               } else {
+                       lock.unlock();
+                       return false;
+               }
+       }
+
+       /**
+        * To be called at the end of every method that creates an HA service. Releases the lock.
+        *
+        * <p>Must only be called after a successful {@link #enter()} or {@link #enterUnlessClosed()}.
+        */
+       void exit() {
+               lock.unlock();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Factory from Configuration
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates the high-availability services for a single-job Flink YARN 
application, to be
+        * used in the Application Master that runs both ResourceManager and 
JobManager.
+        * 
+        * @param flinkConfig  The Flink configuration.
+        * @param hadoopConfig The Hadoop configuration for the YARN cluster.
+        * 
+        * @return The created high-availability services.
+        * 
+        * @throws IOException Thrown, if the high-availability services could 
not be initialized.
+        */
+       public static YarnHighAvailabilityServices forSingleJobAppMaster(
+                       Configuration flinkConfig,
+                       org.apache.hadoop.conf.Configuration hadoopConfig) 
throws IOException {
+
+               checkNotNull(flinkConfig, "flinkConfig");
+               checkNotNull(hadoopConfig, "hadoopConfig");
+
+               final HighAvailabilityMode mode = 
HighAvailabilityMode.fromConfig(flinkConfig);
+               switch (mode) {
+                       case NONE:
+                               return new 
YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig);
+
+                       case ZOOKEEPER:
+                               throw  new UnsupportedOperationException("to be 
implemented");
+
+                       default:
+                               throw new 
IllegalConfigurationException("Unrecognized high availability mode: " + mode);
+               }
+       }
+
+       /**
+        * Creates the high-availability services for the TaskManagers 
participating in
+        * a Flink YARN application.
+        *
+        * @param flinkConfig  The Flink configuration.
+        * @param hadoopConfig The Hadoop configuration for the YARN cluster.
+        *
+        * @return The created high-availability services.
+        *
+        * @throws IOException Thrown, if the high-availability services could 
not be initialized.
+        */
+       public static YarnHighAvailabilityServices forYarnTaskManager(
+                       Configuration flinkConfig,
+                       org.apache.hadoop.conf.Configuration hadoopConfig) 
throws IOException {
+
+               checkNotNull(flinkConfig, "flinkConfig");
+               checkNotNull(hadoopConfig, "hadoopConfig");
+
+               final HighAvailabilityMode mode = 
HighAvailabilityMode.fromConfig(flinkConfig);
+               switch (mode) {
+                       case NONE:
+                               return new 
YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+
+                       case ZOOKEEPER:
+                               throw  new UnsupportedOperationException("to be 
implemented");
+
+                       default:
+                               throw new 
IllegalConfigurationException("Unrecognized high availability mode: " + mode);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
new file mode 100644
index 0000000..fd1a45e
--- /dev/null
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.ServicesThreadFactory;
+import 
org.apache.flink.runtime.highavailability.leaderelection.SingleLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * These YarnHighAvailabilityServices are for the Application Master in setups 
where there is one
+ * ResourceManager that is statically configured in the Flink configuration.
+ * 
+ * <h3>Handled failure types</h3>
+ * <ul>
+ *     <li><b>User code &amp; operator failures:</b> Failed operators are recovered from checkpoints.</li>
+ *     <li><b>Task Manager Failures:</b> Failed Task Managers are restarted 
and their tasks are
+ *         recovered from checkpoints.</li>
+ * </ul>
+ *
+ * <h3>Non-recoverable failure types</h3>
+ * <ul>
+ *     <li><b>Application Master failures:</b> These failures cannot be 
recovered, because TaskManagers
+ *     have no way to discover the new Application Master's address.</li>
+ * </ul>
+ *
+ * <p>Internally, these services put their recovery data into YARN's working 
directory,
+ * except for checkpoints, which are in the configured checkpoint directory. 
That way,
+ * checkpoints can be resumed with a new job/application, even if the complete 
YARN application
+ * is killed and cleaned up. 
+ *
+ * <p>Because ResourceManager and JobManager run both in the same process 
(Application Master), they
+ * use an embedded leader election service to find each other.
+ * 
+ * <p>A typical YARN setup that uses these HA services first starts the 
ResourceManager
+ * inside the ApplicationMaster and puts its RPC endpoint address into the 
configuration with which
+ * the TaskManagers are started. Because of this static addressing scheme, the 
setup cannot handle failures
+ * of the JobManager and ResourceManager, which are running as part of the 
Application Master.
+ *
+ * @see HighAvailabilityServices
+ */
+public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices {
+
+       /** The dispatcher thread pool for these services */
+       private final ExecutorService dispatcher;
+
+       /** The embedded leader election service used by JobManagers to find 
the resource manager */
+       private final SingleLeaderElectionService 
resourceManagerLeaderElectionService;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates new YarnIntraNonHaMasterServices for the given Flink and YARN configuration.
+        *
+        * <p>This constructor initializes access to the HDFS to store recovery data, and creates the
+        * embedded leader election services through which ResourceManager and JobManager find and
+        * confirm each other.
+        *
+        * <p>If the initialization fails after the parent constructor has completed, the executor
+        * created here is shut down and the parent's resources are closed before the error propagates.
+        *
+        * @param config     The Flink configuration of this component / process.
+        * @param hadoopConf The Hadoop configuration for the YARN cluster.
+        *
+        * @throws IOException
+        *             Thrown, if the initialization of the Hadoop file system used by YARN fails.
+        * @throws IllegalConfigurationException
+        *             Thrown, if the Flink configuration does not properly describe the ResourceManager address and port.
+        */
+       public YarnIntraNonHaMasterServices(
+                       Configuration config,
+                       org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+
+               super(config, hadoopConf);
+
+               // track whether we successfully perform the initialization
+               boolean successful = false;
+
+               // local handle, so that the executor can be released if a later step fails
+               ExecutorService executor = null;
+
+               try {
+                       executor = Executors.newSingleThreadExecutor(new ServicesThreadFactory());
+                       this.dispatcher = executor;
+                       this.resourceManagerLeaderElectionService = new SingleLeaderElectionService(executor, DEFAULT_LEADER_ID);
+
+                       // all good!
+                       successful = true;
+               }
+               finally {
+                       if (!successful) {
+                               // quietly stop the executor created above, if any; the original failure
+                               // is the one that should reach the caller
+                               try {
+                                       if (executor != null) {
+                                               executor.shutdownNow();
+                                       }
+                               } catch (Throwable ignored) {}
+
+                               // quietly undo what the parent constructor initialized
+                               try {
+                                       super.close();
+                               } catch (Throwable ignored) {}
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Services
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a leader retrieval service backed by the embedded ResourceManager leader
+        * election service of this process.
+        *
+        * @return A retrieval service created by the embedded leader election service.
+        *
+        * @throws IllegalStateException Thrown, if these services are already closed.
+        */
+       @Override
+       public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+               enter();
+               try {
+                       return resourceManagerLeaderElectionService.createLeaderRetrievalService();
+               }
+               finally {
+                       exit();
+               }
+       }
+
+       /**
+        * Gets the embedded leader election service for the ResourceManager. Every call returns
+        * the same instance, which was created in the constructor.
+        *
+        * @return The embedded ResourceManager leader election service.
+        *
+        * @throws IllegalStateException Thrown, if these services are already closed.
+        */
+       @Override
+       public LeaderElectionService getResourceManagerLeaderElectionService() {
+               enter();
+               try {
+                       final LeaderElectionService electionService = resourceManagerLeaderElectionService;
+                       return electionService;
+               }
+               finally {
+                       exit();
+               }
+       }
+
+       /**
+        * Not supported yet by these services.
+        *
+        * @param jobID The ID of the job (currently unused).
+        *
+        * @throws IllegalStateException Thrown, if these services are already closed.
+        * @throws UnsupportedOperationException Thrown on every call while the services are open.
+        */
+       @Override
+       public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+               enter();
+               try {
+                       throw new UnsupportedOperationException("needs refactoring to accept default address");
+               }
+               finally {
+                       exit();
+               }
+       }
+
+       /**
+        * Not supported yet by these services.
+        *
+        * @param jobID The ID of the job (currently unused).
+        *
+        * @throws IllegalStateException Thrown, if these services are already closed.
+        * @throws UnsupportedOperationException Thrown on every call while the services are open.
+        */
+       @Override
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+               enter();
+               try {
+                       throw new UnsupportedOperationException("needs refactoring to accept default address");
+               }
+               finally {
+                       exit();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  shutdown
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Shuts down the embedded leader election service and the dispatcher executor, then runs
+        * the parent's close logic. Does nothing if these services are already closed.
+        *
+        * <p>Each cleanup step runs even if an earlier step threw an exception.
+        *
+        * @throws Exception Thrown, if one of the cleanup steps failed.
+        */
+       @Override
+       public void close() throws Exception {
+               if (enterUnlessClosed()) {
+                       try {
+                               try {
+                                       // this class' own cleanup logic
+                                       try {
+                                               resourceManagerLeaderElectionService.shutdown();
+                                       }
+                                       finally {
+                                               // stop the dispatcher even if the election service failed to shut down
+                                               dispatcher.shutdownNow();
+                                       }
+                               }
+                               finally {
+                                       // in any case must we call the parent cleanup logic
+                                       super.close();
+                               }
+                       }
+                       finally {
+                               exit();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
new file mode 100644
index 0000000..eb4b77e
--- /dev/null
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.RpcServiceUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import java.io.IOException;
+
+/**
+ * These YarnHighAvailabilityServices are for use by the TaskManager in setups
+ * where there is one ResourceManager that is statically configured in the 
Flink configuration.
+ * 
+ * <h3>Handled failure types</h3>
+ * <ul>
+ *     <li><b>User code &amp; operator failures:</b> Failed operators are 
recovered from checkpoints.</li>
+ *     <li><b>Task Manager Failures:</b> Failed Task Managers are restarted 
and their tasks are
+ *         recovered from checkpoints.</li>
+ * </ul>
+ *
+ * <h3>Non-recoverable failure types</h3>
+ * <ul>
+ *     <li><b>Application Master failures:</b> These failures cannot be 
recovered, because TaskManagers
+ *     have no way to discover the new Application Master's address.</li>
+ * </ul>
+ *
+ * <p>Internally, these services put their recovery data into YARN's working 
directory,
+ * except for checkpoints, which are in the configured checkpoint directory. 
That way,
+ * checkpoints can be resumed with a new job/application, even if the complete 
YARN application
+ * is killed and cleaned up. 
+ *
+ * <p>A typical YARN setup that uses these HA services first starts the 
ResourceManager
+ * inside the ApplicationMaster and puts its RPC endpoint address into the 
configuration with which
+ * the TaskManagers are started. Because of this static addressing scheme, the 
setup cannot handle failures
+ * of the JobManager and ResourceManager, which are running as part of the 
Application Master.
+ *
+ * @see HighAvailabilityServices
+ */
+public class YarnPreConfiguredMasterNonHaServices extends 
AbstractYarnNonHaServices {
+
+       /** The RPC URL under which the single ResourceManager can be reached 
while available */ 
+       private final String resourceManagerRpcUrl;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates new YarnPreConfiguredMasterNonHaServices for the given Flink and YARN
+        * configuration.
+        *
+        * <p>The ResourceManager's host and port are read from the Flink configuration
+        * ({@link YarnConfigOptions#APP_MASTER_RPC_ADDRESS} and
+        * {@link YarnConfigOptions#APP_MASTER_RPC_PORT}) and combined into the RPC URL that
+        * is handed to the ResourceManager leader retriever. If this step fails, the parent's
+        * {@code close()} is invoked (best effort) before the exception propagates.
+        *
+        * @param config     The Flink configuration of this component / process.
+        * @param hadoopConf The Hadoop configuration for the YARN cluster.
+        *
+        * @throws IOException
+        *             Thrown, if the initialization of the Hadoop file system used by YARN fails.
+        * @throws IllegalConfigurationException
+        *             Thrown, if the Flink configuration does not properly describe the
+        *             ResourceManager address and port.
+        */
+       public YarnPreConfiguredMasterNonHaServices(
+                       Configuration config,
+                       org.apache.hadoop.conf.Configuration hadoopConf) throws IOException {
+
+               super(config, hadoopConf);
+
+               // flipped to true only after the RPC URL has been assembled
+               boolean initialized = false;
+
+               try {
+                       // where the statically configured resource manager is expected to listen
+                       final String host = config.getString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS);
+                       final int port = config.getInteger(YarnConfigOptions.APP_MASTER_RPC_PORT);
+
+                       if (host == null) {
+                               throw new IllegalConfigurationException("Config parameter '" +
+                                               YarnConfigOptions.APP_MASTER_RPC_ADDRESS.key() + "' is missing.");
+                       }
+                       else if (port < 0) {
+                               // a negative port is reported as "missing" rather than "invalid"
+                               throw new IllegalConfigurationException("Config parameter '" +
+                                               YarnConfigOptions.APP_MASTER_RPC_PORT.key() + "' is missing.");
+                       }
+                       else if (port == 0 || port > 65535) {
+                               throw new IllegalConfigurationException("Invalid value for '" +
+                                               YarnConfigOptions.APP_MASTER_RPC_PORT.key() + "' - port must be in [1, 65535]");
+                       }
+
+                       this.resourceManagerRpcUrl = RpcServiceUtils.getRpcUrl(
+                                       host, port, RESOURCE_MANAGER_RPC_ENDPOINT_NAME, config);
+
+                       initialized = true;
+               }
+               finally {
+                       if (!initialized) {
+                               // best-effort rollback of what the parent constructor set up; a failure
+                               // here must not mask the exception that aborted the initialization
+                               try {
+                                       super.close();
+                               } catch (Throwable ignored) {}
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Services
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a leader retrieval service that always reports the ResourceManager RPC URL
+        * assembled in the constructor, together with {@code DEFAULT_LEADER_ID}.
+        *
+        * @return A new {@link StandaloneLeaderRetrievalService} for the pre-configured address.
+        */
+       @Override
+       public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+               enter();
+               try {
+                       final LeaderRetrievalService staticRetriever =
+                                       new StandaloneLeaderRetrievalService(resourceManagerRpcUrl, DEFAULT_LEADER_ID);
+                       return staticRetriever;
+               }
+               finally {
+                       exit();
+               }
+       }
+
+       /**
+        * Not available in these services: once the {@code enter()} guard has passed, this
+        * method always throws, because ResourceManager leader election is not supported on
+        * the TaskManager side.
+        *
+        * <p>NOTE(review): {@code enter()} presumably rejects calls on closed services (the
+        * accompanying test expects an IllegalStateException after close()) - confirm in the
+        * base class.
+        *
+        * @throws UnsupportedOperationException Always, when the guard is passed.
+        */
+       @Override
+       public LeaderElectionService getResourceManagerLeaderElectionService() {
+               enter();
+               try {
+                       throw new UnsupportedOperationException("Not supported 
on the TaskManager side");
+               }
+               finally {
+                       exit();
+               }
+       }
+
+       /**
+        * Not implemented yet: once the {@code enter()} guard has passed, this method always
+        * throws. Per the exception message, it needs refactoring to accept a default address.
+        *
+        * @param jobID The job for which the election service is requested (currently unused).
+        *
+        * @throws UnsupportedOperationException Always, when the guard is passed.
+        */
+       @Override
+       public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) {
+               enter();
+               try {
+                       throw new UnsupportedOperationException("needs 
refactoring to accept default address");
+               }
+               finally {
+                       exit();
+               }
+       }
+
+       /**
+        * Not implemented yet: once the {@code enter()} guard has passed, this method always
+        * throws. Per the exception message, it needs refactoring to accept a default address.
+        *
+        * @param jobID The job for which the leader retriever is requested (currently unused).
+        *
+        * @throws UnsupportedOperationException Always, when the guard is passed.
+        */
+       @Override
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
{
+               enter();
+               try {
+                       throw new UnsupportedOperationException("needs 
refactoring to accept default address");
+               }
+               finally {
+                       exit();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
 
b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
new file mode 100644
index 0000000..0e7bf0f
--- /dev/null
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.File;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class YarnIntraNonHaMasterServicesTest {
+
+       private static final Random RND = new Random();
+
+       @ClassRule
+       public static final TemporaryFolder TEMP_DIR = new TemporaryFolder();
+
+       private static MiniDFSCluster HDFS_CLUSTER;
+
+       private static Path HDFS_ROOT_PATH;
+
+       private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+       // 
------------------------------------------------------------------------
+       //  Test setup and shutdown
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Starts one MiniDFSCluster for all tests of this class, storing its data in a fresh
+        * folder below the class-level temporary directory, and remembers the cluster URI as
+        * the HDFS root path.
+        */
+       @BeforeClass
+       public static void createHDFS() throws Exception {
+               final File hdfsBaseDir = TEMP_DIR.newFolder();
+
+               final org.apache.hadoop.conf.Configuration clusterConf = new org.apache.hadoop.conf.Configuration();
+               clusterConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBaseDir.getAbsolutePath());
+
+               HDFS_CLUSTER = new MiniDFSCluster.Builder(clusterConf).build();
+               HDFS_ROOT_PATH = new Path(HDFS_CLUSTER.getURI());
+       }
+
+       /**
+        * Shuts the MiniDFSCluster down (if one was started) and clears the static references.
+        */
+       @AfterClass
+       public static void destroyHDFS() {
+               final MiniDFSCluster cluster = HDFS_CLUSTER;
+               if (cluster != null) {
+                       cluster.shutdown();
+               }
+
+               HDFS_CLUSTER = null;
+               HDFS_ROOT_PATH = null;
+       }
+
+       /**
+        * Creates a fresh Hadoop configuration per test whose default file system is the
+        * MiniDFSCluster started for this class.
+        */
+       @Before
+       public void initConfig() {
+               hadoopConfig = new org.apache.hadoop.conf.Configuration();
+
+               final String hdfsUri = HDFS_ROOT_PATH.toString();
+               hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, hdfsUri);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Tests
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Verifies that calling {@code close()} after {@code closeAndCleanupAllData()}
+        * completes without an exception.
+        */
+       @Test
+       public void testRepeatedClose() throws Exception {
+               final Configuration config = new Configuration();
+
+               final YarnHighAvailabilityServices haServices =
+                               new YarnIntraNonHaMasterServices(config, hadoopConfig);
+               haServices.closeAndCleanupAllData();
+
+               // a second shutdown attempt must be tolerated silently
+               haServices.close();
+       }
+
+       /**
+        * Verifies that closing the services while a contender is registered with the
+        * ResourceManager leader election service reports exactly one error to that contender.
+        */
+       @Test
+       public void testClosingReportsToLeader() throws Exception {
+               final Configuration config = new Configuration();
+
+               try (YarnHighAvailabilityServices haServices =
+                               new YarnIntraNonHaMasterServices(config, hadoopConfig)) {
+
+                       final LeaderElectionService electionService = haServices.getResourceManagerLeaderElectionService();
+                       final LeaderContender leaderContender = mockContender(electionService);
+
+                       electionService.start(leaderContender);
+
+                       // explicit close inside the try-with-resources: the services get closed twice
+                       haServices.close();
+
+                       verify(leaderContender, timeout(100).times(1)).handleError(any(Exception.class));
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a mock contender with a random lower-case address, see
+        * {@link #mockContender(LeaderElectionService, String)}.
+        */
+       private static LeaderContender mockContender(final LeaderElectionService service) {
+               return mockContender(service, StringUtils.getRandomString(RND, 5, 10, 'a', 'z'));
+       }
+
+       /**
+        * Creates a mock contender that reports the given address and, whenever leadership is
+        * granted, confirms the granted session ID at the given election service.
+        *
+        * @param service The election service at which granted session IDs are confirmed.
+        * @param address The address the mock returns from {@code getAddress()}.
+        * @return The configured mock.
+        */
+       private static LeaderContender mockContender(final LeaderElectionService service, final String address) {
+               final LeaderContender contender = mock(LeaderContender.class);
+
+               when(contender.getAddress()).thenReturn(address);
+
+               final Answer<Void> confirmGrantedSession = new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) throws Throwable {
+                               // first argument of grantLeadership(...) is the leader session ID
+                               final Object sessionId = invocation.getArguments()[0];
+                               service.confirmLeaderSessionID((UUID) sessionId);
+                               return null;
+                       }
+               };
+               doAnswer(confirmGrantedSession).when(contender).grantLeadership(any(UUID.class));
+
+               return contender;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
 
b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
new file mode 100644
index 0000000..a13deac
--- /dev/null
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Mockito.*;
+
+
+public class YarnPreConfiguredMasterHaServicesTest extends TestLogger {
+
+       @ClassRule
+       public static final TemporaryFolder TEMP_DIR = new TemporaryFolder();
+
+       private static MiniDFSCluster HDFS_CLUSTER;
+
+       private static Path HDFS_ROOT_PATH;
+
+       private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+       // 
------------------------------------------------------------------------
+       //  Test setup and shutdown
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Starts one MiniDFSCluster for all tests of this class, storing its data in a fresh
+        * folder below the class-level temporary directory, and remembers the cluster URI as
+        * the HDFS root path.
+        */
+       @BeforeClass
+       public static void createHDFS() throws Exception {
+               final File hdfsBaseDir = TEMP_DIR.newFolder();
+
+               final org.apache.hadoop.conf.Configuration clusterConf = new org.apache.hadoop.conf.Configuration();
+               clusterConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBaseDir.getAbsolutePath());
+
+               HDFS_CLUSTER = new MiniDFSCluster.Builder(clusterConf).build();
+               HDFS_ROOT_PATH = new Path(HDFS_CLUSTER.getURI());
+       }
+
+       /**
+        * Shuts the MiniDFSCluster down (if one was started) and clears the static references.
+        */
+       @AfterClass
+       public static void destroyHDFS() {
+               final MiniDFSCluster cluster = HDFS_CLUSTER;
+               if (cluster != null) {
+                       cluster.shutdown();
+               }
+
+               HDFS_CLUSTER = null;
+               HDFS_ROOT_PATH = null;
+       }
+
+       /**
+        * Creates a fresh Hadoop configuration per test whose default file system is the
+        * MiniDFSCluster started for this class.
+        */
+       @Before
+       public void initConfig() {
+               hadoopConfig = new org.apache.hadoop.conf.Configuration();
+
+               final String hdfsUri = HDFS_ROOT_PATH.toString();
+               hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, hdfsUri);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Tests
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Verifies that two service instances created from the same configuration report the
+        * same, non-null ResourceManager endpoint name.
+        *
+        * <p>Each instance is cleaned up in its own {@code finally} stage, so that neither a
+        * failing constructor of the second instance nor a failing cleanup of one instance
+        * leaves the other instance open.
+        */
+       @Test
+       public void testConstantResourceManagerName() throws Exception {
+               final Configuration flinkConfig = new Configuration();
+               flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
+               flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
+
+               YarnHighAvailabilityServices services1 = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+               try {
+                       YarnHighAvailabilityServices services2 = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig);
+                       try {
+                               String rmName1 = services1.getResourceManagerEndpointName();
+                               String rmName2 = services2.getResourceManagerEndpointName();
+
+                               assertNotNull(rmName1);
+                               assertNotNull(rmName2);
+                               assertEquals(rmName1, rmName2);
+                       }
+                       finally {
+                               services2.closeAndCleanupAllData();
+                       }
+               }
+               finally {
+                       services1.closeAndCleanupAllData();
+               }
+       }
+
+       /**
+        * Verifies that the constructor rejects a configuration without the ResourceManager
+        * address, then one without the port, and accepts it once both are set.
+        */
+       @Test
+       public void testMissingRmConfiguration() throws Exception {
+               final Configuration config = new Configuration();
+
+               // step 1: neither address nor port configured
+               try {
+                       new YarnPreConfiguredMasterNonHaServices(config, hadoopConfig);
+                       fail();
+               } catch (IllegalConfigurationException expected) {
+                       // the address is missing
+               }
+
+               // step 2: address configured, port still absent
+               config.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
+               try {
+                       new YarnPreConfiguredMasterNonHaServices(config, hadoopConfig);
+                       fail();
+               } catch (IllegalConfigurationException expected) {
+                       // the port is missing
+               }
+
+               // step 3: complete configuration, construction must succeed
+               config.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
+
+               final YarnPreConfiguredMasterNonHaServices haServices =
+                               new YarnPreConfiguredMasterNonHaServices(config, hadoopConfig);
+               haServices.closeAndCleanupAllData();
+       }
+
+       /**
+        * Verifies that {@code closeAndCleanupAllData()} removes the recovery data directory
+        * from the HDFS working directory, marks the services as closed, and fails with an
+        * IllegalStateException when invoked a second time.
+        */
+       @Test
+       public void testCloseAndCleanup() throws Exception {
+               final Configuration config = new Configuration();
+               config.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
+               config.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
+
+               // create the services and tear them down again right away
+               final YarnHighAvailabilityServices haServices =
+                               new YarnPreConfiguredMasterNonHaServices(config, hadoopConfig);
+               haServices.closeAndCleanupAllData();
+
+               final FileSystem fs = HDFS_ROOT_PATH.getFileSystem();
+               final Path workingDir = new Path(HDFS_CLUSTER.getFileSystem().getWorkingDirectory().toString());
+               final Path recoveryDir = new Path(workingDir, YarnHighAvailabilityServices.FLINK_RECOVERY_DATA_DIR);
+
+               try {
+                       fs.getFileStatus(recoveryDir);
+                       fail("Flink recovery data directory still exists");
+               }
+               catch (FileNotFoundException expected) {
+                       // the directory must be gone after the cleanup
+               }
+
+               assertTrue(haServices.isClosed());
+
+               // a second cleanup on already closed services must be rejected
+               try {
+                       haServices.closeAndCleanupAllData();
+                       fail("should fail with an IllegalStateException");
+               } catch (IllegalStateException expected) {
+                       // expected
+               }
+       }
+
+       /**
+        * Verifies that {@code getSubmittedJobGraphStore()} is unsupported on open services,
+        * and that every service accessor fails with an IllegalStateException once the
+        * services have been closed.
+        */
+       @Test
+       public void testCallsOnClosedServices() throws Exception {
+               final Configuration config = new Configuration();
+               config.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost");
+               config.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427);
+
+               final YarnHighAvailabilityServices haServices =
+                               new YarnPreConfiguredMasterNonHaServices(config, hadoopConfig);
+
+               // unsupported even while the services are still open
+               try {
+                       haServices.getSubmittedJobGraphStore();
+                       fail();
+               } catch (UnsupportedOperationException expected) {}
+
+               haServices.close();
+
+               // from here on, every accessor must reject the call
+
+               try {
+                       haServices.createBlobStore();
+                       fail();
+               } catch (IllegalStateException expected) {}
+
+               try {
+                       haServices.getCheckpointRecoveryFactory();
+                       fail();
+               } catch (IllegalStateException expected) {}
+
+               try {
+                       haServices.getJobManagerLeaderElectionService(new JobID());
+                       fail();
+               } catch (IllegalStateException expected) {}
+
+               try {
+                       haServices.getJobManagerLeaderRetriever(new JobID());
+                       fail();
+               } catch (IllegalStateException expected) {}
+
+               try {
+                       haServices.getRunningJobsRegistry();
+                       fail();
+               } catch (IllegalStateException expected) {}
+
+               try {
+                       haServices.getResourceManagerLeaderElectionService();
+                       fail();
+               } catch (IllegalStateException expected) {}
+
+               try {
+                       haServices.getResourceManagerLeaderRetriever();
+                       fail();
+               } catch (IllegalStateException expected) {}
+       }
+}

Reply via email to