http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 934a795..6abea2a 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -27,11 +27,11 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread;
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,7 +92,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
                flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
                flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 
-               ForkableFlinkMiniCluster flink = new ForkableFlinkMiniCluster(flinkConfig, false);
+               LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);
                flink.start();
 
                final int flinkPort = flink.getLeaderRPCPort();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
index ee415d1..29b3a3e 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/ScalaStreamingMultipleProgramsTestBase.scala
@@ -18,8 +18,9 @@
 
 package org.apache.flink.streaming.api.scala
 
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.streaming.util.TestStreamEnvironment
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
+import org.apache.flink.test.util.TestBaseUtils
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.junit.JUnitSuiteLike
 
@@ -29,7 +30,7 @@ trait ScalaStreamingMultipleProgramsTestBase
   with BeforeAndAfterAll {
 
   val parallelism = 4
-  var cluster: Option[ForkableFlinkMiniCluster] = None
+  var cluster: Option[LocalFlinkMiniCluster] = None
 
   override protected def beforeAll(): Unit = {
     val cluster = Some(

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml
index 2ab52b5..18ecfde 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -79,153 +79,4 @@ under the License.
                </dependency>
 
        </dependencies>
-
-       <build>
-               <plugins>
-                       <!-- Scala Compiler -->
-                       <plugin>
-                               <groupId>net.alchim31.maven</groupId>
-                               <artifactId>scala-maven-plugin</artifactId>
-                               <version>3.1.4</version>
-                               <executions>
-                                       <!-- Run scala compiler in the 
process-resources phase, so that dependencies 
-                                               on scala classes can be 
resolved later in the (Java) compile phase -->
-                                       <execution>
-                                               <id>scala-compile-first</id>
-                                               <phase>process-resources</phase>
-                                               <goals>
-                                                       <goal>compile</goal>
-                                               </goals>
-                                       </execution>
-
-                                       <!-- Run scala compiler in the 
process-test-resources phase, so that 
-                                               dependencies on scala classes 
can be resolved later in the (Java) test-compile 
-                                               phase -->
-                                       <execution>
-                                               <id>scala-test-compile</id>
-                                               
<phase>process-test-resources</phase>
-                                               <goals>
-                                                       <goal>testCompile</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                               <configuration>
-                                       <jvmArgs>
-                                               <jvmArg>-Xms128m</jvmArg>
-                                               <jvmArg>-Xmx512m</jvmArg>
-                                       </jvmArgs>
-                               </configuration>
-                       </plugin>
-
-                       <!-- Eclipse Integration -->
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-eclipse-plugin</artifactId>
-                               <version>2.8</version>
-                               <configuration>
-                                       <downloadSources>true</downloadSources>
-                                       <projectnatures>
-                                               
<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-                                               
<projectnature>org.eclipse.jdt.core.javanature</projectnature>
-                                       </projectnatures>
-                                       <buildcommands>
-                                               
<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-                                       </buildcommands>
-                                       <classpathContainers>
-                                               
<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-                                               
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-                                       </classpathContainers>
-                                       <excludes>
-                                               
<exclude>org.scala-lang:scala-library</exclude>
-                                               
<exclude>org.scala-lang:scala-compiler</exclude>
-                                       </excludes>
-                                       <sourceIncludes>
-                                               
<sourceInclude>**/*.scala</sourceInclude>
-                                               
<sourceInclude>**/*.java</sourceInclude>
-                                       </sourceIncludes>
-                               </configuration>
-                       </plugin>
-
-                       <!-- Adding scala source directories to build path -->
-                       <plugin>
-                               <groupId>org.codehaus.mojo</groupId>
-                               
<artifactId>build-helper-maven-plugin</artifactId>
-                               <version>1.7</version>
-                               <executions>
-                                       <!-- Add src/main/scala to eclipse 
build path -->
-                                       <execution>
-                                               <id>add-source</id>
-                                               <phase>generate-sources</phase>
-                                               <goals>
-                                                       <goal>add-source</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <sources>
-                                                               
<source>src/main/scala</source>
-                                                       </sources>
-                                               </configuration>
-                                       </execution>
-                                       <!-- Add src/test/scala to eclipse 
build path -->
-                                       <execution>
-                                               <id>add-test-source</id>
-                                               
<phase>generate-test-sources</phase>
-                                               <goals>
-                                                       
<goal>add-test-source</goal>
-                                               </goals>
-                                               <configuration>
-                                                       <sources>
-                                                               
<source>src/test/scala</source>
-                                                       </sources>
-                                               </configuration>
-                                       </execution>
-                               </executions>
-                       </plugin>
-
-                       <!-- Scala Code Style, most of the configuration done 
via plugin management -->
-                       <plugin>
-                               <groupId>org.scalastyle</groupId>
-                               <artifactId>scalastyle-maven-plugin</artifactId>
-                               <configuration>
-                                       
<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
-                               </configuration>
-                       </plugin>
-
-               </plugins>
-               <pluginManagement>
-                       <plugins>
-                               <!--This plugin's configuration is used to 
store Eclipse m2e settings only. It has no influence on the Maven build 
itself.-->
-                               <plugin>
-                                       <groupId>org.eclipse.m2e</groupId>
-                                       
<artifactId>lifecycle-mapping</artifactId>
-                                       <version>1.0.0</version>
-                                       <configuration>
-                                               <lifecycleMappingMetadata>
-                                                       <pluginExecutions>
-                                                               
<pluginExecution>
-                                                                       
<pluginExecutionFilter>
-                                                                               
<groupId>
-                                                                               
        net.alchim31.maven
-                                                                               
</groupId>
-                                                                               
<artifactId>
-                                                                               
        scala-maven-plugin
-                                                                               
</artifactId>
-                                                                               
<versionRange>
-                                                                               
        [3.1.4,)
-                                                                               
</versionRange>
-                                                                               
<goals>
-                                                                               
        <goal>compile</goal>
-                                                                               
        <goal>testCompile</goal>
-                                                                               
</goals>
-                                                                       
</pluginExecutionFilter>
-                                                                       <action>
-                                                                               
<ignore/>
-                                                                       
</action>
-                                                               
</pluginExecution>
-                                                       </pluginExecutions>
-                                               </lifecycleMappingMetadata>
-                                       </configuration>
-                               </plugin>
-                       </plugins>
-               </pluginManagement>
-       </build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index c5fbaf0..a478908 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.TestBaseUtils;
 
 import org.junit.AfterClass;
@@ -61,7 +61,7 @@ public class StreamingMultipleProgramsTestBase extends AbstractTestBase {
 
        protected static final int DEFAULT_PARALLELISM = 4;
 
-       protected static ForkableFlinkMiniCluster cluster;
+       protected static LocalFlinkMiniCluster cluster;
 
        public StreamingMultipleProgramsTestBase() {
                super(new Configuration());

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index c700102..64c68dc 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -20,10 +20,10 @@ package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -32,10 +32,10 @@ import org.apache.flink.util.Preconditions;
 public class TestStreamEnvironment extends StreamExecutionEnvironment {
        
        /** The mini cluster in which this environment executes its jobs */
-       private ForkableFlinkMiniCluster executor;
+       private LocalFlinkMiniCluster executor;
        
 
-       public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
+       public TestStreamEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
                this.executor = Preconditions.checkNotNull(executor);
                setParallelism(parallelism);
        }
@@ -57,7 +57,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
         * @param cluster The test cluster to run the test program on.
         * @param parallelism The default parallelism for the test programs.
         */
-       public static void setAsContext(final ForkableFlinkMiniCluster cluster, final int parallelism) {
+       public static void setAsContext(final LocalFlinkMiniCluster cluster, final int parallelism) {
                
                StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index c2da691..316fd21 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
@@ -48,7 +49,7 @@ public abstract class AbstractTestBase extends TestBaseUtils {
        protected int numTaskManagers = 1;
        
        /** The mini cluster that runs the test programs */
-       protected ForkableFlinkMiniCluster executor;
+       protected LocalFlinkMiniCluster executor;
        
 
        public AbstractTestBase(Configuration config) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
index d7f09bd..4e83245 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MultipleProgramsTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.util;
 
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.runners.Parameterized;
@@ -72,7 +73,7 @@ public class MultipleProgramsTestBase extends TestBaseUtils {
 
        protected static boolean startWebServer = false;
 
-       protected static ForkableFlinkMiniCluster cluster = null;
+       protected static LocalFlinkMiniCluster cluster = null;
        
        // ------------------------------------------------------------------------
        

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 4014b80..b774f97 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -32,7 +32,8 @@ import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -104,7 +105,7 @@ public class TestBaseUtils extends TestLogger {
        }
        
        
-       public static ForkableFlinkMiniCluster startCluster(
+       public static LocalFlinkMiniCluster startCluster(
                int numTaskManagers,
                int taskManagerNumSlots,
                boolean startWebserver,
@@ -126,7 +127,7 @@ public class TestBaseUtils extends TestLogger {
                return startCluster(config, singleActorSystem);
        }
 
-       public static ForkableFlinkMiniCluster startCluster(
+       public static LocalFlinkMiniCluster startCluster(
                Configuration config,
                boolean singleActorSystem) throws Exception {
 
@@ -147,7 +148,7 @@ public class TestBaseUtils extends TestLogger {
                
                config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString());
 
-               ForkableFlinkMiniCluster cluster =  new ForkableFlinkMiniCluster(config, singleActorSystem);
+               LocalFlinkMiniCluster cluster =  new LocalFlinkMiniCluster(config, singleActorSystem);
 
                cluster.start();
 
@@ -155,7 +156,7 @@ public class TestBaseUtils extends TestLogger {
        }
 
 
-       public static void stopCluster(ForkableFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
+       public static void stopCluster(LocalFlinkMiniCluster executor, FiniteDuration timeout) throws Exception {
                if (logDir != null) {
                        FileUtils.deleteDirectory(logDir);
                }
@@ -169,11 +170,15 @@ public class TestBaseUtils extends TestLogger {
                                List<Future<Object>> numActiveConnectionsResponseFutures = new ArrayList<>();
 
                                for (ActorRef tm : tms) {
-                                       bcVariableManagerResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages
-                                                       .RequestBroadcastVariablesWithReferences$.MODULE$, new Timeout(timeout)));
-
-                                       numActiveConnectionsResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages
-                                                       .RequestNumActiveConnections$.MODULE$, new Timeout(timeout)));
+                                       bcVariableManagerResponseFutures.add(Patterns.ask(
+                                               tm,
+                                               TaskManagerMessages.getRequestBroadcastVariablesWithReferences(),
+                                               new Timeout(timeout)));
+
+                                       numActiveConnectionsResponseFutures.add(Patterns.ask(
+                                               tm,
+                                               TaskManagerMessages.getRequestNumActiveConnections(),
+                                               new Timeout(timeout)));
                                }
 
                                Future<Iterable<Object>> bcVariableManagerFutureResponses = Futures.sequence(
@@ -182,8 +187,7 @@ public class TestBaseUtils extends TestLogger {
                                Iterable<Object> responses = Await.result(bcVariableManagerFutureResponses, timeout);
 
                                for (Object response : responses) {
-                                       numUnreleasedBCVars += ((TestingTaskManagerMessages
-                                                       .ResponseBroadcastVariablesWithReferences) response).number();
+                                       numUnreleasedBCVars += ((TaskManagerMessages.ResponseBroadcastVariablesWithReferences) response).number();
                                }
 
                                Future<Iterable<Object>> numActiveConnectionsFutureResponses = Futures.sequence(
@@ -192,8 +196,7 @@ public class TestBaseUtils extends TestLogger {
                                responses = Await.result(numActiveConnectionsFutureResponses, timeout);
 
                                for (Object response : responses) {
-                                       numActiveConnections += ((TestingTaskManagerMessages
-                                                       .ResponseNumActiveConnections) response).number();
+                                       numActiveConnections += ((TaskManagerMessages.ResponseNumActiveConnections) response).number();
                                }
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index 7cb88be..aea8152 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -29,10 +29,11 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
 public class TestEnvironment extends ExecutionEnvironment {
 
-       private final ForkableFlinkMiniCluster executor;
+       private final LocalFlinkMiniCluster executor;
 
        private TestEnvironment lastEnv = null;
 
@@ -46,7 +47,7 @@ public class TestEnvironment extends ExecutionEnvironment {
                }
        }
 
-       public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
+       public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism) {
                this.executor = executor;
                setParallelism(parallelism);
 
@@ -54,7 +55,7 @@ public class TestEnvironment extends ExecutionEnvironment {
                getConfig().setCodeAnalysisMode(CodeAnalysisMode.DISABLE);
        }
 
-       public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) {
+       public TestEnvironment(LocalFlinkMiniCluster executor, int parallelism, boolean isObjectReuseEnabled) {
                this(executor, parallelism);
 
                if (isObjectReuseEnabled) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
deleted file mode 100644
index fa3135a..0000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util
-
-import java.util.concurrent.TimeoutException
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.pattern.Patterns._
-import akka.pattern.ask
-
-import org.apache.curator.test.TestingCluster
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.jobmanager.{JobManager, HighAvailabilityMode}
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
-import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.testingUtils.{TestingJobManager, TestingMemoryArchivist, TestingTaskManager}
-import org.apache.flink.runtime.testutils.TestingResourceManager
-
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration._
-
-/**
- * A forkable mini cluster is a special case of the mini cluster, used for parallel test execution
- * on build servers. If multiple tests run in parallel, the cluster picks up the fork number and
- * uses it to avoid port conflicts.
- *
- * @param userConfiguration Configuration object with the user provided configuration values
- * @param singleActorSystem true, if all actors (JobManager and TaskManager) shall be run in the
- *                          same [[ActorSystem]], otherwise false.
- */
-class ForkableFlinkMiniCluster(
-    userConfiguration: Configuration,
-    singleActorSystem: Boolean)
-  extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
-
-  def this(userConfiguration: Configuration) = this(userConfiguration, true)
-
-  // --------------------------------------------------------------------------
-
-  var zookeeperCluster: Option[TestingCluster] = None
-
-  override def generateConfiguration(userConfiguration: Configuration): Configuration = {
-    val forkNumberString = System.getProperty("forkNumber")
-
-    val forkNumber = try {
-      Integer.parseInt(forkNumberString)
-    }
-    catch {
-      case e: NumberFormatException => -1
-    }
-
-    val config = userConfiguration.clone()
-
-    if (forkNumber != -1) {
-      val jobManagerRPC = 1024 + forkNumber*400
-      val taskManagerRPC = 1024 + forkNumber*400 + 100
-      val taskManagerData = 1024 + forkNumber*400 + 200
-      val resourceManagerRPC = 1024 + forkNumber*400 + 300
-
-      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRPC)
-      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, taskManagerRPC)
-      config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, taskManagerData)
-      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerRPC)
-    }
-
-    super.generateConfiguration(config)
-  }
-
-  override def startJobManager(index: Int, actorSystem: ActorSystem): ActorRef = {
-    val config = configuration.clone()
-
-    val jobManagerName = getJobManagerName(index)
-    val archiveName = getArchiveName(index)
-
-    val jobManagerPort = config.getInteger(
-      ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-
-    if (jobManagerPort > 0) {
-      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
jobManagerPort + index)
-    }
-
-    val (jobManager, _) = JobManager.startJobManagerActors(
-      config,
-      actorSystem,
-      Some(jobManagerName),
-      Some(archiveName),
-      classOf[TestingJobManager],
-      classOf[TestingMemoryArchivist])
-
-    jobManager
-  }
-
-  override def startResourceManager(index: Int, system: ActorSystem): ActorRef 
= {
-    val config = configuration.clone()
-
-    val resourceManagerName = getResourceManagerName(index)
-
-    val resourceManagerPort = config.getInteger(
-      ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
-
-    if (resourceManagerPort > 0) {
-      config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, 
resourceManagerPort + index)
-    }
-
-    val resourceManager = FlinkResourceManager.startResourceManagerActors(
-      config,
-      system,
-      createLeaderRetrievalService(),
-      classOf[TestingResourceManager],
-      resourceManagerName)
-
-    resourceManager
-  }
-
-  override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
-    val config = configuration.clone()
-
-    val rpcPort = config.getInteger(
-      ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
-
-    val dataPort = config.getInteger(
-      ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
-
-    if (rpcPort > 0) {
-      config.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort + 
index)
-    }
-    if (dataPort > 0) {
-      config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + 
index)
-    }
-
-    val localExecution = numTaskManagers == 1
-
-    TaskManager.startTaskManagerComponentsAndActor(
-      config,
-      ResourceID.generate(),
-      system,
-      hostname,
-      Some(TaskManager.TASK_MANAGER_NAME + index),
-      Some(createLeaderRetrievalService()),
-      localExecution,
-      classOf[TestingTaskManager])
-  }
-
-  def addTaskManager(): Unit = {
-    if (useSingleActorSystem) {
-      (jobManagerActorSystems, taskManagerActors) match {
-        case (Some(jmSystems), Some(tmActors)) =>
-          val index = numTaskManagers
-          taskManagerActors = Some(tmActors :+ startTaskManager(index, 
jmSystems(0)))
-          numTaskManagers += 1
-        case _ => throw new IllegalStateException("Cluster has not been 
started properly.")
-      }
-    } else {
-      (taskManagerActorSystems, taskManagerActors) match {
-        case (Some(tmSystems), Some(tmActors)) =>
-          val index = numTaskManagers
-          val newTmSystem = startTaskManagerActorSystem(index)
-          val newTmActor = startTaskManager(index, newTmSystem)
-
-          taskManagerActorSystems = Some(tmSystems :+ newTmSystem)
-          taskManagerActors = Some(tmActors :+ newTmActor)
-
-          numTaskManagers += 1
-        case _ => throw new IllegalStateException("Cluster has not been 
started properly.")
-      }
-    }
-  }
-
-  def restartLeadingJobManager(): Unit = {
-    this.synchronized {
-      (jobManagerActorSystems, jobManagerActors) match {
-        case (Some(jmActorSystems), Some(jmActors)) =>
-          val leader = getLeaderGateway(AkkaUtils.getTimeout(configuration))
-          val index = getLeaderIndex(AkkaUtils.getTimeout(configuration))
-
-          clearLeader()
-
-          val stopped = gracefulStop(leader.actor(), 
ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
-          Await.result(stopped, ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
-
-          if(!singleActorSystem) {
-            jmActorSystems(index).shutdown()
-            jmActorSystems(index).awaitTermination()
-          }
-
-          val newJobManagerActorSystem = if(!singleActorSystem) {
-            startJobManagerActorSystem(index)
-          } else {
-            jmActorSystems.head
-          }
-
-          val newJobManagerActor = startJobManager(index, 
newJobManagerActorSystem)
-
-          jobManagerActors = Some(jmActors.patch(index, 
Seq(newJobManagerActor), 1))
-          jobManagerActorSystems = Some(jmActorSystems.patch(
-            index,
-            Seq(newJobManagerActorSystem),
-            1))
-
-          val lrs = createLeaderRetrievalService()
-
-          jobManagerLeaderRetrievalService = Some(lrs)
-          lrs.start(this)
-
-        case _ => throw new Exception("The JobManager of the 
ForkableFlinkMiniCluster have not " +
-          "been started properly.")
-      }
-    }
-  }
-
-
-  def restartTaskManager(index: Int): Unit = {
-    (taskManagerActorSystems, taskManagerActors) match {
-      case (Some(tmActorSystems), Some(tmActors)) =>
-        val stopped = gracefulStop(tmActors(index), 
ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
-        Await.result(stopped, ForkableFlinkMiniCluster.MAX_RESTART_DURATION)
-
-        if(!singleActorSystem) {
-          tmActorSystems(index).shutdown()
-          tmActorSystems(index).awaitTermination()
-        }
-
-        val taskManagerActorSystem  = if(!singleActorSystem) {
-          startTaskManagerActorSystem(index)
-        } else {
-          tmActorSystems.head
-        }
-
-        val taskManagerActor = startTaskManager(index, taskManagerActorSystem)
-
-        taskManagerActors = Some(tmActors.patch(index, Seq(taskManagerActor), 
1))
-        taskManagerActorSystems = Some(tmActorSystems.patch(index, 
Seq(taskManagerActorSystem), 1))
-
-      case _ => throw new Exception("The TaskManager of the 
ForkableFlinkMiniCluster have not " +
-        "been started properly.")
-    }
-  }
-
-  override def start(): Unit = {
-    val zookeeperURL = 
configuration.getString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY, "")
-
-    zookeeperCluster = if (haMode == HighAvailabilityMode.ZOOKEEPER &&
-      zookeeperURL.equals("")) {
-      LOG.info("Starting ZooKeeper cluster.")
-
-      val testingCluster = new TestingCluster(1)
-
-      configuration.setString(ConfigConstants.HA_ZOOKEEPER_QUORUM_KEY,
-        testingCluster.getConnectString)
-
-      testingCluster.start()
-
-      Some(testingCluster)
-    } else {
-      None
-    }
-
-    super.start()
-  }
-
-  override def stop(): Unit = {
-    super.stop()
-
-    zookeeperCluster.foreach{
-      LOG.info("Stopping ZooKeeper cluster.")
-      _.close()
-    }
-  }
-
-  def waitForTaskManagersToBeRegisteredAtJobManager(jobManager: ActorRef): 
Unit = {
-    val futures = taskManagerActors.map {
-      _.map {
-        tm => (tm ? NotifyWhenRegisteredAtJobManager(jobManager))(timeout)
-      }
-    }.getOrElse(Seq())
-
-    try {
-      Await.ready(Future.sequence(futures), timeout)
-    } catch {
-      case t: TimeoutException =>
-        throw new Exception("Timeout while waiting for TaskManagers to 
register at " +
-          s"${jobManager.path}")
-    }
-
-  }
-}
-
-object ForkableFlinkMiniCluster {
-
-  val MAX_RESTART_DURATION = 2 minute
-
-  val DEFAULT_MINICLUSTER_AKKA_ASK_TIMEOUT = "200 s"
-
-  def startCluster(
-                    numSlots: Int,
-                    numTaskManagers: Int,
-                    timeout: String = DEFAULT_MINICLUSTER_AKKA_ASK_TIMEOUT)
-  : ForkableFlinkMiniCluster = {
-
-    val config = new Configuration()
-    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
-    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
numTaskManagers)
-    config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
-
-    val cluster = new ForkableFlinkMiniCluster(config)
-
-    cluster.start()
-
-    cluster
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index cac8451..cc70fee 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -28,7 +28,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -44,7 +44,7 @@ import static org.junit.Assert.fail;
  */
 public class AccumulatorErrorITCase {
 
-       private static ForkableFlinkMiniCluster cluster;
+       private static LocalFlinkMiniCluster cluster;
 
        @BeforeClass
        public static void startCluster() {
@@ -53,7 +53,7 @@ public class AccumulatorErrorITCase {
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-                       cluster = new ForkableFlinkMiniCluster(config, false);
+                       cluster = new LocalFlinkMiniCluster(config, false);
 
                        cluster.start();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 49e18e0..624bfff 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -234,7 +234,6 @@ public class AccumulatorLiveITCase {
                                fail("Wrong accumulator results when map task 
begins execution.");
                        }
 
-
                        int expectedAccVal = 0;
 
                        /* for mapper task */

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 9671fce..8a08f15 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -22,8 +22,8 @@ package org.apache.flink.test.cancelling;
 import java.util.concurrent.TimeUnit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 
@@ -39,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import static org.apache.flink.runtime.taskmanager.TaskCancelTest.awaitRunning;
 import static org.apache.flink.runtime.taskmanager.TaskCancelTest.cancelJob;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
-import org.apache.flink.util.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 
 import org.junit.After;
@@ -65,7 +64,7 @@ public abstract class CancelingTestBase extends TestLogger {
 
        // 
--------------------------------------------------------------------------------------------
        
-       protected ForkableFlinkMiniCluster executor;
+       protected LocalFlinkMiniCluster executor;
 
        protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
        
@@ -88,7 +87,7 @@ public abstract class CancelingTestBase extends TestLogger {
                
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096);
                
config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048);
 
-               this.executor = new ForkableFlinkMiniCluster(config, false);
+               this.executor = new LocalFlinkMiniCluster(config, false);
                this.executor.start();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 163fb42..94ff66f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -35,7 +36,6 @@ import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -60,7 +60,7 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
 
        private static final int PARALLELISM = 4;
 
-       private static ForkableFlinkMiniCluster cluster;
+       private static LocalFlinkMiniCluster cluster;
 
 
        @BeforeClass
@@ -71,7 +71,7 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
48);
                config.setString(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT, 
"60 s");
                config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 
s");
-               cluster = new ForkableFlinkMiniCluster(config, false);
+               cluster = new LocalFlinkMiniCluster(config, false);
                cluster.start();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index fa5339d..0aee128 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -41,7 +42,6 @@ import 
org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -76,7 +76,7 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
        private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024;
        private static final int PARALLELISM = 4;
 
-       private static ForkableFlinkMiniCluster cluster;
+       private static LocalFlinkMiniCluster cluster;
 
        @Rule
        public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -95,7 +95,7 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
PARALLELISM / 2);
                config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
48);
 
-               cluster = new ForkableFlinkMiniCluster(config, false);
+               cluster = new LocalFlinkMiniCluster(config, false);
                cluster.start();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 8915bff..7f1d7f3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -43,7 +44,6 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -73,7 +73,7 @@ public class RescalingITCase extends TestLogger {
        private static int slotsPerTaskManager = 2;
        private static int numSlots = numTaskManagers * slotsPerTaskManager;
 
-       private static ForkableFlinkMiniCluster cluster;
+       private static TestingCluster cluster;
 
        @ClassRule
        public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -92,7 +92,7 @@ public class RescalingITCase extends TestLogger {
                config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, 
"filesystem");
                config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, 
savepointDir.toURI().toString());
 
-               cluster = new ForkableFlinkMiniCluster(config);
+               cluster = new TestingCluster(config);
                cluster.start();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 550ba75..7409fe7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -43,7 +43,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -51,6 +50,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved;
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestSavepoint;
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint;
@@ -62,8 +62,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.testutils.junit.RetryOnFailure;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
@@ -76,7 +74,6 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -137,7 +134,7 @@ public class SavepointITCase extends TestLogger {
 
                LOG.info("Created temporary directory: " + tmpDir + ".");
 
-               ForkableFlinkMiniCluster flink = null;
+               TestingCluster flink = null;
 
                try {
                        // Create a test actor system
@@ -168,7 +165,7 @@ public class SavepointITCase extends TestLogger {
                        LOG.info("Flink configuration: " + config + ".");
 
                        // Start Flink
-                       flink = new ForkableFlinkMiniCluster(config);
+                       flink = new TestingCluster(config);
                        LOG.info("Starting Flink cluster.");
                        flink.start();
 
@@ -261,7 +258,7 @@ public class SavepointITCase extends TestLogger {
                        LOG.info("JobManager: " + jobManager + ".");
 
                        final Throwable[] error = new Throwable[1];
-                       final ForkableFlinkMiniCluster finalFlink = flink;
+                       final TestingCluster finalFlink = flink;
                        final Multimap<JobVertexID, TaskDeploymentDescriptor> 
tdds = HashMultimap.create();
                        new JavaTestKit(testActorSystem) {{
 
@@ -422,7 +419,7 @@ public class SavepointITCase extends TestLogger {
 
                LOG.info("Created temporary directory: " + tmpDir + ".");
 
-               ForkableFlinkMiniCluster flink = null;
+               TestingCluster flink = null;
                List<File> checkpointFiles = new ArrayList<>();
 
                try {
@@ -447,7 +444,7 @@ public class SavepointITCase extends TestLogger {
                        LOG.info("Flink configuration: " + config + ".");
 
                        // Start Flink
-                       flink = new ForkableFlinkMiniCluster(config);
+                       flink = new TestingCluster(config);
                        LOG.info("Starting Flink cluster.");
                        flink.start();
 
@@ -559,7 +556,7 @@ public class SavepointITCase extends TestLogger {
                // Test deadline
                final Deadline deadline = new FiniteDuration(5, 
TimeUnit.MINUTES).fromNow();
 
-               ForkableFlinkMiniCluster flink = null;
+               TestingCluster flink = null;
 
                try {
                        // Flink configuration
@@ -570,7 +567,7 @@ public class SavepointITCase extends TestLogger {
                        LOG.info("Flink configuration: " + config + ".");
 
                        // Start Flink
-                       flink = new ForkableFlinkMiniCluster(config);
+                       flink = new TestingCluster(config);
                        LOG.info("Starting Flink cluster.");
                        flink.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index cf15052..6bf511f 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -34,7 +35,6 @@ import 
org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 
 import org.apache.flink.util.TestLogger;
@@ -80,7 +80,7 @@ public class StreamCheckpointNotifierITCase extends 
TestLogger {
        private static final int NUM_TASK_SLOTS = 3;
        private static final int PARALLELISM = NUM_TASK_MANAGERS * 
NUM_TASK_SLOTS;
 
-       private static ForkableFlinkMiniCluster cluster;
+       private static LocalFlinkMiniCluster cluster;
 
        @BeforeClass
        public static void startCluster() {
@@ -91,7 +91,7 @@ public class StreamCheckpointNotifierITCase extends 
TestLogger {
                        
config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 ms");
                        
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
 
-                       cluster = new ForkableFlinkMiniCluster(config, false);
+                       cluster = new LocalFlinkMiniCluster(config, false);
                        cluster.start();
                }
                catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 67c05e5..5f6cd4a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -20,8 +20,8 @@ package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.TestLogger;
@@ -43,7 +43,7 @@ public abstract class StreamFaultToleranceTestBase extends 
TestLogger {
        protected static final int NUM_TASK_SLOTS = 4;
        protected static final int PARALLELISM = NUM_TASK_MANAGERS * 
NUM_TASK_SLOTS;
 
-       private static ForkableFlinkMiniCluster cluster;
+       private static LocalFlinkMiniCluster cluster;
 
        @BeforeClass
        public static void startCluster() {
@@ -53,7 +53,7 @@ public abstract class StreamFaultToleranceTestBase extends 
TestLogger {
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
                        
-                       cluster = new ForkableFlinkMiniCluster(config, false);
+                       cluster = new LocalFlinkMiniCluster(config, false);
 
                        cluster.start();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index 2e6ce78..e424a8d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
@@ -34,7 +35,6 @@ import 
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
@@ -71,7 +71,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 
        private static final int PARALLELISM = 4;
 
-       private static ForkableFlinkMiniCluster cluster;
+       private static LocalFlinkMiniCluster cluster;
 
 
        @BeforeClass
@@ -81,7 +81,7 @@ public class WindowCheckpointingITCase extends TestLogger {
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
PARALLELISM / 2);
                config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
48);
 
-               cluster = new ForkableFlinkMiniCluster(config, false);
+               cluster = new LocalFlinkMiniCluster(config, false);
                cluster.start();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 8b56d3d..7afafe4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.classloading;
 
-import akka.pattern.AskTimeoutException;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -37,9 +36,9 @@ import 
org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
 import org.apache.flink.test.testdata.KMeansData;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -82,7 +81,7 @@ public class ClassLoaderITCase extends TestLogger {
 
        public static final TemporaryFolder FOLDER = new TemporaryFolder();
 
-       private static ForkableFlinkMiniCluster testCluster;
+       private static TestingCluster testCluster;
 
        private static int parallelism;
 
@@ -105,7 +104,7 @@ public class ClassLoaderITCase extends TestLogger {
                config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY,
                                
FOLDER.newFolder().getAbsoluteFile().toURI().toString());
 
-               testCluster = new ForkableFlinkMiniCluster(config, false);
+               testCluster = new TestingCluster(config, false);
                testCluster.start();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
index c9059f1..a74ed34 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
@@ -29,8 +29,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -42,7 +42,6 @@ import java.util.concurrent.Semaphore;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
-
 /**
  * Tests retrieval of a job from a running Flink cluster
  */
@@ -54,7 +53,7 @@ public class JobRetrievalITCase extends TestLogger {
 
        @BeforeClass
        public static void before() {
-               cluster = new ForkableFlinkMiniCluster(new Configuration(), 
false);
+               cluster = new TestingCluster(new Configuration(), false);
                cluster.start();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index 28c2e58..178656d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -26,8 +26,8 @@ import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -47,7 +47,7 @@ public class JobSubmissionFailsITCase {
        
        private static final int NUM_SLOTS = 20;
        
-       private static ForkableFlinkMiniCluster cluster;
+       private static LocalFlinkMiniCluster cluster;
        private static JobGraph workingJobGraph;
 
        @BeforeClass
@@ -58,7 +58,7 @@ public class JobSubmissionFailsITCase {
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
                        
-                       cluster = new ForkableFlinkMiniCluster(config);
+                       cluster = new LocalFlinkMiniCluster(config);
 
                        cluster.start();
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
index ca2c156..133ebd0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
@@ -29,8 +29,8 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
@@ -52,7 +52,7 @@ public class CustomDistributionITCase extends TestLogger {
        //  The mini cluster that is shared across tests
        // 
------------------------------------------------------------------------
 
-       private static ForkableFlinkMiniCluster cluster;
+       private static LocalFlinkMiniCluster cluster;
 
        @BeforeClass
        public static void setup() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
index 34a7eed..e18e82a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -23,11 +23,10 @@ import 
org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -54,7 +53,7 @@ public class RemoteEnvironmentITCase {
 
        private static final String VALID_STARTUP_TIMEOUT = "100 s";
 
-       private static ForkableFlinkMiniCluster cluster;
+       private static LocalFlinkMiniCluster cluster;
 
        @BeforeClass
        public static void setupCluster() {
@@ -62,7 +61,7 @@ public class RemoteEnvironmentITCase {
                        Configuration config = new Configuration();
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
-                       cluster = new ForkableFlinkMiniCluster(config, false);
+                       cluster = new LocalFlinkMiniCluster(config, false);
                        cluster.start();
                }
                catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 09b5e7e..a67e6ef 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -29,7 +29,7 @@ import 
org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -49,14 +49,14 @@ public class AutoParallelismITCase {
        private static final int SLOTS_PER_TM = 7;
        private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
 
-       private static ForkableFlinkMiniCluster cluster;
+       private static LocalFlinkMiniCluster cluster;
 
        @BeforeClass
        public static void setupCluster() {
                Configuration config = new Configuration();
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_TM);
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
SLOTS_PER_TM);
-               cluster = new ForkableFlinkMiniCluster(config, false);
+               cluster = new LocalFlinkMiniCluster(config, false);
 
                cluster.start();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index f30f61f..51f3534 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -26,7 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.types.Value;
 
 import org.junit.AfterClass;
@@ -43,7 +43,7 @@ public class CustomSerializationITCase {
 
        private static final int PARLLELISM = 5;
        
-       private static ForkableFlinkMiniCluster cluster;
+       private static LocalFlinkMiniCluster cluster;
 
        @BeforeClass
        public static void startCluster() {
@@ -51,7 +51,7 @@ public class CustomSerializationITCase {
                        Configuration config = new Configuration();
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARLLELISM);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 30);
-                       cluster = new ForkableFlinkMiniCluster(config, false);
+                       cluster = new LocalFlinkMiniCluster(config, false);
                        cluster.start();
                }
                catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 42419fb..06b93ea 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -29,7 +29,7 @@ import 
org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
 import org.apache.flink.util.Collector;
 
@@ -52,7 +52,7 @@ import static org.junit.Assert.*;
 @SuppressWarnings("serial")
 public class MiscellaneousIssuesITCase {
 
-       private static ForkableFlinkMiniCluster cluster;
+       private static LocalFlinkMiniCluster cluster;
        
        @BeforeClass
        public static void startCluster() {
@@ -61,7 +61,7 @@ public class MiscellaneousIssuesITCase {
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-                       cluster = new ForkableFlinkMiniCluster(config, false);
+                       cluster = new LocalFlinkMiniCluster(config, false);
 
                        cluster.start();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index 12b7a68..a43bab6 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -32,8 +32,8 @@ import org.apache.flink.examples.java.clustering.KMeans;
 import org.apache.flink.examples.java.clustering.util.KMeansData;
 import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -43,7 +43,7 @@ public class SuccessAfterNetworkBuffersFailureITCase {
        
        @Test
        public void testSuccessfulProgramAfterFailure() {
-               ForkableFlinkMiniCluster cluster = null;
+               LocalFlinkMiniCluster cluster = null;
                
                try {
                        Configuration config = new Configuration();
@@ -52,7 +52,7 @@ public class SuccessAfterNetworkBuffersFailureITCase {
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 840);
                        
-                       cluster = new ForkableFlinkMiniCluster(config, false);
+                       cluster = new LocalFlinkMiniCluster(config, false);
 
                        cluster.start();
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index 40732df..b99858a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -55,6 +55,7 @@ import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseRunningTasks;
@@ -62,7 +63,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.QueryableStateStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -108,7 +108,7 @@ public class QueryableStateITCase extends TestLogger {
         * Shared between all the test. Make sure to have at least NUM_SLOTS
         * available after your test finishes, e.g. cancel the job you 
submitted.
         */
-       private static ForkableFlinkMiniCluster cluster;
+       private static TestingCluster cluster;
 
        @BeforeClass
        public static void setup() {
@@ -120,7 +120,7 @@ public class QueryableStateITCase extends TestLogger {
                        
config.setInteger(ConfigConstants.QUERYABLE_STATE_CLIENT_NETWORK_THREADS, 1);
                        
config.setInteger(ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS, 1);
 
-                       cluster = new ForkableFlinkMiniCluster(config, false);
+                       cluster = new TestingCluster(config, false);
                        cluster.start(true);
                } catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
index 8a45d62..8a43ee4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java
@@ -24,11 +24,11 @@ import 
org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -49,7 +49,7 @@ public class FastFailuresITCase extends TestLogger {
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
2);
                
-               ForkableFlinkMiniCluster cluster = new 
ForkableFlinkMiniCluster(config, false);
+               LocalFlinkMiniCluster cluster = new 
LocalFlinkMiniCluster(config, false);
                cluster.start();
                
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment(

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
index 0c5d14b..a0d6b58 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.test.recovery;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.BeforeClass;
 
 public class SimpleRecoveryFailureRateStrategyITBase extends 
SimpleRecoveryITCaseBase {
@@ -34,8 +34,8 @@ public class SimpleRecoveryFailureRateStrategyITBase extends 
SimpleRecoveryITCas
                
config.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL,
 "1 second");
                
config.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, "100 ms");
 
-               cluster = new ForkableFlinkMiniCluster(config, false);
+               cluster = new LocalFlinkMiniCluster(config, false);
 
                cluster.start();
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
index 6355a8f..f09efc5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.test.recovery;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.BeforeClass;
 
 public class SimpleRecoveryFixedDelayRestartStrategyITBase extends 
SimpleRecoveryITCaseBase {
@@ -33,8 +33,8 @@ public class SimpleRecoveryFixedDelayRestartStrategyITBase 
extends SimpleRecover
                
config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
                
config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "100 ms");
 
-               cluster = new ForkableFlinkMiniCluster(config, false);
+               cluster = new LocalFlinkMiniCluster(config, false);
 
                cluster.start();
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
index 004340c..bf7c524 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.AfterClass;
 import org.junit.Test;
 
@@ -42,7 +42,7 @@ import static org.junit.Assert.*;
 @SuppressWarnings("serial")
 public abstract class SimpleRecoveryITCaseBase {
 
-       protected static ForkableFlinkMiniCluster cluster;
+       protected static LocalFlinkMiniCluster cluster;
 
        @AfterClass
        public static void teardownCluster() {

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 6c621ac..5d29905 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.Test;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -65,7 +65,7 @@ public class TaskManagerFailureRecoveryITCase {
 
                final int PARALLELISM = 4;
 
-               ForkableFlinkMiniCluster cluster = null;
+               LocalFlinkMiniCluster cluster = null;
                ActorSystem additionalSystem = null;
 
                try {
@@ -78,7 +78,7 @@ public class TaskManagerFailureRecoveryITCase {
                        config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s");
                        config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 20);
 
-                       cluster = new ForkableFlinkMiniCluster(config, false);
+                       cluster = new LocalFlinkMiniCluster(config, false);
 
                        cluster.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index 7710f06..0b008eb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -26,8 +26,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.Collector;
@@ -63,7 +63,7 @@ public class IPv6HostnamesITCase extends TestLogger {
 
                
                
-               ForkableFlinkMiniCluster flink = null;
+               LocalFlinkMiniCluster flink = null;
                try {
                        final String addressString = ipv6address.getHostAddress();
                        log.info("Test will use IPv6 address " + addressString + " for connection tests");
@@ -75,7 +75,7 @@ public class IPv6HostnamesITCase extends TestLogger {
                        conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
                        conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
                        
-                       flink = new ForkableFlinkMiniCluster(conf, false);
+                       flink = new LocalFlinkMiniCluster(conf, false);
                        flink.start();
 
                        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(addressString, flink.getLeaderRPCPort());

Reply via email to