[FLINK-4836] [cluster management] Add Flink mini cluster (part 1)

This implements:
  - mini cluster configuration
  - startup / shutdown of common services (RPC, HA)
  - startup / shutdown of JobManager and Dispatcher
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/106cb9e3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/106cb9e3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/106cb9e3 Branch: refs/heads/master Commit: 106cb9e3ddb8ad019db99bc746e77c2ef48cb5e4 Parents: 9615f15 Author: Stephan Ewen <[email protected]> Authored: Sat Oct 15 00:25:41 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Dec 23 20:54:23 2016 +0100 ---------------------------------------------------------------------- .../org/apache/flink/util/ExceptionUtils.java | 12 +- .../HighAvailabilityServicesUtils.java | 17 + .../highavailability/ZookeeperHaServices.java | 2 +- .../runtime/jobmaster/JobManagerRunner.java | 1 - .../jobmaster/MiniClusterJobDispatcher.java | 394 ----------------- .../flink/runtime/minicluster/MiniCluster.java | 406 ++++++++++++++++++ .../minicluster/MiniClusterConfiguration.java | 147 +++++++ .../minicluster/MiniClusterJobDispatcher.java | 418 +++++++++++++++++++ .../resourcemanager/ResourceManager.java | 2 +- .../runtime/taskexecutor/JobLeaderService.java | 3 +- .../TestingHighAvailabilityServices.java | 2 +- .../runtime/minicluster/MiniClusterITCase.java | 79 ++++ 12 files changed, 1075 insertions(+), 408 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java index d1357a8..1069f2d 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -110,8 +110,6 @@ public final class ExceptionUtils { } } - - /** * Throws the 
given {@code Throwable} in scenarios where the signatures do not allow you to * throw an arbitrary Throwable. Errors and RuntimeExceptions are thrown directly, other exceptions @@ -214,10 +212,8 @@ public final class ExceptionUtils { } } - /** - * Private constructor to prevent instantiation. - */ - private ExceptionUtils() { - throw new RuntimeException(); - } + // ------------------------------------------------------------------------ + + /** Private constructor to prevent instantiation. */ + private ExceptionUtils() {} } http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java index f3da847..9113309 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -24,6 +24,23 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils; public class HighAvailabilityServicesUtils { + public static HighAvailabilityServices createAvailableOrEmbeddedServices(Configuration config) throws Exception { + HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config); + + switch (highAvailabilityMode) { + case NONE: + return new EmbeddedNonHaServices(); + + case ZOOKEEPER: + throw new UnsupportedOperationException("ZooKeeper high availability services " + + "have not been implemented yet."); + + default: + throw new Exception("High availability mode " + highAvailabilityMode + " is not supported."); + } + } + + public static HighAvailabilityServices 
createHighAvailabilityServices(Configuration configuration) throws Exception { HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration); http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java index be19c60..e38840b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java @@ -113,7 +113,7 @@ public class ZookeeperHaServices implements HighAvailabilityServices { } @Override - public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception { + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID)); } http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 74ca6f3..3313d8a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -289,7 +289,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F @Override public void 
jobFinishedByOther() { try { - unregisterJobFromHighAvailability(); shutdownInternally(); } finally { http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java deleted file mode 100644 index 019ccfe..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java +++ /dev/null @@ -1,394 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.jobmaster; - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobmanager.OnCompletionActions; -import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.rpc.FatalErrorHandler; -import org.apache.flink.runtime.rpc.RpcService; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; - -/** - * The dispatcher that runs in the mini cluster, waits for jobs, and starts job masters - * upon receiving jobs. 
- */ -public class MiniClusterJobDispatcher { - - private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobDispatcher.class); - - // ------------------------------------------------------------------------ - - /** lock to ensure that this dispatcher executes only one job at a time */ - private final Object lock = new Object(); - - /** the configuration with which the mini cluster was started */ - private final Configuration configuration; - - /** the RPC service to use by the job managers */ - private final RpcService rpcService; - - /** services for discovery, leader election, and recovery */ - private final HighAvailabilityServices haServices; - - /** al the services that the JobManager needs, such as BLOB service, factories, etc */ - private final JobManagerServices jobManagerServices; - - /** Registry for all metrics in the mini cluster */ - private final MetricRegistry metricRegistry; - - /** The number of JobManagers to launch (more than one simulates a high-availability setup) */ - private final int numJobManagers; - - /** The runner for the job and master. non-null if a job is currently running */ - private volatile JobManagerRunner[] runners; - - /** flag marking the dispatcher as hut down */ - private volatile boolean shutdown; - - - /** - * Starts a mini cluster job dispatcher. - * - * <p>The dispatcher kicks off one JobManager per job, a behavior similar to a - * non-highly-available setup. - * - * @param config The configuration of the mini cluster - * @param haServices Access to the discovery, leader election, and recovery services - * - * @throws Exception Thrown, if the services for the JobMaster could not be started. - */ - public MiniClusterJobDispatcher( - Configuration config, - RpcService rpcService, - HighAvailabilityServices haServices, - MetricRegistry metricRegistry) throws Exception { - this(config, rpcService, haServices, metricRegistry, 1); - } - - /** - * Starts a mini cluster job dispatcher. 
- * - * <p>The dispatcher may kick off more than one JobManager per job, thus simulating - * a highly-available setup. - * - * @param config The configuration of the mini cluster - * @param haServices Access to the discovery, leader election, and recovery services - * @param numJobManagers The number of JobMasters to start for each job. - * - * @throws Exception Thrown, if the services for the JobMaster could not be started. - */ - public MiniClusterJobDispatcher( - Configuration config, - RpcService rpcService, - HighAvailabilityServices haServices, - MetricRegistry metricRegistry, - int numJobManagers) throws Exception { - - checkArgument(numJobManagers >= 1); - this.configuration = checkNotNull(config); - this.rpcService = checkNotNull(rpcService); - this.haServices = checkNotNull(haServices); - this.metricRegistry = checkNotNull(metricRegistry); - this.numJobManagers = numJobManagers; - - LOG.info("Creating JobMaster services"); - this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices); - } - - // ------------------------------------------------------------------------ - // life cycle - // ------------------------------------------------------------------------ - - /** - * Shuts down the mini cluster dispatcher. If a job is currently running, that job will be - * terminally failed. 
- */ - public void shutdown() { - synchronized (lock) { - if (!shutdown) { - shutdown = true; - - LOG.info("Shutting down the dispatcher"); - - // in this shutdown code we copy the references to the stack first, - // to avoid concurrent modification - - JobManagerRunner[] runners = this.runners; - if (runners != null) { - this.runners = null; - - for (JobManagerRunner runner : runners) { - runner.shutdown(); - } - } - } - } - } - - // ------------------------------------------------------------------------ - // submitting jobs - // ------------------------------------------------------------------------ - - /** - * This method executes a job in detached mode. The method returns immediately after the job - * has been added to the - * - * @param job The Flink job to execute - * - * @throws JobExecutionException Thrown if anything went amiss during initial job launch, - * or if the job terminally failed. - */ - public void runDetached(JobGraph job) throws JobExecutionException { - checkNotNull(job); - - LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID()); - - synchronized (lock) { - checkState(!shutdown, "mini cluster is shut down"); - checkState(runners == null, "mini cluster can only execute one job at a time"); - - DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers); - - this.runners = startJobRunners(job, finalizer, finalizer); - } - } - - /** - * This method runs a job in blocking mode. The method returns only after the job - * completed successfully, or after it failed terminally. - * - * @param job The Flink job to execute - * @return The result of the job execution - * - * @throws JobExecutionException Thrown if anything went amiss during initial job lauch, - * or if the job terminally failed. 
- */ - public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException { - checkNotNull(job); - - LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID()); - final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers); - - synchronized (lock) { - checkState(!shutdown, "mini cluster is shut down"); - checkState(runners == null, "mini cluster can only execute one job at a time"); - - this.runners = startJobRunners(job, sync, sync); - } - - try { - return sync.getResult(); - } - finally { - // always clear the status for the next job - runners = null; - } - } - - private JobManagerRunner[] startJobRunners( - JobGraph job, - OnCompletionActions onCompletion, - FatalErrorHandler errorHandler) throws JobExecutionException { - LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID()); - - JobManagerRunner[] runners = new JobManagerRunner[numJobManagers]; - for (int i = 0; i < numJobManagers; i++) { - try { - runners[i] = new JobManagerRunner(job, configuration, - rpcService, haServices, jobManagerServices, metricRegistry, - onCompletion, errorHandler); - runners[i].start(); - } - catch (Throwable t) { - // shut down all the ones so far - for (int k = 0; k <= i; k++) { - try { - if (runners[i] != null) { - runners[i].shutdown(); - } - } catch (Throwable ignored) { - // silent shutdown - } - } - - throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t); - } - } - - return runners; - } - - // ------------------------------------------------------------------------ - // test methods to simulate job master failures - // ------------------------------------------------------------------------ - -// public void killJobMaster(int which) { -// checkArgument(which >= 0 && which < numJobManagers, "no such job master"); -// checkState(!shutdown, "mini cluster is shut down"); -// -// JobManagerRunner[] 
runners = this.runners; -// checkState(runners != null, "mini cluster it not executing a job right now"); -// -// runners[which].shutdown(new Throwable("kill JobManager")); -// } - - // ------------------------------------------------------------------------ - // utility classes - // ------------------------------------------------------------------------ - - /** - * Simple class that waits for all runners to have reported that they are done. - * In the case of a high-availability test setup, there may be multiple runners. - * After that, it marks the mini cluster as ready to receive new jobs. - */ - private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler { - - private final AtomicInteger numJobManagersToWaitFor; - - private DetachedFinalizer(int numJobManagersToWaitFor) { - this.numJobManagersToWaitFor = new AtomicInteger(numJobManagersToWaitFor); - } - - @Override - public void jobFinished(JobExecutionResult result) { - decrementCheckAndCleanup(); - } - - @Override - public void jobFailed(Throwable cause) { - decrementCheckAndCleanup(); - } - - @Override - public void jobFinishedByOther() { - decrementCheckAndCleanup(); - } - - @Override - public void onFatalError(Throwable exception) { - decrementCheckAndCleanup(); - } - - private void decrementCheckAndCleanup() { - if (numJobManagersToWaitFor.decrementAndGet() == 0) { - MiniClusterJobDispatcher.this.runners = null; - } - } - } - - // ------------------------------------------------------------------------ - - /** - * This class is used to sync on blocking jobs across multiple runners. - * Only after all runners reported back that they are finished, the - * result will be released. - * - * That way it is guaranteed that after the blocking job submit call returns, - * the dispatcher is immediately free to accept another job. 
- */ - private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler { - - private final JobID jobId; - - private final CountDownLatch jobMastersToWaitFor; - - private volatile Throwable jobException; - - private volatile Throwable runnerException; - - private volatile JobExecutionResult result; - - BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) { - this.jobId = jobId; - this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor); - } - - @Override - public void jobFinished(JobExecutionResult jobResult) { - this.result = jobResult; - jobMastersToWaitFor.countDown(); - } - - @Override - public void jobFailed(Throwable cause) { - jobException = cause; - jobMastersToWaitFor.countDown(); - } - - @Override - public void jobFinishedByOther() { - this.jobMastersToWaitFor.countDown(); - } - - @Override - public void onFatalError(Throwable exception) { - if (runnerException == null) { - runnerException = exception; - } - } - - public JobExecutionResult getResult() throws JobExecutionException, InterruptedException { - jobMastersToWaitFor.await(); - - final Throwable jobFailureCause = this.jobException; - final Throwable runnerException = this.runnerException; - final JobExecutionResult result = this.result; - - // (1) we check if the job terminated with an exception - // (2) we check whether the job completed successfully - // (3) we check if we have exceptions from the JobManagers. the job may still have - // completed successfully in that case, if multiple JobMasters were running - // and other took over. 
only if all encounter a fatal error, the job cannot finish - - if (jobFailureCause != null) { - if (jobFailureCause instanceof JobExecutionException) { - throw (JobExecutionException) jobFailureCause; - } - else { - throw new JobExecutionException(jobId, "The job execution failed", jobFailureCause); - } - } - else if (result != null) { - return result; - } - else if (runnerException != null) { - throw new JobExecutionException(jobId, - "The job execution failed because all JobManagers encountered fatal errors", runnerException); - } - else { - throw new IllegalStateException("Bug: Job finished with neither error nor result."); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java new file mode 100644 index 0000000..1ee38e0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.minicluster; + +import akka.actor.ActorSystem; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.util.ExceptionUtils; + +import scala.Option; +import scala.Tuple2; + +import javax.annotation.concurrent.GuardedBy; + +import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + + +public class MiniCluster { + + /** The lock to guard startup / shutdown / manipulation methods */ + private final Object lock = new Object(); + + /** The configuration for this mini cluster */ + private final MiniClusterConfiguration config; + + @GuardedBy("lock") + private MetricRegistry metricRegistry; + + @GuardedBy("lock") + private RpcService commonRpcService; + + @GuardedBy("lock") + private RpcService[] jobManagerRpcServices; + + @GuardedBy("lock") + private RpcService[] taskManagerRpcServices; + + @GuardedBy("lock") + private HighAvailabilityServices haServices; + + @GuardedBy("lock") + private MiniClusterJobDispatcher jobDispatcher; + + /** Flag marking the 
mini cluster as started/running */ + @GuardedBy("lock") + private boolean running; + + // ------------------------------------------------------------------------ + + /** + * Creates a new mini cluster with the default configuration: + * <ul> + * <li>One JobManager</li> + * <li>One TaskManager</li> + * <li>One task slot in the TaskManager</li> + * <li>All components share the same RPC subsystem (minimizes communication overhead)</li> + * </ul> + */ + public MiniCluster() { + this(new MiniClusterConfiguration()); + } + + /** + * + * @param config The configuration for the mini cluster + */ + public MiniCluster(MiniClusterConfiguration config) { + this.config = checkNotNull(config, "config may not be null"); + } + + /** + * Creates a mini cluster based on the given configuration. + * + * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead. + * @see #MiniCluster(MiniClusterConfiguration) + */ + @Deprecated + public MiniCluster(Configuration config) { + this(createConfig(config, true)); + } + + /** + * Creates a mini cluster based on the given configuration, starting one or more + * RPC services, depending on the given flag. + * + * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead. + * @see #MiniCluster(MiniClusterConfiguration) + */ + @Deprecated + public MiniCluster(Configuration config, boolean singleRpcService) { + this(createConfig(config, singleRpcService)); + } + + // ------------------------------------------------------------------------ + // life cycle + // ------------------------------------------------------------------------ + + /** + * Checks if the mini cluster was started and is running. + */ + public boolean isRunning() { + return running; + } + + /** + * Starts the mini cluster, based on the configured properties. + * + * @throws Exception This method passes on any exception that occurs during the startup of + * the mini cluster. 
+ */ + public void start() throws Exception { + synchronized (lock) { + checkState(!running, "FlinkMiniCluster is already running"); + + final Configuration configuration = new UnmodifiableConfiguration(config.getConfiguration()); + final Time rpcTimeout = config.getRpcTimeout(); + final int numJobManagers = config.getNumJobManagers(); + final int numTaskManagers = config.getNumTaskManagers(); + final boolean singleRpc = config.getUseSingleRpcSystem(); + + try { + metricRegistry = createMetricRegistry(configuration); + + RpcService[] jobManagerRpcServices = new RpcService[numJobManagers]; + RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers]; + + // bring up all the RPC services + if (singleRpc) { + // one common RPC for all + commonRpcService = createRpcService(configuration, rpcTimeout, false, null); + + // set that same RPC service for all JobManagers and TaskManagers + for (int i = 0; i < numJobManagers; i++) { + jobManagerRpcServices[i] = commonRpcService; + } + for (int i = 0; i < numTaskManagers; i++) { + taskManagerRpcServices[i] = commonRpcService; + } + } + else { + // start a new service per component, possibly with custom bind addresses + final String jobManagerBindAddress = config.getJobManagerBindAddress(); + final String taskManagerBindAddress = config.getTaskManagerBindAddress(); + + for (int i = 0; i < numJobManagers; i++) { + jobManagerRpcServices[i] = createRpcService( + configuration, rpcTimeout, true, jobManagerBindAddress); + } + + for (int i = 0; i < numTaskManagers; i++) { + taskManagerRpcServices[i] = createRpcService( + configuration, rpcTimeout, true, taskManagerBindAddress); + } + + this.jobManagerRpcServices = jobManagerRpcServices; + this.taskManagerRpcServices = taskManagerRpcServices; + } + + // create the high-availability services + haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration); + + // bring up the dispatcher that launches JobManagers when jobs submitted + 
jobDispatcher = new MiniClusterJobDispatcher( + configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices); + } + catch (Exception e) { + // cleanup everything + try { + shutdownInternally(); + } catch (Exception ee) { + e.addSuppressed(ee); + } + throw e; + } + + // now officially mark this as running + running = true; + } + } + + /** + * Shuts down the mini cluster, failing all currently executing jobs. + * The mini cluster can be started again by calling the {@link #start()} method again. + * + * <p>This method shuts down all started services and components, + * even if an exception occurs in the process of shutting down some component. + * + * @throws Exception Thrown, if the shutdown did not complete cleanly. + */ + public void shutdown() throws Exception { + synchronized (lock) { + if (running) { + try { + shutdownInternally(); + } finally { + running = false; + } + } + } + } + + @GuardedBy("lock") + private void shutdownInternally() throws Exception { + // this should always be called under the lock + assert Thread.holdsLock(lock); + + // collect the first exception, but continue and add all successive + // exceptions as suppressed + Throwable exception = null; + + // cancel all jobs and shut down the job dispatcher + if (jobDispatcher != null) { + try { + jobDispatcher.shutdown(); + } catch (Exception e) { + exception = firstOrSuppressed(e, exception); + } + jobDispatcher = null; + } + + // shut down high-availability services + if (haServices != null) { + try { + haServices.shutdown(); + } catch (Exception e) { + exception = firstOrSuppressed(e, exception); + } + haServices = null; + } + + // shut down the RpcServices + if (commonRpcService != null) { + exception = shutDownRpc(commonRpcService, exception); + commonRpcService = null; + } + if (jobManagerRpcServices != null) { + for (RpcService service : jobManagerRpcServices) { + exception = shutDownRpc(service, exception); + } + jobManagerRpcServices = null; + } + if 
(taskManagerRpcServices != null) { + for (RpcService service : taskManagerRpcServices) { + exception = shutDownRpc(service, exception); + } + taskManagerRpcServices = null; + } + + // metrics shutdown + if (metricRegistry != null) { + metricRegistry.shutdown(); + metricRegistry = null; + } + + // if anything went wrong, throw the first error with all the additional suppressed exceptions + if (exception != null) { + ExceptionUtils.rethrowException(exception, "Error while shutting down mini cluster"); + } + } + + // ------------------------------------------------------------------------ + // running jobs + // ------------------------------------------------------------------------ + + /** + * This method executes a job in detached mode. The method returns immediately after the job + * has been added to the + * + * @param job The Flink job to execute + * + * @throws JobExecutionException Thrown if anything went amiss during initial job launch, + * or if the job terminally failed. + */ + public void runDetached(JobGraph job) throws JobExecutionException { + checkNotNull(job, "job is null"); + + synchronized (lock) { + checkState(running, "mini cluster is not running"); + jobDispatcher.runDetached(job); + } + } + + /** + * This method runs a job in blocking mode. The method returns only after the job + * completed successfully, or after it failed terminally. + * + * @param job The Flink job to execute + * @return The result of the job execution + * + * @throws JobExecutionException Thrown if anything went amiss during initial job launch, + * or if the job terminally failed. 
+	 */
+	public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
+		checkNotNull(job, "job is null");
+
+		// grab the dispatcher under the lock, but run the (blocking) job outside of it
+		final MiniClusterJobDispatcher currentDispatcher;
+		synchronized (lock) {
+			checkState(running, "mini cluster is not running");
+			currentDispatcher = jobDispatcher;
+		}
+		return currentDispatcher.runJobBlocking(job);
+	}
+
+	// ------------------------------------------------------------------------
+	//  factories - can be overridden by subclasses to alter behavior
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory method that creates the metric registry for the mini cluster.
+	 * @param config The configuration of the mini cluster
+	 * @return The metric registry, configured from the given configuration
+	 */
+	protected MetricRegistry createMetricRegistry(Configuration config) {
+		return new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+	}
+
+	/**
+	 * Factory method to instantiate the RPC service.
+	 *
+	 * <p>If remote access is enabled, an actor system is created for the given bind address
+	 * with port {@code 0}; otherwise, a local-only actor system is created. Either way, the
+	 * actor system is wrapped into an {@link AkkaRpcService}.
+	 *
+	 * @param config        The configuration of the mini cluster
+	 * @param askTimeout    The default RPC timeout for asynchronous "ask" requests.
+	 * @param remoteEnabled True, if the RPC service should be reachable from other (remote) RPC services.
+	 * @param bindAddress   The address to bind the RPC service to. Only relevant when "remoteEnabled" is true.
+	 *
+	 * @return The instantiated RPC service
+	 */
+	protected RpcService createRpcService(
+			Configuration config,
+			Time askTimeout,
+			boolean remoteEnabled,
+			String bindAddress) {
+
+		final ActorSystem actorSystem;
+		if (!remoteEnabled) {
+			actorSystem = AkkaUtils.createLocalActorSystem(config);
+		} else {
+			final Tuple2<String, Object> hostAndPort = new Tuple2<String, Object>(bindAddress, 0);
+			actorSystem = AkkaUtils.createActorSystem(config, Option.apply(hostAndPort));
+		}
+
+		return new AkkaRpcService(actorSystem, askTimeout);
+	}
+
+	// ------------------------------------------------------------------------
+	//  miscellaneous utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Stops the given RPC service, if there is one. Returns {@code priorException} if stopping
+	 * succeeds, otherwise the stop failure combined with it via {@code firstOrSuppressed}.
+	 */
+	private static Throwable shutDownRpc(RpcService rpcService, Throwable priorException) {
+		if (rpcService != null) {
+			try {
+				rpcService.stopService();
+			} catch (Throwable t) {
+				return firstOrSuppressed(t, priorException);
+			}
+		}
+		return priorException;
+	}
+
+	private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleActorSystem) {
+		final MiniClusterConfiguration config =
+				cfg != null ? new MiniClusterConfiguration(cfg) : new MiniClusterConfiguration();
+		if (!singleActorSystem) {
+			config.setUseRpcServicePerComponent();
+		}
+		return config;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
new file mode 100644
index 0000000..a8d7b10
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Settings for a {@link MiniCluster}: component counts, RPC service sharing, an optional common
+ * RPC bind address, and a private copy of the Flink {@link Configuration}.
+ */
+public class MiniClusterConfiguration {
+
+	/** Private copy of the Flink configuration; slots, addresses and timeouts are read from it. */
+	private final Configuration config;
+
+	private boolean singleRpcService = true;
+
+	private int numJobManagers = 1;
+
+	private int numTaskManagers = 1;
+
+	/** If non-null, overrides the configured JobManager and TaskManager bind addresses. */
+	private String commonBindAddress;
+
+	// ------------------------------------------------------------------------
+	//  Construction
+	// ------------------------------------------------------------------------
+
+	public MiniClusterConfiguration() {
+		this.config = new Configuration();
+	}
+
+	public MiniClusterConfiguration(Configuration config) {
+		this.config = new Configuration(checkNotNull(config));
+	}
+
+	// ------------------------------------------------------------------------
+	//  setters
+	// ------------------------------------------------------------------------
+
+	public void addConfiguration(Configuration config) {
+		checkNotNull(config, "configuration must not be null");
+		this.config.addAll(config);
+	}
+
+	public void setUseSingleRpcService() {
+		singleRpcService = true;
+	}
+
+	public void setUseRpcServicePerComponent() {
+		singleRpcService = false;
+	}
+
+	public void setNumJobManagers(int numJobManagers) {
+		checkArgument(numJobManagers >= 1, "must have at least one JobManager");
+		this.numJobManagers = numJobManagers;
+	}
+
+	public void setNumTaskManagers(int numTaskManagers) {
+		checkArgument(numTaskManagers >= 1, "must have at least one TaskManager");
+		this.numTaskManagers = numTaskManagers;
+	}
+
+	/** Sets the number of task slots per TaskManager, stored in the Flink configuration. */
+	public void setNumTaskManagerSlots(int numTaskSlots) {
+		checkArgument(numTaskSlots >= 1, "must have at least one task slot per TaskManager");
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTaskSlots);
+	}
+
+	public void setCommonRpcBindAddress(String bindAddress) {
+		commonBindAddress = checkNotNull(bindAddress, "bind address must not be null");
+	}
+
+	// ------------------------------------------------------------------------
+	//  getters
+	// ------------------------------------------------------------------------
+
+	public Configuration getConfiguration() {
+		return config;
+	}
+
+	public boolean getUseSingleRpcSystem() {
+		return singleRpcService;
+	}
+
+	public int getNumJobManagers() {
+		return numJobManagers;
+	}
+
+	public int getNumTaskManagers() {
+		return numTaskManagers;
+	}
+
+	public int getNumSlotsPerTaskManager() {
+		return config.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+	}
+
+	public String getJobManagerBindAddress() {
+		return commonBindAddress == null
+				? config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
+				: commonBindAddress;
+	}
+
+	public String getTaskManagerBindAddress() {
+		return commonBindAddress == null
+				? config.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost")
+				: commonBindAddress;
+	}
+
+	/** Returns the RPC timeout, derived from {@link AkkaUtils#getTimeout(Configuration)}. */
+	public Time getRpcTimeout() {
+		final FiniteDuration akkaTimeout = AkkaUtils.getTimeout(config);
+		return Time.of(akkaTimeout.length(), akkaTimeout.unit());
+	}
+
+	@Override
+	public String toString() {
+		return "MiniClusterConfiguration{singleRpcService=" + singleRpcService
+				+ ", numJobManagers=" + numJobManagers
+				+ ", numTaskManagers=" + numTaskManagers
+				+ ", commonBindAddress='" + commonBindAddress + '\''
+				+ ", config=" + config + '}';
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
new file mode 100644
index 0000000..d99eff6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The dispatcher that runs in the mini cluster, waits for jobs, and starts job masters
+ * upon receiving jobs. It executes at most one job at a time.
+ */
+public class MiniClusterJobDispatcher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobDispatcher.class);
+
+	// ------------------------------------------------------------------------
+
+	/** lock to ensure that this dispatcher executes only one job at a time */
+	private final Object lock = new Object();
+
+	/** the configuration with which the mini cluster was started */
+	private final Configuration configuration;
+
+	/** the RPC services to use by the job managers */
+	private final RpcService[] rpcServices;
+
+	/** services for discovery, leader election, and recovery */
+	private final HighAvailabilityServices haServices;
+
+	/** All the services that the JobManager needs, such as BLOB service, factories, etc. */
+	private final JobManagerServices jobManagerServices;
+
+	/** Registry for all metrics in the mini cluster */
+	private final MetricRegistry metricRegistry;
+
+	/** The number of JobManagers to launch (more than one simulates a high-availability setup) */
+	private final int numJobManagers;
+
+	/** The runners (one per JobManager) for the current job; non-null while a job is running. */
+	private volatile JobManagerRunner[] runners;
+
+	/** Flag marking the dispatcher as shut down. */
+	private volatile boolean shutdown;
+
+	/**
+	 * Creates a mini cluster job dispatcher that starts a single JobManager per job.
+	 *
+	 * <p>This behaves similarly to a non-highly-available setup.
+	 *
+	 * @param config The configuration of the mini cluster
+	 * @param rpcService The RPC service used by the JobManager
+	 * @param haServices Access to the discovery, leader election, and recovery services
+	 * @param metricRegistry Registry for all metrics in the mini cluster
+	 * @throws Exception Thrown, if the services for the JobMaster could not be started.
+	 */
+	public MiniClusterJobDispatcher(
+			Configuration config,
+			RpcService rpcService,
+			HighAvailabilityServices haServices,
+			MetricRegistry metricRegistry) throws Exception {
+		this(config, haServices, metricRegistry, 1, new RpcService[] { rpcService });
+	}
+
+	/**
+	 * Creates a mini cluster job dispatcher that may start more than one JobManager per job,
+	 * thus simulating a highly-available setup.
+	 *
+	 * @param config The configuration of the mini cluster
+	 * @param haServices Access to the discovery, leader election, and recovery services
+	 * @param metricRegistry Registry for all metrics in the mini cluster
+	 * @param numJobManagers The number of JobMasters to start for each job.
+	 * @param rpcServices The RPC services for the JobMasters; must contain exactly numJobManagers entries
+	 * @throws Exception Thrown, if the services for the JobMaster could not be started.
+	 */
+	public MiniClusterJobDispatcher(
+			Configuration config,
+			HighAvailabilityServices haServices,
+			MetricRegistry metricRegistry,
+			int numJobManagers,
+			RpcService[] rpcServices) throws Exception {
+
+		checkArgument(numJobManagers >= 1);
+		checkArgument(rpcServices.length == numJobManagers);
+
+		this.configuration = checkNotNull(config);
+		this.rpcServices = rpcServices;
+		this.haServices = checkNotNull(haServices);
+		this.metricRegistry = checkNotNull(metricRegistry);
+		this.numJobManagers = numJobManagers;
+
+		LOG.info("Creating JobMaster services");
+		this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices);
+	}
+
+	// ------------------------------------------------------------------------
+	//  life cycle
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down the mini cluster dispatcher. If a job is currently running, that job will be
+	 * terminally failed.
+	 */
+	public void shutdown() {
+		synchronized (lock) {
+			if (!shutdown) {
+				shutdown = true;
+
+				LOG.info("Shutting down the dispatcher");
+
+				// in this shutdown code we copy the references to the stack first,
+				// to avoid concurrent modification
+
+				JobManagerRunner[] runners = this.runners;
+				if (runners != null) {
+					this.runners = null;
+
+					for (JobManagerRunner runner : runners) {
+						runner.shutdown();
+					}
+				}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  submitting jobs
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method executes a job in detached mode. The method returns immediately after the job's
+	 * JobManager runner(s) have been started, without waiting for the job to finish.
+	 *
+	 * @param job  The Flink job to execute
+	 *
+	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch
+	 *         (later failures of the job are not reported through this method).
+	 */
+	public void runDetached(JobGraph job) throws JobExecutionException {
+		checkNotNull(job);
+
+		LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID());
+
+		synchronized (lock) {
+			checkState(!shutdown, "mini cluster is shut down");
+			checkState(runners == null, "mini cluster can only execute one job at a time");
+
+			DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers);
+
+			this.runners = startJobRunners(job, finalizer, finalizer);
+		}
+	}
+
+	/**
+	 * This method runs a job in blocking mode. The method returns only after the job
+	 * completed successfully, or after it failed terminally.
+	 *
+	 * @param job  The Flink job to execute
+	 * @return The result of the job execution
+	 *
+	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
+	 *         or if the job terminally failed.
+	 */
+	public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
+		checkNotNull(job);
+
+		LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
+		final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
+
+		synchronized (lock) {
+			checkState(!shutdown, "mini cluster is shut down");
+			checkState(runners == null, "mini cluster can only execute one job at a time");
+
+			this.runners = startJobRunners(job, sync, sync);
+		}
+
+		try {
+			return sync.getResult();
+		}
+		finally {
+			// always clear the status for the next job
+			runners = null;
+		}
+	}
+
+	/** Registers the job as running in the HA services and starts one JobManagerRunner per JobManager. */
+	private JobManagerRunner[] startJobRunners(
+			JobGraph job,
+			OnCompletionActions onCompletion,
+			FatalErrorHandler errorHandler) throws JobExecutionException {
+
+		LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
+
+		// we first need to mark the job as running in the HA services, so that the
+		// JobManager leader will recognize that it has work to do
+		try {
+			haServices.getRunningJobsRegistry().setJobRunning(job.getJobID());
+		}
+		catch (Throwable t) {
+			throw new JobExecutionException(job.getJobID(),
+					"Could not register the job at the high-availability services", t);
+		}
+
+		// start all JobManagers
+		JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
+		for (int i = 0; i < numJobManagers; i++) {
+			try {
+				runners[i] = new JobManagerRunner(job, configuration,
+						rpcServices[i], haServices, jobManagerServices, metricRegistry,
+						onCompletion, errorHandler);
+				runners[i].start();
+			}
+			catch (Throwable t) {
+				// shut down all runners created so far, including the one that failed to start
+				for (int k = 0; k <= i; k++) {
+					try {
+						if (runners[k] != null) {
+							runners[k].shutdown();
+						}
+					} catch (Throwable ignored) {
+						// silent shutdown
+					}
+				}
+
+				// un-register the job from the high-availability services
+				try {
+					haServices.getRunningJobsRegistry().setJobFinished(job.getJobID());
+				}
+				catch (Throwable tt) {
+					LOG.warn("Could not properly unregister job from high-availability services", tt);
+				}
+
+				throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t);
+			}
+		}
+
+		return runners;
+	}
+
+	// ------------------------------------------------------------------------
+	//  test methods to simulate job master failures
+	// ------------------------------------------------------------------------
+
+//	public void killJobMaster(int which) {
+//		checkArgument(which >= 0 && which < numJobManagers, "no such job master");
+//		checkState(!shutdown, "mini cluster is shut down");
+//
+//		JobManagerRunner[] runners = this.runners;
+//		checkState(runners != null, "mini cluster is not executing a job right now");
+//
+//		runners[which].shutdown(new Throwable("kill JobManager"));
+//	}
+
+	// ------------------------------------------------------------------------
+	//  utility classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Simple class that waits for all runners to have reported that they are done.
+	 * In the case of a high-availability test setup, there may be multiple runners.
+	 * After that, it marks the mini cluster as ready to receive new jobs.
+	 */
+	private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler {
+
+		private final AtomicInteger numJobManagersToWaitFor;
+
+		private DetachedFinalizer(int numJobManagersToWaitFor) {
+			this.numJobManagersToWaitFor = new AtomicInteger(numJobManagersToWaitFor);
+		}
+
+		@Override
+		public void jobFinished(JobExecutionResult result) {
+			decrementCheckAndCleanup();
+		}
+
+		@Override
+		public void jobFailed(Throwable cause) {
+			decrementCheckAndCleanup();
+		}
+
+		@Override
+		public void jobFinishedByOther() {
+			decrementCheckAndCleanup();
+		}
+
+		@Override
+		public void onFatalError(Throwable exception) {
+			decrementCheckAndCleanup();
+		}
+
+		private void decrementCheckAndCleanup() {
+			if (numJobManagersToWaitFor.decrementAndGet() == 0) {
+				MiniClusterJobDispatcher.this.runners = null;
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This class is used to sync on blocking jobs across multiple runners.
+	 * Only after all runners reported back that they are finished, the
+	 * result will be released.
+	 *
+	 * <p>That way it is guaranteed that after the blocking job submit call returns,
+	 * the dispatcher is immediately free to accept another job.
+	 */
+	private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler {
+
+		private final JobID jobId;
+
+		private final CountDownLatch jobMastersToWaitFor;
+
+		private volatile Throwable jobException;
+
+		private volatile Throwable runnerException;
+
+		private volatile JobExecutionResult result;
+
+		BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
+			this.jobId = jobId;
+			this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
+		}
+
+		@Override
+		public void jobFinished(JobExecutionResult jobResult) {
+			this.result = jobResult;
+			jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void jobFailed(Throwable cause) {
+			jobException = cause;
+			jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void jobFinishedByOther() {
+			this.jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void onFatalError(Throwable exception) {
+			// NOTE(review): does not count down the latch; getResult() only returns once every runner called jobFinished/jobFailed/jobFinishedByOther
+			if (runnerException == null) {
+				runnerException = exception;
+			}
+		}
+
+		public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
+			jobMastersToWaitFor.await();
+
+			final Throwable jobFailureCause = this.jobException;
+			final Throwable runnerException = this.runnerException;
+			final JobExecutionResult result = this.result;
+
+			// (1) we check if the job terminated with an exception
+			// (2) we check whether the job completed successfully
+			// (3) we check if we have exceptions from the JobManagers. the job may still have
+			//     completed successfully in that case, if multiple JobMasters were running
+			//     and others took over. only if all encounter a fatal error, the job cannot finish
+
+			if (jobFailureCause != null) {
+				if (jobFailureCause instanceof JobExecutionException) {
+					throw (JobExecutionException) jobFailureCause;
+				}
+				else {
+					throw new JobExecutionException(jobId, "The job execution failed", jobFailureCause);
+				}
+			}
+			else if (result != null) {
+				return result;
+			}
+			else if (runnerException != null) {
+				throw new JobExecutionException(jobId,
+						"The job execution failed because all JobManagers encountered fatal errors", runnerException);
+			}
+			else {
+				throw new IllegalStateException("Bug: Job finished with neither error nor result.");
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 6f6d525..3122804 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -188,7 +188,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> } else { try { LeaderRetrievalService jobMasterLeaderRetriever = - highAvailabilityServices.getJobManagerLeaderRetriever(jobID, jobMasterAddress); + highAvailabilityServices.getJobManagerLeaderRetriever(jobID); jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever); } catch (Exception e) { log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java ---------------------------------------------------------------------- diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java index 9e71349..e7f52e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java @@ -191,8 +191,7 @@ public class JobLeaderService { LOG.info("Add job {} for job leader monitoring.", jobId); final LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever( - jobId, - defaultTargetAddress); + jobId); JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId); http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index 3e88e8c..877812b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -91,7 +91,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override - public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) throws Exception { + public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception { LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID); if (service != null) { return service;
http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
new file mode 100644
index 0000000..dd43337
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Integration test cases for the {@link MiniCluster}.
+ */
+public class MiniClusterITCase extends TestLogger {
+
+	@Test
+	@Ignore("temporarily disabled")
+	public void runJobWithSingleRpcService() throws Exception {
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
+
+		// should be the default, but set anyways to make sure the test
+		// stays valid when the default changes
+		cfg.setUseSingleRpcService();
+
+		MiniCluster miniCluster = new MiniCluster(cfg);
+		executeJob(miniCluster);
+	}
+
+	@Test
+	@Ignore("temporarily disabled")
+	public void runJobWithMultipleRpcServices() throws Exception {
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
+		cfg.setUseRpcServicePerComponent();
+
+		MiniCluster miniCluster = new MiniCluster(cfg);
+		executeJob(miniCluster);
+	}
+
+	@Test
+	@Ignore("temporarily disabled")
+	public void runJobWithMultipleJobManagers() throws Exception {
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
+		cfg.setNumJobManagers(3);
+
+		MiniCluster miniCluster = new MiniCluster(cfg);
+		executeJob(miniCluster);
+	}
+
+	/**
+	 * Starts the given mini cluster, runs a simple job on it (blocking), and always shuts the
+	 * cluster down afterwards so that it does not outlive the test.
+	 */
+	private static void executeJob(MiniCluster miniCluster) throws Exception {
+		miniCluster.start();
+		try {
+			miniCluster.runJobBlocking(getSimpleJob());
+		}
+		finally {
+			miniCluster.shutdown();
+		}
+	}
+
+	/** Creates a job with a single no-op task of parallelism 1. */
+	private static JobGraph getSimpleJob() {
+		JobVertex task = new JobVertex("Test task");
+		task.setParallelism(1);
+		task.setMaxParallelism(1);
+		task.setInvokableClass(NoOpInvokable.class);
+
+		return new JobGraph(new JobID(), "Test Job", task);
+	}
+}
