[FLINK-4836] [cluster management] Add flink mini cluster (part 1)

This implements
  - mini cluster configuration
  - startup / shutdown of common services (rpc, ha)
  - startup / shutdown of JobManager and Dispatcher


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/106cb9e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/106cb9e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/106cb9e3

Branch: refs/heads/master
Commit: 106cb9e3ddb8ad019db99bc746e77c2ef48cb5e4
Parents: 9615f15
Author: Stephan Ewen <[email protected]>
Authored: Sat Oct 15 00:25:41 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Dec 23 20:54:23 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/util/ExceptionUtils.java   |  12 +-
 .../HighAvailabilityServicesUtils.java          |  17 +
 .../highavailability/ZookeeperHaServices.java   |   2 +-
 .../runtime/jobmaster/JobManagerRunner.java     |   1 -
 .../jobmaster/MiniClusterJobDispatcher.java     | 394 -----------------
 .../flink/runtime/minicluster/MiniCluster.java  | 406 ++++++++++++++++++
 .../minicluster/MiniClusterConfiguration.java   | 147 +++++++
 .../minicluster/MiniClusterJobDispatcher.java   | 418 +++++++++++++++++++
 .../resourcemanager/ResourceManager.java        |   2 +-
 .../runtime/taskexecutor/JobLeaderService.java  |   3 +-
 .../TestingHighAvailabilityServices.java        |   2 +-
 .../runtime/minicluster/MiniClusterITCase.java  |  79 ++++
 12 files changed, 1075 insertions(+), 408 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index d1357a8..1069f2d 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -110,8 +110,6 @@ public final class ExceptionUtils {
                }
        }
 
-
-
        /**
         * Throws the given {@code Throwable} in scenarios where the signatures 
do not allow you to
         * throw an arbitrary Throwable. Errors and RuntimeExceptions are 
thrown directly, other exceptions
@@ -214,10 +212,8 @@ public final class ExceptionUtils {
                }
        }
 
-       /**
-        * Private constructor to prevent instantiation.
-        */
-       private ExceptionUtils() {
-               throw new RuntimeException();
-       }
+       // 
------------------------------------------------------------------------
+
+       /** Private constructor to prevent instantiation. */
+       private ExceptionUtils() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index f3da847..9113309 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -24,6 +24,23 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 
 public class HighAvailabilityServicesUtils {
 
+       public static HighAvailabilityServices 
createAvailableOrEmbeddedServices(Configuration config) throws Exception {
+               HighAvailabilityMode highAvailabilityMode = 
LeaderRetrievalUtils.getRecoveryMode(config);
+
+               switch (highAvailabilityMode) {
+                       case NONE:
+                               return new EmbeddedNonHaServices();
+
+                       case ZOOKEEPER:
+                               throw new 
UnsupportedOperationException("ZooKeeper high availability services " +
+                                               "have not been implemented 
yet.");
+
+                       default:
+                               throw new Exception("High availability mode " + 
highAvailabilityMode + " is not supported.");
+               }
+       }
+       
+       
        public static HighAvailabilityServices 
createHighAvailabilityServices(Configuration configuration) throws Exception {
                HighAvailabilityMode highAvailabilityMode = 
LeaderRetrievalUtils.getRecoveryMode(configuration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index be19c60..e38840b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -113,7 +113,7 @@ public class ZookeeperHaServices implements 
HighAvailabilityServices {
        }
 
        @Override
-       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, 
String defaultAddress) throws Exception {
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
throws Exception {
                return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, getPathForJobManager(jobID));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 74ca6f3..3313d8a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -289,7 +289,6 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
        @Override
        public void jobFinishedByOther() {
                try {
-                       unregisterJobFromHighAvailability();
                        shutdownInternally();
                }
                finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
deleted file mode 100644
index 019ccfe..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * The dispatcher that runs in the mini cluster, waits for jobs, and starts 
job masters
- * upon receiving jobs.
- */
-public class MiniClusterJobDispatcher {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(MiniClusterJobDispatcher.class);
-
-       // 
------------------------------------------------------------------------
-
-       /** lock to ensure that this dispatcher executes only one job at a time 
*/
-       private final Object lock = new Object();
-
-       /** the configuration with which the mini cluster was started */
-       private final Configuration configuration;
-
-       /** the RPC service to use by the job managers */
-       private final RpcService rpcService;
-
-       /** services for discovery, leader election, and recovery */
-       private final HighAvailabilityServices haServices;
-
-       /** all the services that the JobManager needs, such as BLOB service, 
factories, etc */
-       private final JobManagerServices jobManagerServices;
-
-       /** Registry for all metrics in the mini cluster */
-       private final MetricRegistry metricRegistry;
-
-       /** The number of JobManagers to launch (more than one simulates a 
high-availability setup) */
-       private final int numJobManagers;
-
-       /** The runner for the job and master. non-null if a job is currently 
running */
-       private volatile JobManagerRunner[] runners;
-
-       /** flag marking the dispatcher as shut down */
-       private volatile boolean shutdown;
-
-
-       /**
-        * Starts a mini cluster job dispatcher.
-        * 
-        * <p>The dispatcher kicks off one JobManager per job, a behavior 
similar to a
-        * non-highly-available setup.
-        * 
-        * @param config The configuration of the mini cluster
-        * @param haServices Access to the discovery, leader election, and 
recovery services
-        * 
-        * @throws Exception Thrown, if the services for the JobMaster could 
not be started.
-        */
-       public MiniClusterJobDispatcher(
-                       Configuration config,
-                       RpcService rpcService,
-                       HighAvailabilityServices haServices,
-                       MetricRegistry metricRegistry) throws Exception {
-               this(config, rpcService, haServices, metricRegistry, 1);
-       }
-
-       /**
-        * Starts a mini cluster job dispatcher.
-        *
-        * <p>The dispatcher may kick off more than one JobManager per job, 
thus simulating
-        * a highly-available setup.
-        * 
-        * @param config The configuration of the mini cluster
-        * @param haServices Access to the discovery, leader election, and 
recovery services
-        * @param numJobManagers The number of JobMasters to start for each job.
-        * 
-        * @throws Exception Thrown, if the services for the JobMaster could 
not be started.
-        */
-       public MiniClusterJobDispatcher(
-                       Configuration config,
-                       RpcService rpcService,
-                       HighAvailabilityServices haServices,
-                       MetricRegistry metricRegistry,
-                       int numJobManagers) throws Exception {
-
-               checkArgument(numJobManagers >= 1);
-               this.configuration = checkNotNull(config);
-               this.rpcService = checkNotNull(rpcService);
-               this.haServices = checkNotNull(haServices);
-               this.metricRegistry = checkNotNull(metricRegistry);
-               this.numJobManagers = numJobManagers;
-
-               LOG.info("Creating JobMaster services");
-               this.jobManagerServices = 
JobManagerServices.fromConfiguration(config, haServices);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  life cycle
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Shuts down the mini cluster dispatcher. If a job is currently 
running, that job will be
-        * terminally failed.
-        */
-       public void shutdown() {
-               synchronized (lock) {
-                       if (!shutdown) {
-                               shutdown = true;
-
-                               LOG.info("Shutting down the dispatcher");
-
-                               // in this shutdown code we copy the references 
to the stack first,
-                               // to avoid concurrent modification
-
-                               JobManagerRunner[] runners = this.runners;
-                               if (runners != null) {
-                                       this.runners = null;
-
-                                       for (JobManagerRunner runner : runners) 
{
-                                               runner.shutdown();
-                                       }
-                               }
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  submitting jobs
-       // 
------------------------------------------------------------------------
-
-       /**
-        * This method executes a job in detached mode. The method returns 
immediately after the job
-        * has been added to the
-        *
-        * @param job  The Flink job to execute
-        *
-        * @throws JobExecutionException Thrown if anything went amiss during 
initial job launch,
-        *         or if the job terminally failed.
-        */
-       public void runDetached(JobGraph job) throws JobExecutionException {
-               checkNotNull(job);
-
-               LOG.info("Received job for detached execution {} ({})", 
job.getName(), job.getJobID());
-
-               synchronized (lock) {
-                       checkState(!shutdown, "mini cluster is shut down");
-                       checkState(runners == null, "mini cluster can only 
execute one job at a time");
-
-                       DetachedFinalizer finalizer = new 
DetachedFinalizer(numJobManagers);
-
-                       this.runners = startJobRunners(job, finalizer, 
finalizer);
-               }
-       }
-
-       /**
-        * This method runs a job in blocking mode. The method returns only 
after the job
-        * completed successfully, or after it failed terminally.
-        *
-        * @param job  The Flink job to execute 
-        * @return The result of the job execution
-        *
-        * @throws JobExecutionException Thrown if anything went amiss during 
initial job lauch,
-        *         or if the job terminally failed.
-        */
-       public JobExecutionResult runJobBlocking(JobGraph job) throws 
JobExecutionException, InterruptedException {
-               checkNotNull(job);
-               
-               LOG.info("Received job for blocking execution {} ({})", 
job.getName(), job.getJobID());
-               final BlockingJobSync sync = new 
BlockingJobSync(job.getJobID(), numJobManagers);
-
-               synchronized (lock) {
-                       checkState(!shutdown, "mini cluster is shut down");
-                       checkState(runners == null, "mini cluster can only 
execute one job at a time");
-
-                       this.runners = startJobRunners(job, sync, sync);
-               }
-
-               try {
-                       return sync.getResult();
-               }
-               finally {
-                       // always clear the status for the next job
-                       runners = null;
-               }
-       }
-
-       private JobManagerRunner[] startJobRunners(
-                       JobGraph job,
-                       OnCompletionActions onCompletion,
-                       FatalErrorHandler errorHandler) throws 
JobExecutionException {
-               LOG.info("Starting {} JobMaster(s) for job {} ({})", 
numJobManagers, job.getName(), job.getJobID());
-
-               JobManagerRunner[] runners = new 
JobManagerRunner[numJobManagers];
-               for (int i = 0; i < numJobManagers; i++) {
-                       try {
-                               runners[i] = new JobManagerRunner(job, 
configuration,
-                                               rpcService, haServices, 
jobManagerServices, metricRegistry, 
-                                               onCompletion, errorHandler);
-                               runners[i].start();
-                       }
-                       catch (Throwable t) {
-                               // shut down all the ones so far
-                               for (int k = 0; k <= i; k++) {
-                                       try {
-                                               if (runners[i] != null) {
-                                                       runners[i].shutdown();
-                                               }
-                                       } catch (Throwable ignored) {
-                                               // silent shutdown
-                                       }
-                               }
-
-                               throw new JobExecutionException(job.getJobID(), 
"Could not start the JobManager(s) for the job", t);
-                       }
-               }
-
-               return runners;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  test methods to simulate job master failures
-       // 
------------------------------------------------------------------------
-
-//     public void killJobMaster(int which) {
-//             checkArgument(which >= 0 && which < numJobManagers, "no such 
job master");
-//             checkState(!shutdown, "mini cluster is shut down");
-//
-//             JobManagerRunner[] runners = this.runners;
-//             checkState(runners != null, "mini cluster it not executing a 
job right now");
-//
-//             runners[which].shutdown(new Throwable("kill JobManager"));
-//     }
-
-       // 
------------------------------------------------------------------------
-       //  utility classes
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Simple class that waits for all runners to have reported that they 
are done.
-        * In the case of a high-availability test setup, there may be multiple 
runners.
-        * After that, it marks the mini cluster as ready to receive new jobs.
-        */
-       private class DetachedFinalizer implements OnCompletionActions, 
FatalErrorHandler {
-
-               private final AtomicInteger numJobManagersToWaitFor;
-
-               private DetachedFinalizer(int numJobManagersToWaitFor) {
-                       this.numJobManagersToWaitFor = new 
AtomicInteger(numJobManagersToWaitFor);
-               }
-
-               @Override
-               public void jobFinished(JobExecutionResult result) {
-                       decrementCheckAndCleanup();
-               }
-
-               @Override
-               public void jobFailed(Throwable cause) {
-                       decrementCheckAndCleanup();
-               }
-
-               @Override
-               public void jobFinishedByOther() {
-                       decrementCheckAndCleanup();
-               }
-
-               @Override
-               public void onFatalError(Throwable exception) {
-                       decrementCheckAndCleanup();
-               }
-
-               private void decrementCheckAndCleanup() {
-                       if (numJobManagersToWaitFor.decrementAndGet() == 0) {
-                               MiniClusterJobDispatcher.this.runners = null;
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * This class is used to sync on blocking jobs across multiple runners.
-        * Only after all runners reported back that they are finished, the
-        * result will be released.
-        * 
-        * That way it is guaranteed that after the blocking job submit call 
returns,
-        * the dispatcher is immediately free to accept another job.
-        */
-       private static class BlockingJobSync implements OnCompletionActions, 
FatalErrorHandler {
-
-               private final JobID jobId;
-
-               private final CountDownLatch jobMastersToWaitFor;
-
-               private volatile Throwable jobException;
-
-               private volatile Throwable runnerException;
-
-               private volatile JobExecutionResult result;
-               
-               BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
-                       this.jobId = jobId;
-                       this.jobMastersToWaitFor = new 
CountDownLatch(numJobMastersToWaitFor);
-               }
-
-               @Override
-               public void jobFinished(JobExecutionResult jobResult) {
-                       this.result = jobResult;
-                       jobMastersToWaitFor.countDown();
-               }
-
-               @Override
-               public void jobFailed(Throwable cause) {
-                       jobException = cause;
-                       jobMastersToWaitFor.countDown();
-               }
-
-               @Override
-               public void jobFinishedByOther() {
-                       this.jobMastersToWaitFor.countDown();
-               }
-
-               @Override
-               public void onFatalError(Throwable exception) {
-                       if (runnerException == null) {
-                               runnerException = exception;
-                       }
-               }
-
-               public JobExecutionResult getResult() throws 
JobExecutionException, InterruptedException {
-                       jobMastersToWaitFor.await();
-
-                       final Throwable jobFailureCause = this.jobException;
-                       final Throwable runnerException = this.runnerException;
-                       final JobExecutionResult result = this.result;
-
-                       // (1) we check if the job terminated with an exception
-                       // (2) we check whether the job completed successfully
-                       // (3) we check if we have exceptions from the 
JobManagers. the job may still have
-                       //     completed successfully in that case, if multiple 
JobMasters were running
-                       //     and other took over. only if all encounter a 
fatal error, the job cannot finish
-
-                       if (jobFailureCause != null) {
-                               if (jobFailureCause instanceof 
JobExecutionException) {
-                                       throw (JobExecutionException) 
jobFailureCause;
-                               }
-                               else {
-                                       throw new JobExecutionException(jobId, 
"The job execution failed", jobFailureCause);
-                               }
-                       }
-                       else if (result != null) {
-                               return result;
-                       }
-                       else if (runnerException != null) {
-                               throw new JobExecutionException(jobId,
-                                               "The job execution failed 
because all JobManagers encountered fatal errors", runnerException);
-                       }
-                       else {
-                               throw new IllegalStateException("Bug: Job 
finished with neither error nor result.");
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
new file mode 100644
index 0000000..1ee38e0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import akka.actor.ActorSystem;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.ExceptionUtils;
+
+import scala.Option;
+import scala.Tuple2;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+
+public class MiniCluster {
+
+       /** The lock to guard startup / shutdown / manipulation methods */
+       private final Object lock = new Object();
+
+       /** The configuration for this mini cluster */
+       private final MiniClusterConfiguration config;
+
+       @GuardedBy("lock")
+       private MetricRegistry metricRegistry;
+
+       @GuardedBy("lock")
+       private RpcService commonRpcService;
+
+       @GuardedBy("lock")
+       private RpcService[] jobManagerRpcServices;
+
+       @GuardedBy("lock")
+       private RpcService[] taskManagerRpcServices;
+
+       @GuardedBy("lock")
+       private HighAvailabilityServices haServices;
+
+       @GuardedBy("lock")
+       private MiniClusterJobDispatcher jobDispatcher;
+
+       /** Flag marking the mini cluster as started/running */
+       @GuardedBy("lock")
+       private boolean running;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a new mini cluster with the default configuration:
+        * <ul>
+        *     <li>One JobManager</li>
+        *     <li>One TaskManager</li>
+        *     <li>One task slot in the TaskManager</li>
+        *     <li>All components share the same RPC subsystem (minimizes 
communication overhead)</li>
+        * </ul>
+        */
+       public MiniCluster() {
+               this(new MiniClusterConfiguration());
+       }
+
+       /**
+        * 
+        * @param config The configuration for the mini cluster
+        */
+       public MiniCluster(MiniClusterConfiguration config) {
+               this.config = checkNotNull(config, "config may not be null");
+       }
+
+       /**
+        * Creates a mini cluster based on the given configuration.
+        * 
+        * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} 
instead. 
+        * @see #MiniCluster(MiniClusterConfiguration)
+        */
+       @Deprecated
+       public MiniCluster(Configuration config) {
+               this(createConfig(config, true));
+       }
+
+       /**
+        * Creates a mini cluster based on the given configuration, starting 
one or more
+        * RPC services, depending on the given flag.
+        *
+        * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} 
instead. 
+        * @see #MiniCluster(MiniClusterConfiguration)
+        */
+       @Deprecated
+       public MiniCluster(Configuration config, boolean singleRpcService) {
+               this(createConfig(config, singleRpcService));
+       }
+
+       // 
------------------------------------------------------------------------
+       //  life cycle
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Checks if the mini cluster was started and is running.
+        */
+       public boolean isRunning() {
+               return running;
+       }
+
+       /**
+        * Starts the mini cluster, based on the configured properties.
+        * 
+        * @throws Exception This method passes on any exception that occurs 
during the startup of
+        *                   the mini cluster.
+        */
+       public void start() throws Exception {
+               synchronized (lock) {
+                       checkState(!running, "FlinkMiniCluster is already 
running");
+
+                       final Configuration configuration = new 
UnmodifiableConfiguration(config.getConfiguration());
+                       final Time rpcTimeout = config.getRpcTimeout();
+                       final int numJobManagers = config.getNumJobManagers();
+                       final int numTaskManagers = config.getNumTaskManagers();
+                       final boolean singleRpc = 
config.getUseSingleRpcSystem();
+
+                       try {
+                               metricRegistry = 
createMetricRegistry(configuration);
+
+                               RpcService[] jobManagerRpcServices = new 
RpcService[numJobManagers];
+                               RpcService[] taskManagerRpcServices = new 
RpcService[numTaskManagers];
+
+                               // bring up all the RPC services
+                               if (singleRpc) {
+                                       // one common RPC for all
+                                       commonRpcService = 
createRpcService(configuration, rpcTimeout, false, null);
+
+                                       // set that same RPC service for all 
JobManagers and TaskManagers
+                                       for (int i = 0; i < numJobManagers; 
i++) {
+                                               jobManagerRpcServices[i] = 
commonRpcService;
+                                       }
+                                       for (int i = 0; i < numTaskManagers; 
i++) {
+                                               taskManagerRpcServices[i] = 
commonRpcService;
+                                       }
+                               }
+                               else {
+                                       // start a new service per component, 
possibly with custom bind addresses
+                                       final String jobManagerBindAddress = 
config.getJobManagerBindAddress();
+                                       final String taskManagerBindAddress = 
config.getTaskManagerBindAddress();
+
+                                       for (int i = 0; i < numJobManagers; 
i++) {
+                                               jobManagerRpcServices[i] = 
createRpcService(
+                                                               configuration, 
rpcTimeout, true, jobManagerBindAddress);
+                                       }
+
+                                       for (int i = 0; i < numTaskManagers; 
i++) {
+                                               taskManagerRpcServices[i] = 
createRpcService(
+                                                               configuration, 
rpcTimeout, true, taskManagerBindAddress);
+                                       }
+
+                                       this.jobManagerRpcServices = 
jobManagerRpcServices;
+                                       this.taskManagerRpcServices = 
taskManagerRpcServices;
+                               }
+
+                               // create the high-availability services
+                               haServices = 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
+
+                               // bring up the dispatcher that launches 
JobManagers when jobs are submitted
+                               jobDispatcher = new MiniClusterJobDispatcher(
+                                               configuration, haServices, 
metricRegistry, numJobManagers, jobManagerRpcServices);
+                       }
+                       catch (Exception e) {
+                               // cleanup everything
+                               try {
+                                       shutdownInternally();
+                               } catch (Exception ee) {
+                                       e.addSuppressed(ee);
+                               }
+                               throw e;
+                       }
+
+                       // now officially mark this as running
+                       running = true;
+               }
+       }
+
+       /**
+        * Stops the mini cluster. Calling this method on a cluster that is not
+        * running is a no-op. After the call the cluster is marked as not running,
+        * so it may be brought up again via {@link #start()}.
+        *
+        * <p>The running flag is reset even when the internal shutdown routine
+        * throws, so a failed shutdown still leaves the cluster in the "stopped" state.
+        *
+        * @throws Exception Forwarded from the internal shutdown routine if the
+        *                   shutdown did not complete cleanly.
+        */
+       public void shutdown() throws Exception {
+               synchronized (lock) {
+                       if (!running) {
+                               // never started, or already shut down
+                               return;
+                       }
+
+                       try {
+                               shutdownInternally();
+                       }
+                       finally {
+                               // mark as stopped regardless of the shutdown outcome
+                               running = false;
+                       }
+               }
+       }
+
+       /**
+        * Shuts down all components and services of the mini cluster that are
+        * currently set: the job dispatcher, the high-availability services, the
+        * RPC services (common, per-JobManager, per-TaskManager) and the metric registry.
+        *
+        * <p>Every step is attempted even if an earlier one failed. The first failure
+        * is remembered and all later failures are attached to it as suppressed
+        * exceptions. Each component reference is cleared after its shutdown attempt.
+        *
+        * <p>Must be called while holding {@code lock}.
+        *
+        * @throws Exception The first failure (with later ones suppressed), if any step failed.
+        */
+       @GuardedBy("lock")
+       private void shutdownInternally() throws Exception {
+               // this should always be called under the lock
+               assert Thread.holdsLock(lock);
+
+               // collect the first exception, but continue and add all successive
+               // exceptions as suppressed
+               Throwable exception = null;
+
+               // cancel all jobs and shut down the job dispatcher
+               if (jobDispatcher != null) {
+                       try {
+                               jobDispatcher.shutdown();
+                       } catch (Exception e) {
+                               exception = firstOrSuppressed(e, exception);
+                       }
+                       jobDispatcher = null;
+               }
+
+               // shut down high-availability services
+               if (haServices != null) {
+                       try {
+                               haServices.shutdown();
+                       } catch (Exception e) {
+                               exception = firstOrSuppressed(e, exception);
+                       }
+                       haServices = null;
+               }
+
+               // shut down the RpcServices
+               if (commonRpcService != null) {
+                       exception = shutDownRpc(commonRpcService, exception);
+                       commonRpcService = null;
+               }
+               if (jobManagerRpcServices != null) {
+                       for (RpcService service : jobManagerRpcServices) {
+                               exception = shutDownRpc(service, exception);
+                       }
+                       jobManagerRpcServices = null;
+               }
+               if (taskManagerRpcServices != null) {
+                       for (RpcService service : taskManagerRpcServices) {
+                               exception = shutDownRpc(service, exception);
+                       }
+                       taskManagerRpcServices = null;
+               }
+
+               // metrics shutdown - guarded like all other steps, so that a failure here
+               // neither hides the exceptions collected above nor leaves the reference set
+               if (metricRegistry != null) {
+                       try {
+                               metricRegistry.shutdown();
+                       } catch (Exception e) {
+                               exception = firstOrSuppressed(e, exception);
+                       }
+                       metricRegistry = null;
+               }
+
+               // if anything went wrong, throw the first error with all the additional suppressed exceptions
+               if (exception != null) {
+                       ExceptionUtils.rethrowException(exception, "Error while shutting down mini cluster");
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  running jobs
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Executes a job in detached mode by handing it to the job dispatcher of
+        * this mini cluster. The hand-over happens while holding the cluster lock.
+        *
+        * @param job  The Flink job to execute
+        *
+        * @throws JobExecutionException Propagated from the job dispatcher.
+        * @throws IllegalStateException Thrown, if the mini cluster is not running.
+        */
+       public void runDetached(JobGraph job) throws JobExecutionException {
+               checkNotNull(job, "job is null");
+
+               synchronized (lock) {
+                       checkState(running, "mini cluster is not running");
+
+                       final MiniClusterJobDispatcher dispatcher = this.jobDispatcher;
+                       dispatcher.runDetached(job);
+               }
+       }
+
+       /**
+        * Executes a job in blocking mode by delegating to the job dispatcher of
+        * this mini cluster.
+        *
+        * <p>Only the check that the cluster is running and the read of the dispatcher
+        * reference happen under the cluster lock; the (blocking) call into the
+        * dispatcher is made after the lock has been released.
+        *
+        * @param job  The Flink job to execute
+        * @return The result of the job execution, as returned by the dispatcher
+        *
+        * @throws JobExecutionException Propagated from the job dispatcher.
+        * @throws InterruptedException Propagated from the job dispatcher.
+        * @throws IllegalStateException Thrown, if the mini cluster is not running.
+        */
+       public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
+               checkNotNull(job, "job is null");
+
+               final MiniClusterJobDispatcher currentDispatcher;
+               synchronized (lock) {
+                       checkState(running, "mini cluster is not running");
+                       currentDispatcher = this.jobDispatcher;
+               }
+
+               // deliberately outside the synchronized block
+               final JobExecutionResult executionResult = currentDispatcher.runJobBlocking(job);
+               return executionResult;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  factories - can be overridden by subclasses to alter behavior
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Factory method that builds the metric registry of the mini cluster from
+        * the given configuration. Subclasses may override it to supply a different registry.
+        *
+        * @param config The configuration of the mini cluster
+        * @return A new metric registry, configured from {@code config}
+        */
+       protected MetricRegistry createMetricRegistry(Configuration config) {
+               final MetricRegistryConfiguration registryConfig =
+                               MetricRegistryConfiguration.fromConfiguration(config);
+
+               return new MetricRegistry(registryConfig);
+       }
+
+       /**
+        * Factory method that creates an Akka-based RPC service on top of a freshly
+        * created actor system. Subclasses may override it to alter the RPC setup.
+        *
+        * @param config
+        *            The configuration of the mini cluster
+        * @param askTimeout
+        *            The default RPC timeout for asynchronous "ask" requests.
+        * @param remoteEnabled
+        *            True, if the RPC service should be reachable from other (remote) RPC services.
+        *            If false, a local actor system is created.
+        * @param bindAddress
+        *            The address to bind the RPC service to. Only used when "remoteEnabled" is true.
+        *
+        * @return The instantiated RPC service
+        */
+       protected RpcService createRpcService(
+                       Configuration config,
+                       Time askTimeout,
+                       boolean remoteEnabled,
+                       String bindAddress) {
+
+               final ActorSystem system;
+
+               if (!remoteEnabled) {
+                       system = AkkaUtils.createLocalActorSystem(config);
+               }
+               else {
+                       // NOTE(review): presumably (hostname, port), with port 0 meaning "pick any
+                       // free port" - confirm against AkkaUtils.createActorSystem
+                       final Tuple2<String, Object> addressAndPort = new Tuple2<String, Object>(bindAddress, 0);
+                       system = AkkaUtils.createActorSystem(config, Option.apply(addressAndPort));
+               }
+
+               return new AkkaRpcService(system, askTimeout);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  miscellaneous utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Stops the given RPC service (if it is non-null) and folds a failure of that
+        * stop call into the exception collected so far.
+        *
+        * @param rpcService     The service to stop, may be null
+        * @param priorException The exception collected by earlier shutdown steps, may be null
+        * @return {@code priorException} unchanged if stopping succeeded (or there was nothing
+        *         to stop), otherwise the result of combining the new failure with it
+        */
+       private static Throwable shutDownRpc(RpcService rpcService, Throwable priorException) {
+               if (rpcService == null) {
+                       return priorException;
+               }
+
+               try {
+                       rpcService.stopService();
+               }
+               catch (Throwable t) {
+                       return firstOrSuppressed(t, priorException);
+               }
+
+               return priorException;
+       }
+
+       /**
+        * Builds a {@link MiniClusterConfiguration} from an optional Flink configuration.
+        *
+        * @param cfg               The Flink configuration to start from; if null, an empty
+        *                          mini cluster configuration is created
+        * @param singleActorSystem If false, the configuration is switched to use one RPC
+        *                          service per component
+        * @return The mini cluster configuration
+        */
+       private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleActorSystem) {
+               final MiniClusterConfiguration miniClusterConfig;
+               if (cfg != null) {
+                       miniClusterConfig = new MiniClusterConfiguration(cfg);
+               } else {
+                       miniClusterConfig = new MiniClusterConfiguration();
+               }
+
+               if (!singleActorSystem) {
+                       miniClusterConfig.setUseRpcServicePerComponent();
+               }
+
+               return miniClusterConfig;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
new file mode 100644
index 0000000..a8d7b10
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Mutable holder for the settings of a mini cluster: the underlying Flink
+ * {@link Configuration}, the number of JobManagers and TaskManagers, whether a single
+ * RPC service is shared, and an optional bind address common to all components.
+ */
+public class MiniClusterConfiguration {
+
+       /** The Flink configuration; a private copy when created from an existing configuration */
+       private final Configuration config;
+
+       /** True, if all components share one RPC service (the default) */
+       private boolean singleRpcService = true;
+
+       /** The number of JobManagers, at least one */
+       private int numJobManagers = 1;
+
+       /** The number of TaskManagers, at least one */
+       private int numTaskManagers = 1;
+
+       /** Bind address that overrides the per-component addresses; null if not set */
+       private String commonBindAddress;
+
+       // ------------------------------------------------------------------------
+       //  Construction
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates a mini cluster configuration backed by an empty Flink configuration.
+        */
+       public MiniClusterConfiguration() {
+               this.config = new Configuration();
+       }
+
+       /**
+        * Creates a mini cluster configuration backed by a copy of the given Flink configuration.
+        *
+        * @param config The configuration to copy, must not be null
+        */
+       public MiniClusterConfiguration(Configuration config) {
+               this.config = new Configuration(checkNotNull(config));
+       }
+
+       // ------------------------------------------------------------------------
+       //  setters
+       // ------------------------------------------------------------------------
+
+       /**
+        * Adds all entries of the given configuration to the backing configuration.
+        *
+        * @param config The configuration whose entries are added, must not be null
+        */
+       public void addConfiguration(Configuration config) {
+               final Configuration toAdd = checkNotNull(config, "configuration must not be null");
+               this.config.addAll(toAdd);
+       }
+
+       /** Selects the mode in which one RPC service is shared. */
+       public void setUseSingleRpcService() {
+               singleRpcService = true;
+       }
+
+       /** Selects the mode in which each component gets its own RPC service. */
+       public void setUseRpcServicePerComponent() {
+               singleRpcService = false;
+       }
+
+       /**
+        * Sets the number of JobManagers.
+        *
+        * @param numJobManagers The number of JobManagers, must be at least one
+        */
+       public void setNumJobManagers(int numJobManagers) {
+               final boolean valid = numJobManagers >= 1;
+               checkArgument(valid, "must have at least one JobManager");
+               this.numJobManagers = numJobManagers;
+       }
+
+       /**
+        * Sets the number of TaskManagers.
+        *
+        * @param numTaskManagers The number of TaskManagers, must be at least one
+        */
+       public void setNumTaskManagers(int numTaskManagers) {
+               final boolean valid = numTaskManagers >= 1;
+               checkArgument(valid, "must have at least one TaskManager");
+               this.numTaskManagers = numTaskManagers;
+       }
+
+       /**
+        * Sets the number of task slots per TaskManager. The value is stored in the
+        * backing Flink configuration.
+        *
+        * @param numTaskSlots The number of slots, must be at least one
+        */
+       public void setNumTaskManagerSlots(int numTaskSlots) {
+               final boolean valid = numTaskSlots >= 1;
+               checkArgument(valid, "must have at least one task slot per TaskManager");
+               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTaskSlots);
+       }
+
+       /**
+        * Sets a bind address that takes precedence over the JobManager and TaskManager
+        * addresses from the Flink configuration.
+        *
+        * @param bindAddress The common bind address, must not be null
+        */
+       public void setCommonRpcBindAddress(String bindAddress) {
+               this.commonBindAddress = checkNotNull(bindAddress, "bind address must not be null");
+       }
+
+       // ------------------------------------------------------------------------
+       //  getters
+       // ------------------------------------------------------------------------
+
+       /**
+        * @return The backing Flink configuration (the internal instance, not a copy)
+        */
+       public Configuration getConfiguration() {
+               return config;
+       }
+
+       /**
+        * @return True, if one RPC service is shared; false, if each component gets its own
+        */
+       public boolean getUseSingleRpcSystem() {
+               return singleRpcService;
+       }
+
+       /**
+        * @return The number of JobManagers
+        */
+       public int getNumJobManagers() {
+               return numJobManagers;
+       }
+
+       /**
+        * @return The number of TaskManagers
+        */
+       public int getNumTaskManagers() {
+               return numTaskManagers;
+       }
+
+       /**
+        * @return The number of slots per TaskManager from the Flink configuration, or 1 if not set
+        */
+       public int getNumSlotsPerTaskManager() {
+               return config.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+       }
+
+       /**
+        * @return The common bind address if one was set, otherwise the JobManager address
+        *         from the Flink configuration, falling back to "localhost"
+        */
+       public String getJobManagerBindAddress() {
+               if (commonBindAddress != null) {
+                       return commonBindAddress;
+               }
+               return config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+       }
+
+       /**
+        * @return The common bind address if one was set, otherwise the TaskManager hostname
+        *         from the Flink configuration, falling back to "localhost"
+        */
+       public String getTaskManagerBindAddress() {
+               if (commonBindAddress != null) {
+                       return commonBindAddress;
+               }
+               return config.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
+       }
+
+       /**
+        * @return The RPC timeout, converted from the timeout that {@link AkkaUtils}
+        *         derives from the Flink configuration
+        */
+       public Time getRpcTimeout() {
+               final FiniteDuration akkaTimeout = AkkaUtils.getTimeout(config);
+               return Time.of(akkaTimeout.length(), akkaTimeout.unit());
+       }
+
+       // ------------------------------------------------------------------------
+       //  utils
+       // ------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               final StringBuilder text = new StringBuilder("MiniClusterConfiguration{");
+               text.append("singleRpcService=").append(singleRpcService);
+               text.append(", numJobManagers=").append(numJobManagers);
+               text.append(", numTaskManagers=").append(numTaskManagers);
+               text.append(", commonBindAddress='").append(commonBindAddress).append('\'');
+               text.append(", config=").append(config);
+               text.append('}');
+               return text.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
new file mode 100644
index 0000000..d99eff6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The dispatcher that runs in the mini cluster, waits for jobs, and starts 
job masters
+ * upon receiving jobs.
+ */
+public class MiniClusterJobDispatcher {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(MiniClusterJobDispatcher.class);
+
+       // 
------------------------------------------------------------------------
+
+       /** lock to ensure that this dispatcher executes only one job at a time 
*/
+       private final Object lock = new Object();
+
+       /** the configuration with which the mini cluster was started */
+       private final Configuration configuration;
+
+       /** the RPC services to use by the job managers */
+       private final RpcService[] rpcServices;
+
+       /** services for discovery, leader election, and recovery */
+       private final HighAvailabilityServices haServices;
+
+       /** all the services that the JobManager needs, such as BLOB service, 
factories, etc */
+       private final JobManagerServices jobManagerServices;
+
+       /** Registry for all metrics in the mini cluster */
+       private final MetricRegistry metricRegistry;
+
+       /** The number of JobManagers to launch (more than one simulates a 
high-availability setup) */
+       private final int numJobManagers;
+
+       /** The runner for the job and master. non-null if a job is currently 
running */
+       private volatile JobManagerRunner[] runners;
+
+       /** flag marking the dispatcher as shut down */
+       private volatile boolean shutdown;
+
+
+       /**
+        * Creates a mini cluster job dispatcher that uses exactly one JobManager per
+        * job. Delegates to the general constructor with a JobManager count of one and
+        * the given RPC service as the only entry of the RPC service array.
+        *
+        * @param config The configuration of the mini cluster
+        * @param rpcService The RPC service used by the single JobManager
+        * @param haServices Access to the discovery, leader election, and recovery services
+        * @param metricRegistry The registry for the metrics in the mini cluster
+        *
+        * @throws Exception Thrown, if the services for the JobMaster could not be started.
+        */
+       public MiniClusterJobDispatcher(
+                       Configuration config,
+                       RpcService rpcService,
+                       HighAvailabilityServices haServices,
+                       MetricRegistry metricRegistry) throws Exception {
+
+               this(
+                       config,
+                       haServices,
+                       metricRegistry,
+                       1,
+                       new RpcService[] { rpcService });
+       }
+
+       /**
+        * Creates a mini cluster job dispatcher that starts {@code numJobManagers}
+        * JobManagers per job, each with its own entry of {@code rpcServices}.
+        * As part of construction, the JobManager services are created from the
+        * configuration and the high-availability services.
+        *
+        * @param config The configuration of the mini cluster
+        * @param haServices Access to the discovery, leader election, and recovery services
+        * @param metricRegistry The registry for the metrics in the mini cluster
+        * @param numJobManagers The number of JobMasters to start for each job, at least one
+        * @param rpcServices The RPC services for the JobManagers; the array length must
+        *                    be equal to {@code numJobManagers}
+        *
+        * @throws Exception Thrown, if the services for the JobMaster could not be started.
+        */
+       public MiniClusterJobDispatcher(
+                       Configuration config,
+                       HighAvailabilityServices haServices,
+                       MetricRegistry metricRegistry,
+                       int numJobManagers,
+                       RpcService[] rpcServices) throws Exception {
+
+               // validate everything up front, then assign
+               checkArgument(numJobManagers >= 1);
+               checkArgument(rpcServices.length == numJobManagers);
+               checkNotNull(config);
+               checkNotNull(haServices);
+               checkNotNull(metricRegistry);
+
+               this.configuration = config;
+               this.rpcServices = rpcServices;
+               this.haServices = haServices;
+               this.metricRegistry = metricRegistry;
+               this.numJobManagers = numJobManagers;
+
+               LOG.info("Creating JobMaster services");
+               this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  life cycle
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Shuts down the mini cluster dispatcher. The first call marks the dispatcher
+        * as shut down and shuts down the runners of a currently running job, if any;
+        * further calls are no-ops.
+        *
+        * <p>All runners are shut down even if the shutdown of one of them fails with
+        * a runtime exception. In that case, the first such exception is re-thrown
+        * after the last runner was handled, with the later ones added as suppressed
+        * exceptions.
+        */
+       public void shutdown() {
+               synchronized (lock) {
+                       if (shutdown) {
+                               return;
+                       }
+                       shutdown = true;
+
+                       LOG.info("Shutting down the dispatcher");
+
+                       // in this shutdown code we copy the references to the stack first,
+                       // to avoid concurrent modification
+                       final JobManagerRunner[] runners = this.runners;
+                       if (runners == null) {
+                               return;
+                       }
+                       this.runners = null;
+
+                       // a failure in one runner must not prevent the shutdown of the others
+                       RuntimeException firstFailure = null;
+                       for (JobManagerRunner runner : runners) {
+                               try {
+                                       runner.shutdown();
+                               }
+                               catch (RuntimeException e) {
+                                       if (firstFailure == null) {
+                                               firstFailure = e;
+                                       }
+                                       else if (firstFailure != e) {
+                                               // an exception must not be added as suppressed to itself
+                                               firstFailure.addSuppressed(e);
+                                       }
+                               }
+                       }
+
+                       if (firstFailure != null) {
+                               throw firstFailure;
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  submitting jobs
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Executes a job in detached mode: starts the JobManager runner(s) for the
+        * job and returns without waiting for the job to complete. The runners are
+        * registered as the current job of this dispatcher; a {@code DetachedFinalizer}
+        * clears that registration once all runners have reported back.
+        *
+        * @param job  The Flink job to execute
+        *
+        * @throws JobExecutionException Thrown, if the JobManager runner(s) could not be started.
+        * @throws IllegalStateException Thrown, if the dispatcher is shut down or is
+        *                               already executing a job.
+        */
+       public void runDetached(JobGraph job) throws JobExecutionException {
+               checkNotNull(job);
+
+               LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID());
+
+               synchronized (lock) {
+                       checkState(!shutdown, "mini cluster is shut down");
+                       checkState(runners == null, "mini cluster can only execute one job at a time");
+
+                       // the same object acts as completion callback and as fatal error handler
+                       final DetachedFinalizer completionHandler = new DetachedFinalizer(numJobManagers);
+                       final JobManagerRunner[] startedRunners =
+                                       startJobRunners(job, completionHandler, completionHandler);
+
+                       this.runners = startedRunners;
+               }
+       }
+
+       /**
+        * Executes a job in blocking mode: starts the JobManager runner(s) for the
+        * job under the dispatcher lock and then waits, outside of the lock, until the
+        * {@code BlockingJobSync} releases the outcome of the job.
+        *
+        * <p>Once the waiting ends (normally, with an exception, or by interruption),
+        * the runner registration is cleared so that the next job can be accepted.
+        *
+        * @param job  The Flink job to execute
+        * @return The result of the job execution
+        *
+        * @throws JobExecutionException Thrown, if the JobManager runner(s) could not be started,
+        *         or if the job terminally failed.
+        * @throws InterruptedException Thrown, if the thread is interrupted while waiting
+        *         for the job outcome.
+        * @throws IllegalStateException Thrown, if the dispatcher is shut down or is
+        *         already executing a job.
+        */
+       public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
+               checkNotNull(job);
+
+               LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
+
+               // the same object acts as completion callback and as fatal error handler
+               final BlockingJobSync jobSync = new BlockingJobSync(job.getJobID(), numJobManagers);
+
+               synchronized (lock) {
+                       checkState(!shutdown, "mini cluster is shut down");
+                       checkState(runners == null, "mini cluster can only execute one job at a time");
+
+                       final JobManagerRunner[] startedRunners = startJobRunners(job, jobSync, jobSync);
+                       this.runners = startedRunners;
+               }
+
+               final JobExecutionResult outcome;
+               try {
+                       outcome = jobSync.getResult();
+               }
+               finally {
+                       // always clear the status for the next job
+                       runners = null;
+               }
+               return outcome;
+       }
+
+       /**
+        * Marks the job as running in the high-availability services and then creates
+        * and starts one {@link JobManagerRunner} per configured JobManager.
+        *
+        * <p>If creating or starting any runner fails, all runners created so far
+        * (including the one that failed to start) are shut down, the job is marked as
+        * finished in the running jobs registry again, and the failure is reported as a
+        * {@link JobExecutionException}.
+        *
+        * @param job The job to start the runners for
+        * @param onCompletion The callback handed to every runner for job completion
+        * @param errorHandler The handler handed to every runner for fatal errors
+        * @return The started runners, one per JobManager
+        *
+        * @throws JobExecutionException Thrown, if the job could not be registered at the
+        *         high-availability services, or if a runner could not be created or started.
+        */
+       private JobManagerRunner[] startJobRunners(
+                       JobGraph job,
+                       OnCompletionActions onCompletion,
+                       FatalErrorHandler errorHandler) throws JobExecutionException {
+
+               LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
+
+               // we first need to mark the job as running in the HA services, so that the
+               // JobManager leader will recognize that it has work to do
+               try {
+                       haServices.getRunningJobsRegistry().setJobRunning(job.getJobID());
+               }
+               catch (Throwable t) {
+                       throw new JobExecutionException(job.getJobID(),
+                                       "Could not register the job at the high-availability services", t);
+               }
+
+               // start all JobManagers
+               JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
+               for (int i = 0; i < numJobManagers; i++) {
+                       try {
+                               runners[i] = new JobManagerRunner(job, configuration,
+                                               rpcServices[i], haServices, jobManagerServices, metricRegistry,
+                                               onCompletion, errorHandler);
+                               runners[i].start();
+                       }
+                       catch (Throwable t) {
+                               // shut down all the ones so far; index 'i' is included because its
+                               // runner may have been created but failed during start()
+                               for (int k = 0; k <= i; k++) {
+                                       try {
+                                               if (runners[k] != null) {
+                                                       runners[k].shutdown();
+                                               }
+                                       } catch (Throwable ignored) {
+                                               // silent shutdown
+                                       }
+                               }
+
+                               // un-register the job from the high-availability services
+                               try {
+                                       haServices.getRunningJobsRegistry().setJobFinished(job.getJobID());
+                               }
+                               catch (Throwable tt) {
+                                       LOG.warn("Could not properly unregister job from high-availability services", tt);
+                               }
+
+                               throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t);
+                       }
+               }
+
+               return runners;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test methods to simulate job master failures
+       // 
------------------------------------------------------------------------
+
+//     public void killJobMaster(int which) {
+//             checkArgument(which >= 0 && which < numJobManagers, "no such 
job master");
+//             checkState(!shutdown, "mini cluster is shut down");
+//
+//             JobManagerRunner[] runners = this.runners;
+//             checkState(runners != null, "mini cluster it not executing a 
job right now");
+//
+//             runners[which].shutdown(new Throwable("kill JobManager"));
+//     }
+
+       // 
------------------------------------------------------------------------
+       //  utility classes
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Completion callback and fatal error handler for detached jobs. It counts
+        * the notifications of the runners (job finished, job failed, job finished by
+        * another JobManager, fatal error) and, once as many notifications arrived as
+        * there are runners to wait for, clears the runner registration of the
+        * enclosing dispatcher, which makes the dispatcher accept a new job.
+        */
+       private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler {
+
+               /** The number of runner notifications that are still outstanding */
+               private final AtomicInteger pendingRunners;
+
+               private DetachedFinalizer(int numJobManagersToWaitFor) {
+                       this.pendingRunners = new AtomicInteger(numJobManagersToWaitFor);
+               }
+
+               @Override
+               public void jobFinished(JobExecutionResult result) {
+                       runnerReportedBack();
+               }
+
+               @Override
+               public void jobFailed(Throwable cause) {
+                       runnerReportedBack();
+               }
+
+               @Override
+               public void jobFinishedByOther() {
+                       runnerReportedBack();
+               }
+
+               @Override
+               public void onFatalError(Throwable exception) {
+                       runnerReportedBack();
+               }
+
+               /**
+                * Accounts for one notification and releases the dispatcher for the next
+                * job when the counter reaches zero.
+                */
+               private void runnerReportedBack() {
+                       final int stillPending = pendingRunners.decrementAndGet();
+                       if (stillPending == 0) {
+                               MiniClusterJobDispatcher.this.runners = null;
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * This class is used to sync on blocking jobs across multiple runners.
+        * Only after all runners reported back (job finished, job failed, job finished
+        * by another JobManager, or fatal error of the runner), the result will be released.
+        *
+        * <p>That way it is guaranteed that after the blocking job submit call returns,
+        * the dispatcher is immediately free to accept another job.
+        */
+       private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler {
+
+               /** The ID of the job, used for the exceptions created by this class */
+               private final JobID jobId;
+
+               /** Counted down once per runner notification; {@link #getResult()} waits on it */
+               private final CountDownLatch jobMastersToWaitFor;
+
+               /** The failure cause reported via {@link #jobFailed(Throwable)}, if any */
+               private volatile Throwable jobException;
+
+               /** The first fatal error reported via {@link #onFatalError(Throwable)}, if any */
+               private volatile Throwable runnerException;
+
+               /** The result reported via {@link #jobFinished(JobExecutionResult)}, if any */
+               private volatile JobExecutionResult result;
+
+               BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
+                       this.jobId = jobId;
+                       this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
+               }
+
+               @Override
+               public void jobFinished(JobExecutionResult jobResult) {
+                       this.result = jobResult;
+                       jobMastersToWaitFor.countDown();
+               }
+
+               @Override
+               public void jobFailed(Throwable cause) {
+                       jobException = cause;
+                       jobMastersToWaitFor.countDown();
+               }
+
+               @Override
+               public void jobFinishedByOther() {
+                       this.jobMastersToWaitFor.countDown();
+               }
+
+               @Override
+               public void onFatalError(Throwable exception) {
+                       if (runnerException == null) {
+                               runnerException = exception;
+                       }
+
+                       // a runner that reported a fatal error counts as "reported back";
+                       // without this, getResult() would wait forever for that runner
+                       jobMastersToWaitFor.countDown();
+               }
+
+               /**
+                * Waits until all runners have reported back and then returns the job
+                * result or throws the job's failure.
+                *
+                * @return The result of the job, if it completed successfully
+                *
+                * @throws JobExecutionException Thrown, if the job failed, or if no result is
+                *         available and a runner reported a fatal error.
+                * @throws InterruptedException Thrown, if the thread is interrupted while waiting.
+                */
+               public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
+                       jobMastersToWaitFor.await();
+
+                       final Throwable jobFailureCause = this.jobException;
+                       final Throwable runnerException = this.runnerException;
+                       final JobExecutionResult result = this.result;
+
+                       // (1) we check if the job terminated with an exception
+                       // (2) we check whether the job completed successfully
+                       // (3) we check if we have exceptions from the JobManagers. the job may still have
+                       //     completed successfully in that case, if multiple JobMasters were running
+                       //     and other took over. only if all encounter a fatal error, the job cannot finish
+
+                       if (jobFailureCause != null) {
+                               if (jobFailureCause instanceof JobExecutionException) {
+                                       throw (JobExecutionException) jobFailureCause;
+                               }
+                               else {
+                                       throw new JobExecutionException(jobId, "The job execution failed", jobFailureCause);
+                               }
+                       }
+                       else if (result != null) {
+                               return result;
+                       }
+                       else if (runnerException != null) {
+                               throw new JobExecutionException(jobId,
+                                               "The job execution failed because all JobManagers encountered fatal errors", runnerException);
+                       }
+                       else {
+                               throw new IllegalStateException("Bug: Job finished with neither error nor result.");
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 6f6d525..3122804 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -188,7 +188,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                } else {
                        try {
                                LeaderRetrievalService jobMasterLeaderRetriever 
=
-                                       
highAvailabilityServices.getJobManagerLeaderRetriever(jobID, jobMasterAddress);
+                                       
highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
                                jobIdLeaderListener = new 
JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
                        } catch (Exception e) {
                                log.warn("Failed to start 
JobMasterLeaderRetriever for JobID {}", jobID);

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 9e71349..e7f52e2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -191,8 +191,7 @@ public class JobLeaderService {
                LOG.info("Add job {} for job leader monitoring.", jobId);
 
                final LeaderRetrievalService leaderRetrievalService = 
highAvailabilityServices.getJobManagerLeaderRetriever(
-                       jobId,
-                       defaultTargetAddress);
+                       jobId);
 
                JobLeaderService.JobManagerLeaderListener 
jobManagerLeaderListener = new JobManagerLeaderListener(jobId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 3e88e8c..877812b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -91,7 +91,7 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
        }
 
        @Override
-       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, 
String defaultJobManagerAddress) throws Exception {
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
throws Exception {
                LeaderRetrievalService service = 
this.jobMasterLeaderRetrievers.get(jobID);
                if (service != null) {
                        return service;

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
new file mode 100644
index 0000000..dd43337
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+/**
+ * Integration test cases for the {@link MiniCluster}.
+ *
+ * <p>Each case builds a {@link MiniClusterConfiguration}, creates a
+ * {@link MiniCluster} from it, starts the cluster and runs a minimal
+ * single-vertex job on it.
+ *
+ * <p>NOTE: the JUnit Test annotations on all three cases are commented
+ * out, so as written none of these methods is marked as a test. The
+ * reason is not visible in this file.
+ */
+public class MiniClusterITCase extends TestLogger {
+
+       /**
+        * Runs the simple job on a mini cluster that is explicitly configured
+        * through {@code setUseSingleRpcService()}.
+        *
+        * @throws Exception propagated from starting the cluster or running the job
+        */
+//     @Test
+       public void runJobWithSingleRpcService() throws Exception {
+               MiniClusterConfiguration cfg = new MiniClusterConfiguration();
+
+               // should be the default, but set anyway to make sure the test
+               // stays valid when the default changes
+               cfg.setUseSingleRpcService();
+
+               MiniCluster miniCluster = new MiniCluster(cfg);
+               executeJob(miniCluster);
+       }
+
+       /**
+        * Runs the simple job on a mini cluster that is configured through
+        * {@code setUseRpcServicePerComponent()}.
+        *
+        * @throws Exception propagated from starting the cluster or running the job
+        */
+//     @Test
+       public void runJobWithMultipleRpcServices() throws Exception {
+               MiniClusterConfiguration cfg = new MiniClusterConfiguration();
+               cfg.setUseRpcServicePerComponent();
+
+               MiniCluster miniCluster = new MiniCluster(cfg);
+               executeJob(miniCluster);
+       }
+
+       /**
+        * Runs the simple job on a mini cluster whose configuration requests
+        * three JobManagers via {@code setNumJobManagers(3)}.
+        *
+        * @throws Exception propagated from starting the cluster or running the job
+        */
+//     @Test
+       public void runJobWithMultipleJobManagers() throws Exception {
+               MiniClusterConfiguration cfg = new MiniClusterConfiguration();
+               cfg.setNumJobManagers(3);
+
+               MiniCluster miniCluster = new MiniCluster(cfg);
+               executeJob(miniCluster);
+       }
+
+       /**
+        * Starts the given mini cluster and runs the job from
+        * {@link #getSimpleJob()} on it via {@code runJobBlocking}.
+        *
+        * <p>NOTE(review): the cluster is started here but nothing in this
+        * class stops it again - confirm whether it should be shut down
+        * (e.g. in a finally block) once the job has returned.
+        *
+        * @param miniCluster the not-yet-started cluster to run the job on
+        * @throws Exception propagated from starting the cluster or running the job
+        */
+       private static void executeJob(MiniCluster miniCluster) throws 
Exception {
+               miniCluster.start();
+
+               JobGraph job = getSimpleJob();
+               miniCluster.runJobBlocking(job);
+       }
+
+       /**
+        * Builds the job used by all cases: a single vertex named "Test task"
+        * with parallelism 1 and max parallelism 1 that uses
+        * {@link NoOpInvokable} as its invokable class.
+        *
+        * @return a job graph named "Test Job" with a fresh {@link JobID}
+        */
+       private static JobGraph getSimpleJob() {
+               JobVertex task = new JobVertex("Test task");
+               task.setParallelism(1);
+               task.setMaxParallelism(1);
+               task.setInvokableClass(NoOpInvokable.class);
+
+               return new JobGraph(new JobID(), "Test Job", task);
+       }
+}

Reply via email to