http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java new file mode 100644 index 0000000..7aa481f --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/AbstractYarnNonHaServices.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn.highavailability; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry; +import org.apache.flink.runtime.highavailability.RunningJobsRegistry; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; + +import java.io.IOException; + +/** + * Abstract base class for the high availability services for Flink YARN applications that support + * no master fail over. + * + * <p>Internally, these services put their recovery data into YARN's working directory, + * except for checkpoints, which are in the configured checkpoint directory. That way, + * checkpoints can be resumed with a new job/application, even if the complete YARN application + * is killed and cleaned up. + */ +public abstract class AbstractYarnNonHaServices extends YarnHighAvailabilityServices { + + /** The constant name of the ResourceManager RPC endpoint */ + protected static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager"; + + // ------------------------------------------------------------------------ + + /** + * Creates new YARN high-availability services, configuring the file system and recovery + * data directory based on the working directory in the given Hadoop configuration. + * + * <p>This class requires that the default Hadoop file system configured in the given + * Hadoop configuration is an HDFS. + * + * @param config The Flink configuration of this component / process. + * @param hadoopConf The Hadoop configuration for the YARN cluster. + * + * @throws IOException Thrown, if the initialization of the Hadoop file system used by YARN fails. 
+ */ + protected AbstractYarnNonHaServices( + Configuration config, + org.apache.hadoop.conf.Configuration hadoopConf) throws IOException { + super(config, hadoopConf); + } + + // ------------------------------------------------------------------------ + // Names + // ------------------------------------------------------------------------ + + @Override + public String getResourceManagerEndpointName() { + return RESOURCE_MANAGER_RPC_ENDPOINT_NAME; + } + + // ------------------------------------------------------------------------ + // Services + // ------------------------------------------------------------------------ + + @Override + public RunningJobsRegistry getRunningJobsRegistry() throws IOException { + enter(); + try { + // IMPORTANT: The registry must NOT place its data in a directory that is + // cleaned up by these services. + return new FsNegativeRunningJobsRegistry(flinkFileSystem, workingDirectory); + } + finally { + exit(); + } + } + + @Override + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { + enter(); + try { + return new StandaloneCheckpointRecoveryFactory(); + } + finally { + exit(); + } + } + + @Override + public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { + throw new UnsupportedOperationException("These High-Availability Services do not support storing job graphs"); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java new file mode 100644 index 0000000..4c78726 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn.highavailability; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.BlobStore; +import org.apache.flink.runtime.blob.FileSystemBlobStore; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.InstantiationUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The basis of {@link HighAvailabilityServices} for YARN setups. + * These high-availability services auto-configure YARN's HDFS and the YARN application's + * working directory to be used to store job recovery data. + * + * <p>Note for implementers: This class locks access to and creation of services, + * to make sure all services are properly shut down when shutting down this class. 
+ * To participate in the checks, overriding methods should frame method body with + * calls to {@code enter()} and {@code exit()} as shown in the following pattern: + * + * <pre>{@code + * public LeaderRetrievalService getResourceManagerLeaderRetriever() { + * enter(); + * try { + * CuratorClient client = getCuratorClient(); + * return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH); + * } finally { + * exit(); + * } + * } + * }</pre> + */ +public abstract class YarnHighAvailabilityServices implements HighAvailabilityServices { + + /** The name of the sub directory in which Flink stores the recovery data */ + public static final String FLINK_RECOVERY_DATA_DIR = "flink_recovery_data"; + + /** Logger for these services, shared with subclasses */ + protected static final Logger LOG = LoggerFactory.getLogger(YarnHighAvailabilityServices.class); + + // ------------------------------------------------------------------------ + + /** The lock that guards all accesses to methods in this class */ + private final ReentrantLock lock; + + /** The Flink FileSystem object that represent the HDFS used by YARN */ + protected final FileSystem flinkFileSystem; + + /** The Hadoop FileSystem object that represent the HDFS used by YARN */ + protected final org.apache.hadoop.fs.FileSystem hadoopFileSystem; + + /** The working directory of this YARN application. + * This MUST NOT be deleted when the HA services clean up */ + protected final Path workingDirectory; + + /** The directory for HA persistent data. 
This should be deleted when the + * HA services clean up */ + protected final Path haDataDirectory; + + /** Flag marking this instance as shut down */ + private volatile boolean closed; + + // ------------------------------------------------------------------------ + + /** + * Creates new YARN high-availability services, configuring the file system and recovery + * data directory based on the working directory in the given Hadoop configuration. + * + * <p>This class requires that the default Hadoop file system configured in the given + * Hadoop configuration is an HDFS. + * + * @param config The Flink configuration of this component / process. + * @param hadoopConf The Hadoop configuration for the YARN cluster. + * + * @throws IOException Thrown, if the initialization of the Hadoop file system used by YARN fails. + */ + protected YarnHighAvailabilityServices( + Configuration config, + org.apache.hadoop.conf.Configuration hadoopConf) throws IOException { + + checkNotNull(config); + checkNotNull(hadoopConf); + + this.lock = new ReentrantLock(); + + // get and verify the YARN HDFS URI + final URI fsUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConf); + if (fsUri.getScheme() == null || !"hdfs".equals(fsUri.getScheme().toLowerCase())) { + throw new IOException("Invalid file system found for YarnHighAvailabilityServices: " + + "Expected 'hdfs', but found '" + fsUri.getScheme() + "'."); + } + + // initialize the Hadoop File System + // we go through this special code path here to make sure we get no shared cached + // instance of the FileSystem + try { + final Class<? 
extends org.apache.hadoop.fs.FileSystem> fsClass = + org.apache.hadoop.fs.FileSystem.getFileSystemClass(fsUri.getScheme(), hadoopConf); + + this.hadoopFileSystem = InstantiationUtil.instantiate(fsClass); + this.hadoopFileSystem.initialize(fsUri, hadoopConf); + } + catch (Exception e) { + throw new IOException("Cannot instantiate YARN's Hadoop file system for " + fsUri, e); + } + + this.flinkFileSystem = new HadoopFileSystem(hadoopConf, hadoopFileSystem); + + this.workingDirectory = new Path(hadoopFileSystem.getWorkingDirectory().toUri()); + this.haDataDirectory = new Path(workingDirectory, FLINK_RECOVERY_DATA_DIR); + + // test the file system, to make sure we fail fast if access does not work + try { + flinkFileSystem.mkdirs(haDataDirectory); + } + catch (Exception e) { + throw new IOException("Could not create the directory for recovery data in YARN's file system at '" + + haDataDirectory + "'.", e); + } + + LOG.info("Flink YARN application will store recovery data at {}", haDataDirectory); + } + + // ------------------------------------------------------------------------ + // high availability services + // ------------------------------------------------------------------------ + + @Override + public BlobStore createBlobStore() throws IOException { + enter(); + try { + return new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString()); + } finally { + exit(); + } + } + + // ------------------------------------------------------------------------ + // Shutdown + // ------------------------------------------------------------------------ + + /** + * Checks whether these services have been shut down. + * + * @return True, if this instance has been shut down, false if it still operational. 
+ */ + public boolean isClosed() { + return closed; + } + + @Override + public void close() throws Exception { + lock.lock(); + try { + // close only once + if (closed) { + return; + } + closed = true; + + // we do not propagate exceptions here, but only log them + try { + hadoopFileSystem.close(); + } catch (Throwable t) { + LOG.warn("Error closing Hadoop FileSystem", t); + } + } + finally { + lock.unlock(); + } + } + + @Override + public void closeAndCleanupAllData() throws Exception { + lock.lock(); + try { + checkState(!closed, "YarnHighAvailabilityServices are already closed"); + + // we remember exceptions only, then continue cleanup, and re-throw at the end + Throwable exception = null; + + // first, we delete all data in Flink's data directory + try { + flinkFileSystem.delete(haDataDirectory, true); + } + catch (Throwable t) { + exception = t; + } + + // now we actually close the services + try { + close(); + } + catch (Throwable t) { + exception = firstOrSuppressed(t, exception); + } + + // if some exception occurred, rethrow + if (exception != null) { + ExceptionUtils.rethrowException(exception, exception.getMessage()); + } + } + finally { + lock.unlock(); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * To be called at the beginning of every method that creates an HA service. Acquires the lock + * and check whether this HighAvailabilityServices instance is shut down. + */ + void enter() { + if (!enterUnlessClosed()) { + throw new IllegalStateException("closed"); + } + } + + /** + * Acquires the lock and checks whether the services are already closed. If they are + * already closed, the method releases the lock and returns {@code false}. + * + * @return True, if the lock was acquired and the services are not closed, false if the services are closed. 
+ */ + boolean enterUnlessClosed() { + lock.lock(); + if (!closed) { + return true; + } else { + lock.unlock(); + return false; + } + } + + /** + * To be called at the end of every method that creates an HA service. Releases the lock. + */ + void exit() { + lock.unlock(); + } + + // ------------------------------------------------------------------------ + // Factory from Configuration + // ------------------------------------------------------------------------ + + /** + * Creates the high-availability services for a single-job Flink YARN application, to be + * used in the Application Master that runs both ResourceManager and JobManager. + * + * @param flinkConfig The Flink configuration. + * @param hadoopConfig The Hadoop configuration for the YARN cluster. + * + * @return The created high-availability services. + * + * @throws IOException Thrown, if the high-availability services could not be initialized. + */ + public static YarnHighAvailabilityServices forSingleJobAppMaster( + Configuration flinkConfig, + org.apache.hadoop.conf.Configuration hadoopConfig) throws IOException { + + checkNotNull(flinkConfig, "flinkConfig"); + checkNotNull(hadoopConfig, "hadoopConfig"); + + final HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(flinkConfig); + switch (mode) { + case NONE: + return new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig); + + case ZOOKEEPER: + throw new UnsupportedOperationException("to be implemented"); + + default: + throw new IllegalConfigurationException("Unrecognized high availability mode: " + mode); + } + } + + /** + * Creates the high-availability services for the TaskManagers participating in + * a Flink YARN application. + * + * @param flinkConfig The Flink configuration. + * @param hadoopConfig The Hadoop configuration for the YARN cluster. + * + * @return The created high-availability services. + * + * @throws IOException Thrown, if the high-availability services could not be initialized. 
+ */ + public static YarnHighAvailabilityServices forYarnTaskManager( + Configuration flinkConfig, + org.apache.hadoop.conf.Configuration hadoopConfig) throws IOException { + + checkNotNull(flinkConfig, "flinkConfig"); + checkNotNull(hadoopConfig, "hadoopConfig"); + + final HighAvailabilityMode mode = HighAvailabilityMode.fromConfig(flinkConfig); + switch (mode) { + case NONE: + return new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig); + + case ZOOKEEPER: + throw new UnsupportedOperationException("to be implemented"); + + default: + throw new IllegalConfigurationException("Unrecognized high availability mode: " + mode); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java new file mode 100644 index 0000000..fd1a45e --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServices.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.highavailability; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.ServicesThreadFactory; +import org.apache.flink.runtime.highavailability.leaderelection.SingleLeaderElectionService; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * These YarnHighAvailabilityServices are for the Application Master in setups where there is one + * ResourceManager that is statically configured in the Flink configuration. + * + * <h3>Handled failure types</h3> + * <ul> + * <li><b>User code & operator failures:</b> Failed operators are recovered from checkpoints.</li> + * <li><b>Task Manager Failures:</b> Failed Task Managers are restarted and their tasks are + * recovered from checkpoints.</li> + * </ul> + * + * <h3>Non-recoverable failure types</h3> + * <ul> + * <li><b>Application Master failures:</b> These failures cannot be recovered, because TaskManagers + * have no way to discover the new Application Master's address.</li> + * </ul> + * + * <p>Internally, these services put their recovery data into YARN's working directory, + * except for checkpoints, which are in the configured checkpoint directory. That way, + * checkpoints can be resumed with a new job/application, even if the complete YARN application + * is killed and cleaned up. 
+ * + * <p>Because ResourceManager and JobManager run both in the same process (Application Master), they + * use an embedded leader election service to find each other. + * + * <p>A typical YARN setup that uses these HA services first starts the ResourceManager + * inside the ApplicationMaster and puts its RPC endpoint address into the configuration with which + * the TaskManagers are started. Because of this static addressing scheme, the setup cannot handle failures + * of the JobManager and ResourceManager, which are running as part of the Application Master. + * + * @see HighAvailabilityServices + */ +public class YarnIntraNonHaMasterServices extends AbstractYarnNonHaServices { + + /** The dispatcher thread pool for these services */ + private final ExecutorService dispatcher; + + /** The embedded leader election service used by JobManagers to find the resource manager */ + private final SingleLeaderElectionService resourceManagerLeaderElectionService; + + // ------------------------------------------------------------------------ + + /** + * Creates new YarnIntraNonHaMasterServices for the given Flink and YARN configuration. + * + * This constructor initializes access to the HDFS to store recovery data, and creates the + * embedded leader election services through which ResourceManager and JobManager find and + * confirm each other. + * + * @param config The Flink configuration of this component / process. + * @param hadoopConf The Hadoop configuration for the YARN cluster. + * + * @throws IOException + * Thrown, if the initialization of the Hadoop file system used by YARN fails. + * @throws IllegalConfigurationException + * Thrown, if the Flink configuration does not properly describe the ResourceManager address and port. 
+ */ + public YarnIntraNonHaMasterServices( + Configuration config, + org.apache.hadoop.conf.Configuration hadoopConf) throws IOException { + + super(config, hadoopConf); + + // track whether we successfully perform the initialization + boolean successful = false; + + try { + this.dispatcher = Executors.newSingleThreadExecutor(new ServicesThreadFactory()); + this.resourceManagerLeaderElectionService = new SingleLeaderElectionService(dispatcher, DEFAULT_LEADER_ID); + + // all good! + successful = true; + } + finally { + if (!successful) { + // quietly undo what the parent constructor initialized + try { + super.close(); + } catch (Throwable ignored) {} + } + } + } + + // ------------------------------------------------------------------------ + // Services + // ------------------------------------------------------------------------ + + @Override + public LeaderRetrievalService getResourceManagerLeaderRetriever() { + enter(); + try { + return resourceManagerLeaderElectionService.createLeaderRetrievalService(); + } + finally { + exit(); + } + } + + @Override + public LeaderElectionService getResourceManagerLeaderElectionService() { + enter(); + try { + return resourceManagerLeaderElectionService; + } + finally { + exit(); + } + } + + @Override + public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) { + enter(); + try { + throw new UnsupportedOperationException("needs refactoring to accept default address"); + } + finally { + exit(); + } + } + + @Override + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) { + enter(); + try { + throw new UnsupportedOperationException("needs refactoring to accept default address"); + } + finally { + exit(); + } + } + + // ------------------------------------------------------------------------ + // shutdown + // ------------------------------------------------------------------------ + + @Override + public void close() throws Exception { + if (enterUnlessClosed()) { + try { + try { + // 
this class' own cleanup logic + resourceManagerLeaderElectionService.shutdown(); + dispatcher.shutdownNow(); + } + finally { + // in any case must we call the parent cleanup logic + super.close(); + } + } + finally { + exit(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java new file mode 100644 index 0000000..eb4b77e --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterNonHaServices.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn.highavailability; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; +import org.apache.flink.runtime.rpc.RpcServiceUtils; +import org.apache.flink.yarn.configuration.YarnConfigOptions; + +import java.io.IOException; + +/** + * These YarnHighAvailabilityServices are for use by the TaskManager in setups, + * where there is one ResourceManager that is statically configured in the Flink configuration. + * + * <h3>Handled failure types</h3> + * <ul> + * <li><b>User code & operator failures:</b> Failed operators are recovered from checkpoints.</li> + * <li><b>Task Manager Failures:</b> Failed Task Managers are restarted and their tasks are + * recovered from checkpoints.</li> + * </ul> + * + * <h3>Non-recoverable failure types</h3> + * <ul> + * <li><b>Application Master failures:</b> These failures cannot be recovered, because TaskManagers + * have no way to discover the new Application Master's address.</li> + * </ul> + * + * <p>Internally, these services put their recovery data into YARN's working directory, + * except for checkpoints, which are in the configured checkpoint directory. That way, + * checkpoints can be resumed with a new job/application, even if the complete YARN application + * is killed and cleaned up. + * + * <p>A typical YARN setup that uses these HA services first starts the ResourceManager + * inside the ApplicationMaster and puts its RPC endpoint address into the configuration with which + * the TaskManagers are started. 
Because of this static addressing scheme, the setup cannot handle failures + * of the JobManager and ResourceManager, which are running as part of the Application Master. + * + * @see HighAvailabilityServices + */ +public class YarnPreConfiguredMasterNonHaServices extends AbstractYarnNonHaServices { + + /** The RPC URL under which the single ResourceManager can be reached while available */ + private final String resourceManagerRpcUrl; + + // ------------------------------------------------------------------------ + + /** + * Creates new YarnPreConfiguredMasterHaServices for the given Flink and YARN configuration. + * This constructor parses the ResourceManager address from the Flink configuration and sets + * up the HDFS access to store recovery data in the YARN application's working directory. + * + * @param config The Flink configuration of this component / process. + * @param hadoopConf The Hadoop configuration for the YARN cluster. + * + * @throws IOException + * Thrown, if the initialization of the Hadoop file system used by YARN fails. + * @throws IllegalConfigurationException + * Thrown, if the Flink configuration does not properly describe the ResourceManager address and port. 
+ */ + public YarnPreConfiguredMasterNonHaServices( + Configuration config, + org.apache.hadoop.conf.Configuration hadoopConf) throws IOException { + + super(config, hadoopConf); + + // track whether we successfully perform the initialization + boolean successful = false; + + try { + // extract the hostname and port of the resource manager + final String rmHost = config.getString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS); + final int rmPort = config.getInteger(YarnConfigOptions.APP_MASTER_RPC_PORT); + + if (rmHost == null) { + throw new IllegalConfigurationException("Config parameter '" + + YarnConfigOptions.APP_MASTER_RPC_ADDRESS.key() + "' is missing."); + } + if (rmPort < 0) { + throw new IllegalConfigurationException("Config parameter '" + + YarnConfigOptions.APP_MASTER_RPC_PORT.key() + "' is missing."); + } + if (rmPort <= 0 || rmPort >= 65536) { + throw new IllegalConfigurationException("Invalid value for '" + + YarnConfigOptions.APP_MASTER_RPC_PORT.key() + "' - port must be in [1, 65535]"); + } + + this.resourceManagerRpcUrl = RpcServiceUtils.getRpcUrl( + rmHost, rmPort, RESOURCE_MANAGER_RPC_ENDPOINT_NAME, config); + + // all well! 
+ successful = true; + } + finally { + if (!successful) { + // quietly undo what the parent constructor initialized + try { + super.close(); + } catch (Throwable ignored) {} + } + } + } + + // ------------------------------------------------------------------------ + // Services + // ------------------------------------------------------------------------ + + @Override + public LeaderRetrievalService getResourceManagerLeaderRetriever() { + enter(); + try { + return new StandaloneLeaderRetrievalService(resourceManagerRpcUrl, DEFAULT_LEADER_ID); + } + finally { + exit(); + } + } + + @Override + public LeaderElectionService getResourceManagerLeaderElectionService() { + enter(); + try { + throw new UnsupportedOperationException("Not supported on the TaskManager side"); + } + finally { + exit(); + } + } + + @Override + public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) { + enter(); + try { + throw new UnsupportedOperationException("needs refactoring to accept default address"); + } + finally { + exit(); + } + } + + @Override + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) { + enter(); + try { + throw new UnsupportedOperationException("needs refactoring to accept default address"); + } + finally { + exit(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java new file mode 100644 index 0000000..0e7bf0f --- /dev/null +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.yarn.highavailability;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.util.StringUtils;

import org.apache.hadoop.hdfs.MiniDFSCluster;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.File;
import java.util.Random;
import java.util.UUID;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Tests for {@link YarnIntraNonHaMasterServices}, run against a {@link MiniDFSCluster}
 * that is shared by all test methods of this class.
 */
public class YarnIntraNonHaMasterServicesTest {

	private static final Random RND = new Random();

	@ClassRule
	public static final TemporaryFolder TEMP_DIR = new TemporaryFolder();

	private static MiniDFSCluster HDFS_CLUSTER;

	private static Path HDFS_ROOT_PATH;

	private org.apache.hadoop.conf.Configuration hadoopConfig;

	// ------------------------------------------------------------------------
	//  Test setup and shutdown
	// ------------------------------------------------------------------------

	/** Starts the mini HDFS cluster in a fresh temporary folder and records its root URI. */
	@BeforeClass
	public static void createHDFS() throws Exception {
		final File baseDir = TEMP_DIR.newFolder();

		final org.apache.hadoop.conf.Configuration clusterConf = new org.apache.hadoop.conf.Configuration();
		clusterConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());

		HDFS_CLUSTER = new MiniDFSCluster.Builder(clusterConf).build();
		HDFS_ROOT_PATH = new Path(HDFS_CLUSTER.getURI());
	}

	/** Shuts the mini HDFS cluster down (if it was started) and clears the static references. */
	@AfterClass
	public static void destroyHDFS() {
		if (HDFS_CLUSTER != null) {
			HDFS_CLUSTER.shutdown();
		}
		HDFS_CLUSTER = null;
		HDFS_ROOT_PATH = null;
	}

	/** Builds a fresh Hadoop configuration whose default file system is the mini cluster. */
	@Before
	public void initConfig() {
		hadoopConfig = new org.apache.hadoop.conf.Configuration();
		hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, HDFS_ROOT_PATH.toString());
	}

	// ------------------------------------------------------------------------
	//  Tests
	// ------------------------------------------------------------------------

	/** Closing services that were already closed via closeAndCleanupAllData() must not fail. */
	@Test
	public void testRepeatedClose() throws Exception {
		final YarnHighAvailabilityServices haServices =
				new YarnIntraNonHaMasterServices(new Configuration(), hadoopConfig);

		haServices.closeAndCleanupAllData();

		// this should not throw an exception
		haServices.close();
	}

	/** Closing the services must report an error to a contender of the started election service. */
	@Test
	public void testClosingReportsToLeader() throws Exception {
		final Configuration flinkConfig = new Configuration();

		try (YarnHighAvailabilityServices haServices = new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig)) {
			final LeaderElectionService electionService = haServices.getResourceManagerLeaderElectionService();
			final LeaderContender leaderContender = mockContender(electionService);

			electionService.start(leaderContender);
			haServices.close();

			verify(leaderContender, timeout(100).times(1)).handleError(any(Exception.class));
		}
	}

	// ------------------------------------------------------------------------
	//  utilities
	// ------------------------------------------------------------------------

	/** Creates a mock contender with a random lower-case address of 5 to 10 characters. */
	private static LeaderContender mockContender(final LeaderElectionService service) {
		final String randomAddress = StringUtils.getRandomString(RND, 5, 10, 'a', 'z');
		return mockContender(service, randomAddress);
	}

	/**
	 * Creates a mock contender that reports the given address and confirms every granted
	 * leader session ID back to the given election service.
	 */
	private static LeaderContender mockContender(final LeaderElectionService service, final String address) {
		final LeaderContender contender = mock(LeaderContender.class);

		when(contender.getAddress()).thenReturn(address);

		// on grantLeadership(UUID), confirm the granted session ID with the service
		final Answer<Void> confirmLeadership = new Answer<Void>() {
			@Override
			public Void answer(InvocationOnMock invocation) throws Throwable {
				final UUID sessionId = (UUID) invocation.getArguments()[0];
				service.confirmLeaderSessionID(sessionId);
				return null;
			}
		};
		doAnswer(confirmLeadership).when(contender).grantLeadership(any(UUID.class));

		return contender;
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java new file mode 100644 index 0000000..a13deac --- /dev/null +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnPreConfiguredMasterHaServicesTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.highavailability; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.util.TestLogger; +import org.apache.flink.yarn.configuration.YarnConfigOptions; + +import org.apache.hadoop.hdfs.MiniDFSCluster; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.FileNotFoundException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import static org.mockito.Mockito.*; + + +public class YarnPreConfiguredMasterHaServicesTest extends TestLogger { + + @ClassRule + public static final TemporaryFolder TEMP_DIR = new TemporaryFolder(); + + private static MiniDFSCluster HDFS_CLUSTER; + + private static Path HDFS_ROOT_PATH; + + private org.apache.hadoop.conf.Configuration hadoopConfig; + + // 
------------------------------------------------------------------------ + // Test setup and shutdown + // ------------------------------------------------------------------------ + + @BeforeClass + public static void createHDFS() throws Exception { + final File tempDir = TEMP_DIR.newFolder(); + + org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration(); + hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath()); + + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); + HDFS_CLUSTER = builder.build(); + HDFS_ROOT_PATH = new Path(HDFS_CLUSTER.getURI()); + } + + @AfterClass + public static void destroyHDFS() { + if (HDFS_CLUSTER != null) { + HDFS_CLUSTER.shutdown(); + } + HDFS_CLUSTER = null; + HDFS_ROOT_PATH = null; + } + + @Before + public void initConfig() { + hadoopConfig = new org.apache.hadoop.conf.Configuration(); + hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, HDFS_ROOT_PATH.toString()); + } + + // ------------------------------------------------------------------------ + // Tests + // ------------------------------------------------------------------------ + + @Test + public void testConstantResourceManagerName() throws Exception { + final Configuration flinkConfig = new Configuration(); + flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost"); + flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427); + + YarnHighAvailabilityServices services1 = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig); + YarnHighAvailabilityServices services2 = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig); + + try { + String rmName1 = services1.getResourceManagerEndpointName(); + String rmName2 = services2.getResourceManagerEndpointName(); + + assertNotNull(rmName1); + assertNotNull(rmName2); + assertEquals(rmName1, rmName2); + } + finally { + services1.closeAndCleanupAllData(); + services2.closeAndCleanupAllData(); + 
} + } + + @Test + public void testMissingRmConfiguration() throws Exception { + final Configuration flinkConfig = new Configuration(); + + // missing resource manager address + try { + new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig); + fail(); + } catch (IllegalConfigurationException e) { + // expected + } + + flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost"); + + // missing resource manager port + try { + new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig); + fail(); + } catch (IllegalConfigurationException e) { + // expected + } + + flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427); + + // now everything is good ;-) + new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig).closeAndCleanupAllData(); + } + + @Test + public void testCloseAndCleanup() throws Exception { + final Configuration flinkConfig = new Configuration(); + flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost"); + flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427); + + // create the services + YarnHighAvailabilityServices services = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig); + services.closeAndCleanupAllData(); + + final FileSystem fileSystem = HDFS_ROOT_PATH.getFileSystem(); + final Path workDir = new Path(HDFS_CLUSTER.getFileSystem().getWorkingDirectory().toString()); + + try { + fileSystem.getFileStatus(new Path(workDir, YarnHighAvailabilityServices.FLINK_RECOVERY_DATA_DIR)); + fail("Flink recovery data directory still exists"); + } + catch (FileNotFoundException e) { + // expected, because the directory should have been cleaned up + } + + assertTrue(services.isClosed()); + + // doing another cleanup when the services are closed should fail + try { + services.closeAndCleanupAllData(); + fail("should fail with an IllegalStateException"); + } catch (IllegalStateException e) { + // expected + } + } + + @Test + public void 
testCallsOnClosedServices() throws Exception { + final Configuration flinkConfig = new Configuration(); + flinkConfig.setString(YarnConfigOptions.APP_MASTER_RPC_ADDRESS, "localhost"); + flinkConfig.setInteger(YarnConfigOptions.APP_MASTER_RPC_PORT, 1427); + + YarnHighAvailabilityServices services = new YarnPreConfiguredMasterNonHaServices(flinkConfig, hadoopConfig); + + // this method is not supported + try { + services.getSubmittedJobGraphStore(); + fail(); + } catch (UnsupportedOperationException ignored) {} + + + services.close(); + + // all these methods should fail now + + try { + services.createBlobStore(); + fail(); + } catch (IllegalStateException ignored) {} + + try { + services.getCheckpointRecoveryFactory(); + fail(); + } catch (IllegalStateException ignored) {} + + try { + services.getJobManagerLeaderElectionService(new JobID()); + fail(); + } catch (IllegalStateException ignored) {} + + try { + services.getJobManagerLeaderRetriever(new JobID()); + fail(); + } catch (IllegalStateException ignored) {} + + try { + services.getRunningJobsRegistry(); + fail(); + } catch (IllegalStateException ignored) {} + + try { + services.getResourceManagerLeaderElectionService(); + fail(); + } catch (IllegalStateException ignored) {} + + try { + services.getResourceManagerLeaderRetriever(); + fail(); + } catch (IllegalStateException ignored) {} + } +}
