[FLINK-5254] [yarn] Implement YARN High-Availability Services
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2a7dbda7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2a7dbda7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2a7dbda7 Branch: refs/heads/master Commit: 2a7dbda79a00863a511fcf64b339770d1d00f805 Parents: e2922ad Author: Stephan Ewen <[email protected]> Authored: Mon Dec 5 01:34:32 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Fri Dec 23 20:54:27 2016 +0100 ---------------------------------------------------------------------- .../flink/configuration/Configuration.java | 12 +- .../FsNegativeRunningJobsRegistryTest.java | 121 ++++++ .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 20 +- .../highavailability/EmbeddedNonHaServices.java | 15 +- .../FsNegativeRunningJobsRegistry.java | 153 ++++++++ .../HighAvailabilityServices.java | 69 +++- .../runtime/highavailability/NonHaServices.java | 21 +- .../highavailability/ServicesThreadFactory.java | 40 ++ .../highavailability/ZookeeperHaServices.java | 17 +- .../SingleLeaderElectionService.java | 384 +++++++++++++++++++ .../nonha/AbstractNonHaServices.java | 29 +- .../nonha/EmbeddedLeaderService.java | 2 +- .../runtime/jobmanager/slots/AllocatedSlot.java | 1 - .../StandaloneLeaderRetrievalService.java | 1 + .../flink/runtime/minicluster/MiniCluster.java | 2 +- .../resourcemanager/JobLeaderIdService.java | 2 +- .../resourcemanager/ResourceManagerRunner.java | 3 +- .../flink/runtime/rpc/RpcServiceUtils.java | 70 ++++ .../FsNegativeRunningJobsRegistryTest.java | 85 ++++ .../TestingHighAvailabilityServices.java | 14 +- .../SingleLeaderElectionServiceTest.java | 226 +++++++++++ flink-yarn/pom.xml | 21 + ...bstractYarnFlinkApplicationMasterRunner.java | 34 +- .../flink/yarn/YarnApplicationMasterRunner.java | 2 +- .../yarn/YarnFlinkApplicationMasterRunner.java | 18 +- .../flink/yarn/YarnTaskExecutorRunner.java | 13 +- .../yarn/configuration/YarnConfigOptions.java | 49 +++ 
.../AbstractYarnNonHaServices.java | 105 +++++ .../YarnHighAvailabilityServices.java | 343 +++++++++++++++++ .../YarnIntraNonHaMasterServices.java | 188 +++++++++ .../YarnPreConfiguredMasterNonHaServices.java | 172 +++++++++ .../YarnIntraNonHaMasterServicesTest.java | 149 +++++++ .../YarnPreConfiguredMasterHaServicesTest.java | 234 +++++++++++ 33 files changed, 2537 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java index f15c669..8f23435 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java @@ -44,7 +44,7 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters implements IOReadableWritable, java.io.Serializable, Cloneable { private static final long serialVersionUID = 1L; - + private static final byte TYPE_STRING = 0; private static final byte TYPE_INT = 1; private static final byte TYPE_LONG = 2; @@ -52,14 +52,14 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters private static final byte TYPE_FLOAT = 4; private static final byte TYPE_DOUBLE = 5; private static final byte TYPE_BYTES = 6; - + /** The log object used for debugging. */ private static final Logger LOG = LoggerFactory.getLogger(Configuration.class); - + /** Stores the concrete key/value pairs of this configuration object. 
*/ protected final HashMap<String, Object> confData; - + // -------------------------------------------------------------------------------------------- /** @@ -639,12 +639,16 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters Object o = getRawValue(configOption.key()); if (o != null) { + // found a value for the current proper key return o; } else if (configOption.hasDeprecatedKeys()) { + // try the deprecated keys for (String deprecatedKey : configOption.deprecatedKeys()) { Object oo = getRawValue(deprecatedKey); if (oo != null) { + LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'", + deprecatedKey, configOption.key()); return oo; } } http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java new file mode 100644 index 0000000..40d75e8 --- /dev/null +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FsNegativeRunningJobsRegistryTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.hdfstests; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.highavailability.FsNegativeRunningJobsRegistry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class FsNegativeRunningJobsRegistryTest { + + @ClassRule + public static final TemporaryFolder TEMP_DIR = new TemporaryFolder(); + + private static MiniDFSCluster HDFS_CLUSTER; + + private static Path HDFS_ROOT_PATH; + + // ------------------------------------------------------------------------ + // startup / shutdown + // ------------------------------------------------------------------------ + + @BeforeClass + public static void createHDFS() throws Exception { + final File tempDir = TEMP_DIR.newFolder(); + + Configuration hdConf = new Configuration(); + hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath()); + + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); + HDFS_CLUSTER = builder.build(); + + HDFS_ROOT_PATH = new Path("hdfs://" + HDFS_CLUSTER.getURI().getHost() + ":" + + HDFS_CLUSTER.getNameNodePort() + "/"); + } + + @AfterClass + public static void destroyHDFS() { + if (HDFS_CLUSTER != null) { + HDFS_CLUSTER.shutdown(); + } + 
HDFS_CLUSTER = null; + HDFS_ROOT_PATH = null; + } + + // ------------------------------------------------------------------------ + // Tests + // ------------------------------------------------------------------------ + + @Test + public void testCreateAndSetFinished() throws Exception { + final Path workDir = new Path(HDFS_ROOT_PATH, "test-work-dir"); + final JobID jid = new JobID(); + + FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(workDir); + + // initially, without any call, the job is considered running + assertTrue(registry.isJobRunning(jid)); + + // repeated setting should not affect the status + registry.setJobRunning(jid); + assertTrue(registry.isJobRunning(jid)); + + // set the job to finished and validate + registry.setJobFinished(jid); + assertFalse(registry.isJobRunning(jid)); + + // another registry should pick this up + FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir); + assertFalse(otherRegistry.isJobRunning(jid)); + } + + @Test + public void testSetFinishedAndRunning() throws Exception { + final Path workDir = new Path(HDFS_ROOT_PATH, "änother_wörk_directörü"); + final JobID jid = new JobID(); + + FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(workDir); + + // set the job to finished and validate + registry.setJobFinished(jid); + assertFalse(registry.isJobRunning(jid)); + + // set the job to back to running and validate + registry.setJobRunning(jid); + assertTrue(registry.isJobRunning(jid)); + + // another registry should pick this up + FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(workDir); + assertTrue(otherRegistry.isJobRunning(jid)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java index 0eab032..36dfa55 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java @@ -35,6 +35,8 @@ import java.lang.reflect.Method; import java.net.URI; import java.net.UnknownHostException; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Concrete implementation of the {@link FileSystem} base class for the Hadoop File System. The * class is a wrapper class which encapsulated the original Hadoop HDFS API. @@ -60,7 +62,8 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst /** - * Creates a new DistributedFileSystem object to access HDFS + * Creates a new DistributedFileSystem object to access HDFS, based on a class name + * and picking up the configuration from the class path or the Flink configuration. * * @throws IOException * throw if the required HDFS classes cannot be instantiated @@ -76,6 +79,21 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst this.fs = instantiateFileSystem(fsClass); } + /** + * Creates a new DistributedFileSystem that uses the given Hadoop + * {@link org.apache.hadoop.fs.FileSystem} under the hood. + * + * @param hadoopConfig The Hadoop configuration that the FileSystem is based on. + * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood. + */ + public HadoopFileSystem( + org.apache.hadoop.conf.Configuration hadoopConfig, + org.apache.hadoop.fs.FileSystem hadoopFileSystem) { + + this.conf = checkNotNull(hadoopConfig, "hadoopConfig"); + this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem"); + } + private Class<? extends org.apache.hadoop.fs.FileSystem> getDefaultHDFSClass() throws IOException { Class<? 
extends org.apache.hadoop.fs.FileSystem> fsClass = null; http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java index b91cec1..a417599 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java @@ -43,6 +43,12 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High // ------------------------------------------------------------------------ @Override + public String getResourceManagerEndpointName() { + // dynamic actor name + return null; + } + + @Override public LeaderRetrievalService getResourceManagerLeaderRetriever() { return resourceManagerLeaderService.createLeaderRetrievalService(); } @@ -55,11 +61,16 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High // ------------------------------------------------------------------------ @Override - public void shutdown() throws Exception { + public void close() throws Exception { try { - super.shutdown(); + super.close(); } finally { resourceManagerLeaderService.shutdown(); } } + + @Override + public void closeAndCleanupAllData() throws Exception { + close(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java new file mode 100644 index 0000000..9d8b226 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistry.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This {@link RunningJobsRegistry} tracks the status jobs via marker files, + * marking finished jobs via marker files. 
+ * + * <p>The general contract is the following: + * <ul> + * <li>Initially, a marker file does not exist (no one created it, yet), which means + * the specific job is assumed to be running</li> + * <li>The JobManager that finishes calls this service to create the marker file, + * which marks the job as finished.</li> + * <li>If a JobManager gains leadership at some point when shutdown is in progress, + * it will see the marker file and realize that the job is finished.</li> + * <li>The application framework is expected to clean the file once the application + * is completely shut down. At that point, no JobManager will attempt to + * start the job, even if it gains leadership.</li> + * </ul> + * + * <p>It is especially tailored towards deployment modes like for example + * YARN, where HDFS is available as a persistent file system, and the YARN + * application's working directories on HDFS are automatically cleaned + * up after the application completed. + */ +public class FsNegativeRunningJobsRegistry implements RunningJobsRegistry { + + private static final String PREFIX = ".job_complete_"; + + private final FileSystem fileSystem; + + private final Path basePath; + + /** + * Creates a new registry that writes to the FileSystem and working directory + * denoted by the given path. + * + * <p>The initialization will attempt to write to the given working directory, in + * order to catch setup/configuration errors early. + * + * @param workingDirectory The working directory for files to track the job status. + * + * @throws IOException Thrown, if the specified directory cannot be accessed. + */ + public FsNegativeRunningJobsRegistry(Path workingDirectory) throws IOException { + this(workingDirectory.getFileSystem(), workingDirectory); + } + + /** + * Creates a new registry that writes its files to the given FileSystem at + * the given working directory path. 
+ * + * <p>The initialization will attempt to write to the given working directory, in + * order to catch setup/configuration errors early. + * + * @param fileSystem The FileSystem to use for the marker files. + * @param workingDirectory The working directory for files to track the job status. + * + * @throws IOException Thrown, if the specified directory cannot be accessed. + */ + public FsNegativeRunningJobsRegistry(FileSystem fileSystem, Path workingDirectory) throws IOException { + this.fileSystem = checkNotNull(fileSystem, "fileSystem"); + this.basePath = checkNotNull(workingDirectory, "workingDirectory"); + + // to be safe, attempt to write to the working directory, to + // catch problems early + final Path testFile = new Path(workingDirectory, ".registry_test"); + try (FSDataOutputStream out = fileSystem.create(testFile, false)) { + out.write(42); + } + catch (IOException e) { + throw new IOException("Unable to write to working directory: " + workingDirectory, e); + } + finally { + fileSystem.delete(testFile, false); + } + } + + // ------------------------------------------------------------------------ + + @Override + public void setJobRunning(JobID jobID) throws IOException { + checkNotNull(jobID, "jobID"); + final Path filePath = createMarkerFilePath(jobID); + + // delete the marker file, if it exists + try { + fileSystem.delete(filePath, false); + } + catch (FileNotFoundException e) { + // apparently job was already considered running + } + } + + @Override + public void setJobFinished(JobID jobID) throws IOException { + checkNotNull(jobID, "jobID"); + final Path filePath = createMarkerFilePath(jobID); + + // create the file + // to avoid an exception if the job already exists, set overwrite=true + try (FSDataOutputStream out = fileSystem.create(filePath, true)) { + out.write(42); + } + } + + @Override + public boolean isJobRunning(JobID jobID) throws IOException { + checkNotNull(jobID, "jobID"); + + // check for the existence of the file + try { + 
fileSystem.getFileStatus(createMarkerFilePath(jobID)); + // file was found --> job is terminated + return false; + } + catch (FileNotFoundException e) { + // file does not exist, job is still running + return true; + } + } + + private Path createMarkerFilePath(JobID jobId) { + return new Path(basePath, PREFIX + jobId.toString()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index 360de7b..4169204 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -26,19 +26,45 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import java.io.IOException; +import java.util.UUID; /** - * This class gives access to all services needed for - * + * The HighAvailabilityServices give access to all services needed for a highly-available + * setup. In particular, the services provide access to highly available storage and + * registries, as well as distributed counters and leader election. 
+ * * <ul> * <li>ResourceManager leader election and leader retrieval</li> * <li>JobManager leader election and leader retrieval</li> * <li>Persistence for checkpoint metadata</li> * <li>Registering the latest completed checkpoint(s)</li> - * <li>Persistence for submitted job graph</li> + * <li>Persistence for the BLOB store</li> + * <li>Registry that marks a job's status</li> + * <li>Naming of RPC endpoints</li> * </ul> */ -public interface HighAvailabilityServices { +public interface HighAvailabilityServices extends AutoCloseable { + + // ------------------------------------------------------------------------ + // Constants + // ------------------------------------------------------------------------ + + /** + * This UUID should be used when no proper leader election happens, but a simple + * pre-configured leader is used. That is for example the case in non-highly-available + * standalone setups. + */ + UUID DEFAULT_LEADER_ID = new UUID(0, 0); + + // ------------------------------------------------------------------------ + // Endpoint Naming + // ------------------------------------------------------------------------ + + String getResourceManagerEndpointName(); + + // ------------------------------------------------------------------------ + // Services + // ------------------------------------------------------------------------ /** * Gets the leader retriever for the cluster's resource manager. @@ -88,7 +114,7 @@ public interface HighAvailabilityServices { * * @return Running job registry to retrieve running jobs */ - RunningJobsRegistry getRunningJobsRegistry(); + RunningJobsRegistry getRunningJobsRegistry() throws Exception; /** * Creates the BLOB store in which BLOBs are stored in a highly-available fashion. 
@@ -99,11 +125,38 @@ public interface HighAvailabilityServices { BlobStore createBlobStore() throws IOException; // ------------------------------------------------------------------------ + // Shutdown and Cleanup + // ------------------------------------------------------------------------ /** - * Shut the high availability service down. + * Closes the high availability services, releasing all resources. + * + * <p>This method <b>does not delete or clean up</b> any data stored in external stores + * (file systems, ZooKeeper, etc). Another instance of the high availability + * services will be able to recover the job. + * + * <p>If an exception occurs during closing services, this method will attempt to + * continue closing other services and report exceptions only after all services + * have been attempted to be closed. * - * @throws Exception if the shut down fails + * @throws Exception Thrown, if an exception occurred while closing these services. + */ + @Override + void close() throws Exception; + + /** + * Closes the high availability services (releasing all resources) and deletes + * all data stored by these services in external stores. + * + * <p>After this method was called, the any job or session that was managed by + * these high availability services will be unrecoverable. + * + * <p>If an exception occurs during cleanup, this method will attempt to + * continue the cleanup and report exceptions only after all cleanup steps have + * been attempted. + * + * @throws Exception Thrown, if an exception occurred while closing these services + * or cleaning up data stored by them. 
*/ - void shutdown() throws Exception; + void closeAndCleanupAllData() throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java index 75f44ed..d644fb9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java @@ -18,14 +18,12 @@ package org.apache.flink.runtime.highavailability; +import org.apache.flink.runtime.highavailability.leaderelection.SingleLeaderElectionService; import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices; import org.apache.flink.runtime.leaderelection.LeaderElectionService; -import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; -import java.util.UUID; - import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -39,6 +37,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class NonHaServices extends AbstractNonHaServices implements HighAvailabilityServices { + /** The constant name of the ResourceManager RPC endpoint */ + private static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager"; + /** The fix address of the ResourceManager */ private final String resourceManagerAddress; @@ -53,16 +54,26 @@ public class NonHaServices extends AbstractNonHaServices implements HighAvailabi } // ------------------------------------------------------------------------ + // Names + // 
------------------------------------------------------------------------ + + @Override + public String getResourceManagerEndpointName() { + return RESOURCE_MANAGER_RPC_ENDPOINT_NAME; + } + + + // ------------------------------------------------------------------------ // Services // ------------------------------------------------------------------------ @Override public LeaderRetrievalService getResourceManagerLeaderRetriever() { - return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0)); + return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID); } @Override public LeaderElectionService getResourceManagerLeaderElectionService() { - return new StandaloneLeaderElectionService(); + return new SingleLeaderElectionService(getExecutorService(), DEFAULT_LEADER_ID); } } http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java new file mode 100644 index 0000000..24667e4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ServicesThreadFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import javax.annotation.Nonnull; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class ServicesThreadFactory implements ThreadFactory { + + private AtomicInteger enumerator = new AtomicInteger(); + + @Override + public Thread newThread(@Nonnull Runnable r) { + Thread thread = new Thread(r, "Flink HA Services Thread #" + enumerator.incrementAndGet()); + + // HA threads should have a very high priority, but not + // keep the JVM running by themselves + thread.setPriority(Thread.MAX_PRIORITY); + thread.setDaemon(true); + + return thread; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java index 3e909e8..25d21ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java @@ -108,6 +108,16 @@ public class ZookeeperHaServices implements HighAvailabilityServices { // ------------------------------------------------------------------------ @Override + public String getResourceManagerEndpointName() { + // since the 
resource manager name must be dynamic, we return null here + return null; + } + + // ------------------------------------------------------------------------ + // Services + // ------------------------------------------------------------------------ + + @Override public LeaderRetrievalService getResourceManagerLeaderRetriever() { return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH); } @@ -174,10 +184,15 @@ public class ZookeeperHaServices implements HighAvailabilityServices { // ------------------------------------------------------------------------ @Override - public void shutdown() throws Exception { + public void close() throws Exception { client.close(); } + @Override + public void closeAndCleanupAllData() throws Exception { + close(); + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java new file mode 100644 index 0000000..26e3cbf --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability.leaderelection; + +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import java.util.HashSet; +import java.util.UUID; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * An implementation of the {@link LeaderElectionService} interface that handles a single + * leader contender. When started, this service immediately grants the contender the leadership. + * + * <p>The implementation accepts a single static leader session ID and is hence compatible with + * pre-configured single leader (no leader failover) setups. + * + * <p>This implementation supports a series of leader listeners that receive notifications about + * the leader contender. 
+ */ +public class SingleLeaderElectionService implements LeaderElectionService { + + private static final Logger LOG = LoggerFactory.getLogger(SingleLeaderElectionService.class); + + // ------------------------------------------------------------------------ + + /** lock for all operations on this instance */ + private final Object lock = new Object(); + + /** The executor service that dispatches notifications */ + private final Executor notificationExecutor; + + /** The leader ID assigned to the immediate leader */ + private final UUID leaderId; + + @GuardedBy("lock") + private final HashSet<EmbeddedLeaderRetrievalService> listeners; + + /** The currently proposed leader */ + @GuardedBy("lock") + private volatile LeaderContender proposedLeader; + + /** The confirmed leader */ + @GuardedBy("lock") + private volatile LeaderContender leader; + + /** The address of the confirmed leader */ + @GuardedBy("lock") + private volatile String leaderAddress; + + /** Flag marking this service as shutdown, meaning it cannot be started again */ + @GuardedBy("lock") + private volatile boolean shutdown; + + // ------------------------------------------------------------------------ + + /** + * Creates a new leader election service that assigns the given leader ID to the leader contender. + * + * @param notificationsDispatcher The executor that runs the leadership grant and all listener notifications. + * @param leaderId The constant leader ID assigned to the leader.
+ */ + public SingleLeaderElectionService(Executor notificationsDispatcher, UUID leaderId) { + this.notificationExecutor = checkNotNull(notificationsDispatcher); + this.leaderId = checkNotNull(leaderId); + this.listeners = new HashSet<>(); + } + + // ------------------------------------------------------------------------ + // leader election service + // ------------------------------------------------------------------------ + + @Override + public void start(LeaderContender contender) throws Exception { + checkNotNull(contender, "contender"); + + synchronized (lock) { + checkState(!shutdown, "service is shut down"); + checkState(proposedLeader == null, "service already started"); + + // directly grant leadership to the given contender + proposedLeader = contender; + notificationExecutor.execute(new GrantLeadershipCall(contender, leaderId)); + } + } + + @Override + public void stop() { + synchronized (lock) { + // notify all listeners that there is no leader + for (EmbeddedLeaderRetrievalService listener : listeners) { + notificationExecutor.execute( + new NotifyOfLeaderCall(null, null, listener.listener, LOG)); + } + + // if there was a leader, revoke its leadership + if (leader != null) { + try { + leader.revokeLeadership(); + } catch (Throwable t) { + leader.handleError(t instanceof Exception ?
(Exception) t : new Exception(t)); + } + } + + proposedLeader = null; + leader = null; + leaderAddress = null; + } + } + + @Override + public void confirmLeaderSessionID(UUID leaderSessionID) { + checkNotNull(leaderSessionID, "leaderSessionID"); + checkArgument(leaderSessionID.equals(leaderId), "confirmed wrong leader session id"); + + synchronized (lock) { + checkState(!shutdown, "service is shut down"); + checkState(proposedLeader != null, "no leader proposed yet"); + checkState(leader == null, "leader already confirmed"); + + // accept the confirmation + final String address = proposedLeader.getAddress(); + leaderAddress = address; + leader = proposedLeader; + + // notify all listeners + for (EmbeddedLeaderRetrievalService listener : listeners) { + notificationExecutor.execute( + new NotifyOfLeaderCall(address, leaderId, listener.listener, LOG)); + } + } + } + + @Override + public boolean hasLeadership() { + synchronized (lock) { + return leader != null; + } + } + + void errorOnGrantLeadership(LeaderContender contender, Throwable error) { + LOG.warn("Error granting leadership to contender", error);
+ // reset the proposal before notifying the contender, so a throwing error handler cannot leave stale state behind + synchronized (lock) { + if (proposedLeader == contender) { + proposedLeader = null; + leader = null; + } + } + contender.handleError(error instanceof Exception ? (Exception) error : new Exception(error)); + } + + // ------------------------------------------------------------------------ + // shutdown + // ------------------------------------------------------------------------ + + public boolean isShutdown() { + return shutdown; + } + + public void shutdown() { + shutdownInternally(new Exception("The leader service is shutting down")); + } + + private void shutdownInternally(Exception exceptionForHandlers) { + synchronized (lock) { + if (shutdown) { + return; + } + + shutdown = true; + + // fail the leader (if there is one) + if (leader != null) { + try { + leader.handleError(exceptionForHandlers); + } catch (Throwable ignored) { /* best effort: the service shuts down regardless */ } + } + + // clear all leader status + leader = null; + proposedLeader = null; + leaderAddress = null; + + // fail all registered listeners + for (EmbeddedLeaderRetrievalService service : listeners) { + service.shutdown(exceptionForHandlers); + } + listeners.clear(); + } + } + + private void fatalError(Throwable error) { + LOG.error("Single leader election service encountered a fatal error.
Shutting down service.", error); + + shutdownInternally(new Exception("Leader election service is shutting down after a fatal error", error)); + } + + // ------------------------------------------------------------------------ + // leader listeners + // ------------------------------------------------------------------------ + + public LeaderRetrievalService createLeaderRetrievalService() { + checkState(!shutdown, "leader election service is shut down"); + return new EmbeddedLeaderRetrievalService(); + } + + void addListener(EmbeddedLeaderRetrievalService service, LeaderRetrievalListener listener) { + synchronized (lock) { + checkState(!shutdown, "leader election service is shut down"); + checkState(!service.running, "leader retrieval service is already started"); + + try { + if (!listeners.add(service)) { + throw new IllegalStateException("leader retrieval service was added to this service multiple times"); + } + + service.listener = listener; + service.running = true; + + // if we already have a leader, immediately notify this new listener + if (leader != null) { + notificationExecutor.execute( + new NotifyOfLeaderCall(leaderAddress, leaderId, listener, LOG)); + } + } + catch (Throwable t) { + fatalError(t); + } + } + } + + void removeListener(EmbeddedLeaderRetrievalService service) { + synchronized (lock) { + // if the service was not even started, simply do nothing + if (!service.running || shutdown) { + return; + } + + try { + if (!listeners.remove(service)) { + throw new IllegalStateException("leader retrieval service does not belong to this service"); + } + + // stop the service + service.listener = null; + service.running = false; + } + catch (Throwable t) { + fatalError(t); + } + } + } + + // ------------------------------------------------------------------------ + + private class EmbeddedLeaderRetrievalService implements LeaderRetrievalService { + + volatile LeaderRetrievalListener listener; + + volatile boolean running; + + @Override + public void
start(LeaderRetrievalListener listener) throws Exception { + checkNotNull(listener); + addListener(this, listener); + } + + @Override + public void stop() throws Exception { + removeListener(this); + } + + void shutdown(Exception cause) { + if (running) { + final LeaderRetrievalListener lst = listener; + running = false; + listener = null; + + try { + lst.handleError(cause); + } catch (Throwable ignored) { /* best effort: the listener is already detached */ } + } + } + } + + // ------------------------------------------------------------------------ + // asynchronous notifications + // ------------------------------------------------------------------------ + + /** + * This runnable informs a leader contender that it gained leadership. + */ + private class GrantLeadershipCall implements Runnable { + + private final LeaderContender contender; + private final UUID leaderSessionId; + + GrantLeadershipCall(LeaderContender contender, UUID leaderSessionId) { + + this.contender = checkNotNull(contender); + this.leaderSessionId = checkNotNull(leaderSessionId); + } + + @Override + public void run() { + try { + contender.grantLeadership(leaderSessionId); + } + catch (Throwable t) { + errorOnGrantLeadership(contender, t); + } + } + } + + // ------------------------------------------------------------------------ + + /** + * This runnable informs a leader listener of a new leader, or of having no leader (null address and session ID). + */ + private static class NotifyOfLeaderCall implements Runnable { + + @Nullable + private final String address; // null if leader revoked without new leader + @Nullable + private final UUID leaderSessionId; // null if leader revoked without new leader + + private final LeaderRetrievalListener listener; + private final Logger logger; + + NotifyOfLeaderCall( + @Nullable String address, + @Nullable UUID leaderSessionId, + LeaderRetrievalListener listener, + Logger logger) { + + this.address = address; + this.leaderSessionId = leaderSessionId; + this.listener = checkNotNull(listener); + this.logger = checkNotNull(logger); + } + + @Override + public void
run() { + try { + listener.notifyLeaderAddress(address, leaderSessionId); + } + catch (Throwable t) { + logger.warn("Error notifying leader listener about new leader", t); + listener.handleError(t instanceof Exception ? (Exception) t : new Exception(t)); + } + } + } + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java index 474faa8..b10e414 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java @@ -25,19 +25,17 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; +import org.apache.flink.runtime.highavailability.ServicesThreadFactory; import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.util.HashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; import static
org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -132,7 +130,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices // ------------------------------------------------------------------------ @Override - public void shutdown() throws Exception { + public void close() throws Exception { synchronized (lock) { if (!shutdown) { shutdown = true; @@ -149,6 +147,12 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices } } + @Override + public void closeAndCleanupAllData() throws Exception { + // these services store no data that would need removal, so cleanup is identical to 'close()' + close(); + } + private void checkNotShutdown() { checkState(!shutdown, "high availability services are shut down"); } @@ -160,21 +164,4 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices protected ExecutorService getExecutorService() { return executor; } - - private static final class ServicesThreadFactory implements ThreadFactory { - - private AtomicInteger enumerator = new AtomicInteger(); - - @Override - public Thread newThread(@Nonnull Runnable r) { - Thread thread = new Thread(r, "Flink HA Services Thread #" + enumerator.incrementAndGet()); - - // HA threads should have a very high priority, but not - // keep the JVM running by themselves - thread.setPriority(Thread.MAX_PRIORITY); - thread.setDaemon(true); - - return thread; - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java index 9fad9be..d4eba26 100644 ---
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/EmbeddedLeaderService.java @@ -461,7 +461,7 @@ public class EmbeddedLeaderService { contender.grantLeadership(leaderSessionId); } catch (Throwable t) { - logger.warn("Error notifying leader listener about new leader", t); + logger.warn("Error granting leadership to contender", t); contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java index a0c608d..269a8f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import static org.apache.flink.util.Preconditions.checkNotNull; http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java ---------------------------------------------------------------------- diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java index 16b163c..4ad4646 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java @@ -51,6 +51,7 @@ public class StandaloneLeaderRetrievalService implements LeaderRetrievalService * * @param leaderAddress The leader's pre-configured address */ + @Deprecated /* NOTE(review): this constructor leaves the leader session ID null (leaderId = null); consider a Javadoc @deprecated tag naming the replacement */ public StandaloneLeaderRetrievalService(String leaderAddress) { this.leaderAddress = checkNotNull(leaderAddress); this.leaderId = null; http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 29a6e59..1933554 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -345,7 +345,7 @@ public class MiniCluster { // shut down high-availability services if (haServices != null) { try { - haServices.shutdown(); + haServices.closeAndCleanupAllData(); } catch (Exception e) { exception = firstOrSuppressed(e, exception); } http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java index 56e72c0..6c7e249 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java @@ -59,7 +59,7 @@ public class JobLeaderIdService { /** Actions to call when the job leader changes */ private JobLeaderIdActions jobLeaderIdActions; - public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) { + public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) throws Exception { /* NOTE(review): presumably widened because getRunningJobsRegistry() may now throw - TODO confirm and narrow if possible */ this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry(); http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java index 959b727..e0dee0b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException; import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; import
org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -46,7 +45,7 @@ public class ResourceManagerRunner implements FatalErrorHandler { final Configuration configuration, final RpcService rpcService, final HighAvailabilityServices highAvailabilityServices, - final MetricRegistry metricRegistry) throws ConfigurationException { + final MetricRegistry metricRegistry) throws Exception { Preconditions.checkNotNull(configuration); Preconditions.checkNotNull(rpcService); http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java index 1ac54ac..018c3ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java @@ -21,19 +21,36 @@ package org.apache.flink.runtime.rpc; import akka.actor.ActorSystem; import com.typesafe.config.Config; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.util.NetUtils; + import org.jboss.netty.channel.ChannelException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.UnknownHostException; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * These RPC utilities contain helper methods around RPC use, such as starting an RPC service, + * or constructing RPC addresses.
+ */ public class RpcServiceUtils { + private static final Logger LOG = LoggerFactory.getLogger(RpcServiceUtils.class); + // ------------------------------------------------------------------------ + // RPC instantiation + // ------------------------------------------------------------------------ + /** * Utility method to create RPC service from configuration and hostname, port. * @@ -78,4 +95,57 @@ public class RpcServiceUtils { final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis()); return new AkkaRpcService(actorSystem, timeout); } + + // ------------------------------------------------------------------------ + // RPC endpoint addressing + // ------------------------------------------------------------------------ + + /** Builds the RPC URL of an endpoint; the secure protocol is used only if both {@code ConfigConstants.AKKA_SSL_ENABLED} and {@code SSLUtils.getSSLEnabled(config)} are true. + * + * @param hostname The hostname or address where the target RPC service is listening. + * @param port The port where the target RPC service is listening. + * @param endpointName The name of the RPC endpoint. + * @param config The configuration from which to deduce further settings (whether SSL is enabled). + * + * @return The RPC URL of the specified RPC endpoint. + */ + public static String getRpcUrl(String hostname, int port, String endpointName, Configuration config) + throws UnknownHostException { + + checkNotNull(config, "config is null"); + + final boolean sslEnabled = config.getBoolean( + ConfigConstants.AKKA_SSL_ENABLED, + ConfigConstants.DEFAULT_AKKA_SSL_ENABLED) && + SSLUtils.getSSLEnabled(config); + + return getRpcUrl(hostname, port, endpointName, sslEnabled); + } + + /** Builds the RPC URL {@code <protocol>://flink@<host:port>/user/<endpointName>}, where the protocol is {@code akka.ssl.tcp} if {@code secure} is true and {@code akka.tcp} otherwise. + * + * @param hostname The hostname or address where the target RPC service is listening. + * @param port The port where the target RPC service is listening. + * @param endpointName The name of the RPC endpoint. + * @param secure True, if security/encryption is enabled, false otherwise. + * + * @return The RPC URL of the specified RPC endpoint.
+ */ + public static String getRpcUrl(String hostname, int port, String endpointName, boolean secure) + throws UnknownHostException { + + checkNotNull(hostname, "hostname is null"); + checkNotNull(endpointName, "endpointName is null"); + checkArgument(port > 0 && port <= 65535, "port must be in [1, 65535]"); + + final String protocol = secure ? "akka.ssl.tcp" : "akka.tcp"; + final String hostPort = NetUtils.hostAndPortToUrlString(hostname, port); + + return String.format("%s://flink@%s/user/%s", protocol, hostPort, endpointName); + } + + // ------------------------------------------------------------------------ + + /** This class is not meant to be instantiated */ + private RpcServiceUtils() {} } http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java new file mode 100644 index 0000000..f1ece0e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FsNegativeRunningJobsRegistryTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.TestLogger; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class FsNegativeRunningJobsRegistryTest extends TestLogger { + + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testCreateAndSetFinished() throws Exception { + final File folder = tempFolder.newFolder(); + final String uri = folder.toURI().toString(); + + final JobID jid = new JobID(); + + FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(new Path(uri)); + + // initially, without any call, the job is considered running + assertTrue(registry.isJobRunning(jid)); + + // repeated setting should not affect the status + registry.setJobRunning(jid); + assertTrue(registry.isJobRunning(jid)); + + // set the job to finished and validate + registry.setJobFinished(jid); + assertFalse(registry.isJobRunning(jid)); + + // another registry should pick this up + FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri)); + assertFalse(otherRegistry.isJobRunning(jid)); + } + + @Test + public void testSetFinishedAndRunning() throws Exception { + final File folder = tempFolder.newFolder(); + final String uri = folder.toURI().toString(); + + final JobID jid = new
JobID(); + + FsNegativeRunningJobsRegistry registry = new FsNegativeRunningJobsRegistry(new Path(uri)); + + // set the job to finished and validate + registry.setJobFinished(jid); + assertFalse(registry.isJobRunning(jid)); + + // set the job back to running and validate + registry.setJobRunning(jid); + assertTrue(registry.isJobRunning(jid)); + + // another registry should pick this up + FsNegativeRunningJobsRegistry otherRegistry = new FsNegativeRunningJobsRegistry(new Path(uri)); + assertTrue(otherRegistry.isJobRunning(jid)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index e0f71ee..3f9865c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -155,12 +155,22 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices return new VoidBlobStore(); } + @Override + public String getResourceManagerEndpointName() { + throw new UnsupportedOperationException(); + } + // ------------------------------------------------------------------------ // Shutdown // ------------------------------------------------------------------------ @Override - public void shutdown() throws Exception { - // nothing to do, since this should not shut down individual services, but cross service parts + public void close() throws Exception { + // nothing to do, since this should not shut down individual services, but cross service parts + } + + @Override + public void closeAndCleanupAllData() throws Exception { + // nothing to do + } }
http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java new file mode 100644 index 0000000..a9805a1 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionServiceTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability.leaderelection; + +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.util.StringUtils; + +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Executor; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.*; + +/** + * Tests for the {@link SingleLeaderElectionService}. + */ +public class SingleLeaderElectionServiceTest { + + private static final Random RND = new Random(); + + private final Executor executor = Executors.directExecutor(); + + // ------------------------------------------------------------------------ + + @Test + public void testStartStopAssignLeadership() throws Exception { + final UUID uuid = UUID.randomUUID(); + final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid); + + final LeaderContender contender = mockContender(service); + final LeaderContender otherContender = mockContender(service); + + service.start(contender); + verify(contender, times(1)).grantLeadership(uuid); + + service.stop(); + verify(contender, times(1)).revokeLeadership(); + + // start with a new contender - the old contender must not gain another leadership + service.start(otherContender); + verify(otherContender, times(1)).grantLeadership(uuid); + + verify(contender, times(1)).grantLeadership(uuid); + verify(contender, times(1)).revokeLeadership(); + } + + @Test + public void testStopBeforeConfirmingLeadership() throws Exception { + final UUID uuid = UUID.randomUUID(); + final SingleLeaderElectionService service = new
SingleLeaderElectionService(executor, uuid); + + final LeaderContender contender = mock(LeaderContender.class); + + service.start(contender); + verify(contender, times(1)).grantLeadership(uuid); + + service.stop(); + + // because the leadership was never confirmed, there is no "revoke" call + verifyNoMoreInteractions(contender); + } + + @Test + public void testStartOnlyOnce() throws Exception { + final UUID uuid = UUID.randomUUID(); + final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid); + + final LeaderContender contender = mock(LeaderContender.class); + final LeaderContender otherContender = mock(LeaderContender.class); + + service.start(contender); + verify(contender, times(1)).grantLeadership(uuid); + + // should not be possible to start this again with another contender + try { + service.start(otherContender); + fail("should fail with an exception"); + } catch (IllegalStateException e) { + // expected + } + + // should not be possible to start this again with the same contender + try { + service.start(contender); + fail("should fail with an exception"); + } catch (IllegalStateException e) { + // expected + } + } + + @Test + public void testShutdown() throws Exception { + final UUID uuid = UUID.randomUUID(); + final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid); + + // create a leader contender and let it grab leadership + final LeaderContender contender = mockContender(service); + service.start(contender); + verify(contender, times(1)).grantLeadership(uuid); + + // some leader listeners + final LeaderRetrievalListener listener1 = mock(LeaderRetrievalListener.class); + final LeaderRetrievalListener listener2 = mock(LeaderRetrievalListener.class); + + LeaderRetrievalService listenerService1 = service.createLeaderRetrievalService(); + LeaderRetrievalService listenerService2 = service.createLeaderRetrievalService(); + + listenerService1.start(listener1); +
listenerService2.start(listener2); + + // one listener stops + listenerService1.stop(); + + // shut down the service + service.shutdown(); + + // the leader contender and running listener should get error notifications + verify(contender, times(1)).handleError(any(Exception.class)); + verify(listener2, times(1)).handleError(any(Exception.class)); + + // the stopped listener gets no notification + verify(listener1, times(0)).handleError(any(Exception.class)); + + // should not be possible to start again after shutdown + try { + service.start(contender); + fail("should fail with an exception"); + } catch (IllegalStateException e) { + // expected + } + + // no additional leadership grant + verify(contender, times(1)).grantLeadership(any(UUID.class)); + } + + @Test + public void testImmediateShutdown() throws Exception { + final UUID uuid = UUID.randomUUID(); + final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid); + service.shutdown(); + + final LeaderContender contender = mock(LeaderContender.class); + + // should not be possible to start + try { + service.start(contender); + fail("should fail with an exception"); + } catch (IllegalStateException e) { + // expected + } + + // no additional leadership grant + verify(contender, times(0)).grantLeadership(any(UUID.class)); + } + + @Test + public void testNotifyListenersWhenLeaderElected() throws Exception { + final UUID uuid = UUID.randomUUID(); + final String address = StringUtils.getRandomString(RND, 5, 10, 'a', 'z'); + final SingleLeaderElectionService service = new SingleLeaderElectionService(executor, uuid); + + final LeaderRetrievalListener listener1 = mock(LeaderRetrievalListener.class); + final LeaderRetrievalListener listener2 = mock(LeaderRetrievalListener.class); + LeaderRetrievalService listenerService1 = service.createLeaderRetrievalService(); + LeaderRetrievalService listenerService2 = service.createLeaderRetrievalService(); + + listenerService1.start(listener1); + listenerService2.start(listener2); + + // the mock contender confirms leadership from within grantLeadership(), which notifies all started listeners + service.start(mockContender(service, address));
+ verify(listener1, times(1)).notifyLeaderAddress(address, uuid); + verify(listener2, times(1)).notifyLeaderAddress(address, uuid); + } + + // ------------------------------------------------------------------------ + // utilities + // ------------------------------------------------------------------------ + + private static LeaderContender mockContender(final LeaderElectionService service) { + String address = StringUtils.getRandomString(RND, 5, 10, 'a', 'z'); + return mockContender(service, address); + } + + private static LeaderContender mockContender(final LeaderElectionService service, final String address) { + LeaderContender mockContender = mock(LeaderContender.class); + + when(mockContender.getAddress()).thenReturn(address); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + final UUID uuid = (UUID) invocation.getArguments()[0]; + service.confirmLeaderSessionID(uuid); + return null; + } + }).when(mockContender).grantLeadership(any(UUID.class)); + + return mockContender; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/pom.xml ---------------------------------------------------------------------- diff --git a/flink-yarn/pom.xml b/flink-yarn/pom.xml index 18ffe13..f69e0e4 100644 --- a/flink-yarn/pom.xml +++ b/flink-yarn/pom.xml @@ -32,6 +32,9 @@ under the License. <packaging>jar</packaging> <dependencies> + + <!-- core dependencies --> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime_2.10</artifactId> @@ -91,6 +94,8 @@ under the License. <scope>test</scope> </dependency> + <!-- test dependencies --> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime_2.10</artifactId> @@ -98,6 +103,22 @@ under the License.
<type>test-jar</type> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>test</scope> + <type>test-jar</type> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>test</scope> + <type>test-jar</type> + <version>${hadoop.version}</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java index 923694e..1c8bad7 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java @@ -23,17 +23,19 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.clusterframework.BootstrapTools; -import org.apache.flink.runtime.security.SecurityContext; +import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.util.Map; +import java.util.concurrent.Callable; /** * This class is the executable entry point for the YARN application 
master. @@ -95,7 +97,18 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner { LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}", currentUser.getShortUserName(), yarnClientUsername ); - SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration(); + // Flink configuration + final Map<String, String> dynamicProperties = + FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES)); + LOG.debug("YARN dynamic properties: {}", dynamicProperties); + + final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties); + if(keytabPath != null && remoteKeytabPrincipal != null) { + flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath); + flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal); + } + + SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(flinkConfig); //To support Yarn Secure Integration Test Scenario File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME); @@ -108,18 +121,7 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner { sc.setHadoopConfiguration(conf); } - // Flink configuration - final Map<String, String> dynamicProperties = - FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES)); - LOG.debug("YARN dynamic properties: {}", dynamicProperties); - - final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties); - if(keytabPath != null && remoteKeytabPrincipal != null) { - flinkConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytabPath); - flinkConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal); - } - - SecurityContext.install(sc.setFlinkConfiguration(flinkConfig)); + SecurityUtils.install(sc); // Note that we use the "appMasterHostname" given by YARN here, to make sure // we use the hostnames given by YARN consistently throughout akka. 
@@ -129,9 +131,9 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner { "ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key()); LOG.info("YARN assigned hostname for application master: {}", appMasterHostname); - return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() { + return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() { @Override - public Integer run() { + public Integer call() throws Exception { return runApplicationMaster(flinkConfig); } }); http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 1826d43..d1ef553 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -115,7 +115,7 @@ public class YarnApplicationMasterRunner { * @param args The command line arguments. 
*/ public static void main(String[] args) { - EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args); + EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / ResourceManager / JobManager", args); SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java index e58c77e..188d9ef 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java @@ -37,7 +37,6 @@ import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; -import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException; import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; import org.apache.flink.runtime.rpc.FatalErrorHandler; @@ -57,7 +56,12 @@ import java.io.FileInputStream; import java.io.ObjectInputStream; /** - * This class is the executable entry point for the YARN application master. + * This class is the executable entry point for the YARN Application Master that + * executes a single Flink job and then shuts the YARN application down. + * + * <p>The lifetime of the YARN application is bound to that of the Flink job.
Other + * YARN Application Master implementations are for example the YARN session. + * * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobManagerRunner} * and {@link org.apache.flink.yarn.YarnResourceManager}. * @@ -74,6 +78,8 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati /** The job graph file path */ private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path"; + // ------------------------------------------------------------------------ + /** The lock to guard startup / shutdown / manipulation methods */ private final Object lock = new Object(); @@ -105,7 +111,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati * @param args The command line arguments. */ public static void main(String[] args) { - EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster runner", args); + EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / ResourceManager / JobManager", args); SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); @@ -127,7 +133,9 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT); synchronized (lock) { + LOG.info("Starting High Availability Services"); haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config); + metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); commonRpcService = createRpcService(config, appMasterHostname, amPortRange); @@ -176,7 +184,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit())); } - private ResourceManager createResourceManager(Configuration config) throws ConfigurationException { + private ResourceManager<?> createResourceManager(Configuration config) throws Exception { final 
ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(config); final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory(); final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(haServices); @@ -242,7 +250,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati } if (haServices != null) { try { - haServices.shutdown(); + haServices.close(); } catch (Throwable tt) { LOG.warn("Failed to stop the HA service", tt); } http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java index d9912eb..dc8c604 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java @@ -28,7 +28,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.security.SecurityContext; +import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; @@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.util.Map; +import java.util.concurrent.Callable; /** * This class is the executable entry point for running a TaskExecutor in a YARN container. 
@@ -138,7 +139,7 @@ public class YarnTaskExecutorRunner { LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}", currentUser.getShortUserName(), yarnClientUsername); - SecurityContext.SecurityConfiguration sc = new SecurityContext.SecurityConfiguration(); + SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration); //To support Yarn Secure Integration Test Scenario File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME); @@ -156,11 +157,11 @@ public class YarnTaskExecutorRunner { configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, remoteKeytabPrincipal); } - SecurityContext.install(sc.setFlinkConfiguration(configuration)); + SecurityUtils.install(sc); - return SecurityContext.getInstalled().runSecured(new SecurityContext.FlinkSecuredRunner<Integer>() { + return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() { @Override - public Integer run() { + public Integer call() throws Exception { return runTaskExecutor(configuration); } }); @@ -240,7 +241,7 @@ public class YarnTaskExecutorRunner { } if (haServices != null) { try { - haServices.shutdown(); + haServices.close(); } catch (Throwable tt) { LOG.warn("Failed to stop the HA service", tt); } http://git-wip-us.apache.org/repos/asf/flink/blob/2a7dbda7/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java new file mode 100644 index 0000000..c3902d3 --- /dev/null +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.configuration; + +import org.apache.flink.configuration.ConfigOption; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * This class holds configuration constants used by Flink's YARN runners. + * These options are not expected to ever be configured explicitly by users. + */ +public class YarnConfigOptions { + + /** + * The hostname or address where the application master RPC system is listening. + */ + public static final ConfigOption<String> APP_MASTER_RPC_ADDRESS = + key("yarn.appmaster.rpc.address") + .noDefaultValue(); + + /** + * The port where the application master RPC system is listening. + * Uses its own key so that it does not collide with APP_MASTER_RPC_ADDRESS. + */ + public static final ConfigOption<Integer> APP_MASTER_RPC_PORT = + key("yarn.appmaster.rpc.port") + .defaultValue(-1); + + // ------------------------------------------------------------------------ + + /** This class is not meant to be instantiated. */ + private YarnConfigOptions() {} +}
