http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-test-utils/src/test/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/test/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
 
b/flink-test-utils/src/test/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
new file mode 100644
index 0000000..8a348a1
--- /dev/null
+++ 
b/flink-test-utils/src/test/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.util
+
+import java.util.concurrent.TimeoutException
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.pattern.Patterns._
+import akka.pattern.ask
+import org.apache.curator.test.TestingCluster
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager
+import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode}
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import org.apache.flink.runtime.taskmanager.TaskManager
+import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.testingUtils.{TestingJobManager, 
TestingMemoryArchivist, TestingTaskManager, TestingUtils}
+
+import org.apache.flink.runtime.testutils.TestingResourceManager
+
+import scala.concurrent.{Await, Future}
+
+/**
+ * A forkable mini cluster is a special case of the mini cluster, used for parallel test execution
+ * on build servers. If multiple tests run in parallel, the cluster picks up the fork number and
+ * uses it to avoid port conflicts.
+ *
+ * @param userConfiguration Configuration object with the user provided configuration values
+ * @param singleActorSystem true, if all actors (JobManager and TaskManager) shall be run in the
+ *                          same [[ActorSystem]], otherwise false.
+ */
+class ForkableFlinkMiniCluster(
+    userConfiguration: Configuration,
+    singleActorSystem: Boolean)
+  extends LocalFlinkMiniCluster(userConfiguration, singleActorSystem) {
+
+  def this(userConfiguration: Configuration) = this(userConfiguration, true)
+
+  // --------------------------------------------------------------------------
+
+  var zookeeperCluster: Option[TestingCluster] = None
+
+  /**
+   * Builds the cluster configuration from a clone of the user configuration.
+   *
+   * If the "forkNumber" system property parses to an integer other than -1, the JobManager,
+   * TaskManager (RPC and data) and ResourceManager ports are derived from that number and
+   * written into the clone before it is handed to the parent implementation.
+   *
+   * @param userConfiguration configuration supplied by the user; the port settings are applied
+   *                          to a clone of it
+   * @return the result of the parent's `generateConfiguration` for the adjusted clone
+   */
+  override def generateConfiguration(userConfiguration: Configuration): Configuration = {
+    // A missing or non-numeric "forkNumber" property makes parseInt throw, which maps to -1.
+    val forkNumber =
+      try {
+        Integer.parseInt(System.getProperty("forkNumber"))
+      } catch {
+        case _: NumberFormatException => -1
+      }
+
+    val forkedConfig = userConfiguration.clone()
+
+    if (forkNumber != -1) {
+      // Each fork number maps to its own window of 400 ports, starting at port 1024.
+      val basePort = 1024 + forkNumber * 400
+
+      forkedConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, basePort)
+      forkedConfig.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, basePort + 100)
+      forkedConfig.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, basePort + 200)
+      forkedConfig.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, basePort + 300)
+    }
+
+    super.generateConfiguration(forkedConfig)
+  }
+
+  /**
+   * Starts the JobManager actors for the given index, passing [[TestingJobManager]] and
+   * [[TestingMemoryArchivist]] as the classes to instantiate.
+   *
+   * @param index index of the JobManager; it selects the actor names and is added to the
+   *              configured JobManager RPC port when that port is positive
+   * @param actorSystem actor system in which the actors are started
+   * @return the first element of the pair returned by `JobManager.startJobManagerActors`
+   */
+  override def startJobManager(index: Int, actorSystem: ActorSystem): ActorRef = {
+    val jmConfig = configuration.clone()
+
+    val actorName = getJobManagerName(index)
+    val archivistName = getArchiveName(index)
+
+    val basePort = jmConfig.getInteger(
+      ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+
+    // Only a positive port is shifted by the index; other values are left as configured.
+    if (basePort > 0) {
+      jmConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, basePort + index)
+    }
+
+    val startedActors = JobManager.startJobManagerActors(
+      jmConfig,
+      actorSystem,
+      Some(actorName),
+      Some(archivistName),
+      classOf[TestingJobManager],
+      classOf[TestingMemoryArchivist])
+
+    // The second element of the returned pair is not needed here.
+    startedActors._1
+  }
+
+  /**
+   * Starts the ResourceManager actor for the given index, passing [[TestingResourceManager]]
+   * as the class to instantiate.
+   *
+   * @param index index of the ResourceManager; it selects the actor name and is added to the
+   *              configured ResourceManager RPC port when that port is positive
+   * @param system actor system in which the actor is started
+   * @return the actor reference returned by `FlinkResourceManager.startResourceManagerActors`
+   */
+  override def startResourceManager(index: Int, system: ActorSystem): ActorRef = {
+    val rmConfig = configuration.clone()
+
+    val actorName = getResourceManagerName(index)
+
+    val basePort = rmConfig.getInteger(
+      ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT)
+
+    // Only a positive port is shifted by the index; other values are left as configured.
+    if (basePort > 0) {
+      rmConfig.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, basePort + index)
+    }
+
+    FlinkResourceManager.startResourceManagerActors(
+      rmConfig,
+      system,
+      createLeaderRetrievalService(),
+      classOf[TestingResourceManager],
+      actorName)
+  }
+
+  /**
+   * Starts the TaskManager components and actor for the given index, passing
+   * [[TestingTaskManager]] as the class to instantiate.
+   *
+   * @param index index of the TaskManager; it is appended to the actor name and added to the
+   *              configured RPC and data ports when those ports are positive
+   * @param system actor system in which the actor is started
+   * @return the actor reference returned by `TaskManager.startTaskManagerComponentsAndActor`
+   */
+  override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
+    val tmConfig = configuration.clone()
+
+    val baseRpcPort = tmConfig.getInteger(
+      ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT)
+
+    val baseDataPort = tmConfig.getInteger(
+      ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
+
+    // Only positive ports are shifted by the index; other values are left as configured.
+    if (baseRpcPort > 0) {
+      tmConfig.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, baseRpcPort + index)
+    }
+    if (baseDataPort > 0) {
+      tmConfig.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, baseDataPort + index)
+    }
+
+    // The local-execution flag is set exactly when the cluster has a single TaskManager.
+    val runsLocally = numTaskManagers == 1
+
+    TaskManager.startTaskManagerComponentsAndActor(
+      tmConfig,
+      ResourceID.generate(),
+      system,
+      hostname,
+      Some(TaskManager.TASK_MANAGER_NAME + index),
+      Some(createLeaderRetrievalService()),
+      runsLocally,
+      classOf[TestingTaskManager])
+  }
+
+  /**
+   * Stops the current leading JobManager and starts a new JobManager under the same index.
+   *
+   * The whole operation runs while holding this cluster's monitor. The leader is cleared and
+   * its actor stopped gracefully; unless a single actor system is used, the leader's actor
+   * system is shut down and a new one is started. Afterwards the stored actors and actor
+   * systems are updated and a new leader retrieval service is created and started.
+   *
+   * @throws Exception if the JobManager actor systems or the JobManager actors are not set
+   */
+  def restartLeadingJobManager(): Unit = this.synchronized {
+    (jobManagerActorSystems, jobManagerActors) match {
+      case (Some(actorSystems), Some(actors)) =>
+        val leaderGateway = getLeaderGateway(AkkaUtils.getTimeout(configuration))
+        val leaderIndex = getLeaderIndex(AkkaUtils.getTimeout(configuration))
+
+        clearLeader()
+
+        val termination = gracefulStop(leaderGateway.actor(), TestingUtils.TESTING_DURATION)
+        Await.result(termination, TestingUtils.TESTING_DURATION)
+
+        // With a shared actor system the existing system is reused; otherwise the old
+        // leader's system is terminated and replaced by a freshly started one.
+        val replacementSystem =
+          if (singleActorSystem) {
+            actorSystems.head
+          } else {
+            actorSystems(leaderIndex).shutdown()
+            actorSystems(leaderIndex).awaitTermination()
+            startJobManagerActorSystem(leaderIndex)
+          }
+
+        val replacementActor = startJobManager(leaderIndex, replacementSystem)
+
+        jobManagerActors = Some(actors.patch(leaderIndex, Seq(replacementActor), 1))
+        jobManagerActorSystems = Some(
+          actorSystems.patch(leaderIndex, Seq(replacementSystem), 1))
+
+        val retrievalService = createLeaderRetrievalService()
+
+        jobManagerLeaderRetrievalService = Some(retrievalService)
+        retrievalService.start(this)
+
+      case _ =>
+        throw new Exception(
+          "The JobManager of the ForkableFlinkMiniCluster have not " +
+            "been started properly.")
+    }
+  }
+
+
+  /**
+   * Stops the TaskManager with the given index and starts a new one under the same index.
+   *
+   * The actor is stopped gracefully; unless a single actor system is used, its actor system
+   * is shut down and a new one is started. Afterwards the stored actors and actor systems
+   * are updated.
+   *
+   * @param index index of the TaskManager to restart
+   * @throws Exception if the TaskManager actor systems or the TaskManager actors are not set
+   */
+  def restartTaskManager(index: Int): Unit = {
+    (taskManagerActorSystems, taskManagerActors) match {
+      case (Some(actorSystems), Some(actors)) =>
+        val termination = gracefulStop(actors(index), TestingUtils.TESTING_DURATION)
+        Await.result(termination, TestingUtils.TESTING_DURATION)
+
+        // With a shared actor system the existing system is reused; otherwise the old
+        // TaskManager's system is terminated and replaced by a freshly started one.
+        val replacementSystem =
+          if (singleActorSystem) {
+            actorSystems.head
+          } else {
+            actorSystems(index).shutdown()
+            actorSystems(index).awaitTermination()
+            startTaskManagerActorSystem(index)
+          }
+
+        val replacementActor = startTaskManager(index, replacementSystem)
+
+        taskManagerActors = Some(actors.patch(index, Seq(replacementActor), 1))
+        taskManagerActorSystems = Some(actorSystems.patch(index, Seq(replacementSystem), 1))
+
+      case _ =>
+        throw new Exception(
+          "The TaskManager of the ForkableFlinkMiniCluster have not " +
+            "been started properly.")
+    }
+  }
+
+  /**
+   * Starts the mini cluster. If the recovery mode is ZooKeeper and no ZooKeeper quorum is
+   * configured, a single-server [[TestingCluster]] is created first, its connect string is
+   * written into the configuration, and the cluster is started and remembered in
+   * `zookeeperCluster`. In every other case `zookeeperCluster` is set to `None`.
+   */
+  override def start(): Unit = {
+    val configuredQuorum = configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "")
+
+    val needsOwnZooKeeper = recoveryMode == RecoveryMode.ZOOKEEPER && configuredQuorum.isEmpty
+
+    zookeeperCluster =
+      if (!needsOwnZooKeeper) {
+        None
+      } else {
+        LOG.info("Starting ZooKeeper cluster.")
+
+        val ownZooKeeper = new TestingCluster(1)
+
+        // Publish the connect string before starting, so the mini cluster picks it up.
+        configuration.setString(
+          ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+          ownZooKeeper.getConnectString)
+
+        ownZooKeeper.start()
+
+        Some(ownZooKeeper)
+      }
+
+    super.start()
+  }
+
+  /**
+   * Stops the mini cluster and then closes the ZooKeeper test cluster held in
+   * `zookeeperCluster`, if there is one. The ZooKeeper cluster is closed even when stopping
+   * the mini cluster throws, so that the test server is not left running.
+   */
+  override def stop(): Unit = {
+    try {
+      super.stop()
+    } finally {
+      // An explicit lambda is required here: in `foreach { LOG.info(...); _.close() }` only
+      // `_.close()` becomes the function, so the log statement would run once regardless of
+      // whether a ZooKeeper cluster exists.
+      zookeeperCluster.foreach { cluster =>
+        LOG.info("Stopping ZooKeeper cluster.")
+        cluster.close()
+      }
+    }
+  }
+
+  /**
+   * Sends a [[NotifyWhenRegisteredAtJobManager]] request for the given JobManager to every
+   * TaskManager actor of this cluster and blocks until the combined future of all requests
+   * has completed or the cluster's timeout elapses. Returns immediately if no TaskManager
+   * actors are set.
+   *
+   * `Await.ready` is used, so a failure of the requests themselves is not rethrown here.
+   *
+   * @param jobManager JobManager at which the TaskManagers are expected to register
+   * @throws Exception if the timeout elapses first; the original [[TimeoutException]] is
+   *                   attached as the cause
+   */
+  def waitForTaskManagersToBeRegisteredAtJobManager(jobManager: ActorRef): Unit = {
+    val registrations = taskManagerActors.map {
+      _.map {
+        tm => (tm ? NotifyWhenRegisteredAtJobManager(jobManager))(timeout)
+      }
+    }.getOrElse(Seq())
+
+    try {
+      Await.ready(Future.sequence(registrations), timeout)
+    } catch {
+      case t: TimeoutException =>
+        // Keep the original exception as the cause so its stack trace is not lost.
+        throw new Exception(
+          "Timeout while waiting for TaskManagers to register at " +
+            s"${jobManager.path}",
+          t)
+    }
+  }
+}
+
+/**
+ * Factory helpers for [[ForkableFlinkMiniCluster]].
+ */
+object ForkableFlinkMiniCluster {
+
+  import org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT
+
+  /**
+   * Creates a [[ForkableFlinkMiniCluster]] from a fresh configuration and starts it.
+   *
+   * @param numSlots value stored under `ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS`
+   * @param numTaskManagers value stored under `ConfigConstants.LOCAL_NUMBER_TASK_MANAGER`
+   * @param timeout value stored under `ConfigConstants.AKKA_ASK_TIMEOUT`; defaults to
+   *                `DEFAULT_AKKA_ASK_TIMEOUT`
+   * @return the cluster, after `start()` has been called on it
+   */
+  def startCluster(
+      numSlots: Int,
+      numTaskManagers: Int,
+      timeout: String = DEFAULT_AKKA_ASK_TIMEOUT)
+    : ForkableFlinkMiniCluster = {
+
+    val clusterConfig = new Configuration()
+    clusterConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    clusterConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers)
+    clusterConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, timeout)
+
+    val startedCluster = new ForkableFlinkMiniCluster(clusterConfig)
+    startedCluster.start()
+    startedCluster
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index ee7d0a3..0459039 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -105,6 +105,7 @@ under the License.
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-test-utils_2.10</artifactId>
                        <version>${project.version}</version>
+                       <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
                
@@ -539,6 +540,26 @@ under the License.
                                                                                
<ignore/>
                                                                        
</action>
                                                                
</pluginExecution>
+                                                               
<pluginExecution>
+                                                                       
<pluginExecutionFilter>
+                                                                               
<groupId>
+                                                                               
        net.alchim31.maven
+                                                                               
</groupId>
+                                                                               
<artifactId>
+                                                                               
        scala-maven-plugin
+                                                                               
</artifactId>
+                                                                               
<versionRange>
+                                                                               
        [3.1.4,)
+                                                                               
</versionRange>
+                                                                               
<goals>
+                                                                               
        <goal>compile</goal>
+                                                                               
        <goal>testCompile</goal>
+                                                                               
</goals>
+                                                                       
</pluginExecutionFilter>
+                                                                       <action>
+                                                                               
<ignore></ignore>
+                                                                       
</action>
+                                                               
</pluginExecution>
                                                        </pluginExecutions>
                                                </lifecycleMappingMetadata>
                                        </configuration>

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 4431106..6faee45 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -69,7 +69,8 @@ public class EventTimeAllWindowCheckpointingITCase extends 
TestLogger {
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
PARALLELISM / 2);
                config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
48);
-
+               config.setString(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT, 
"60 s");
+               config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 
s");
                cluster = new ForkableFlinkMiniCluster(config, false);
                cluster.start();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 1d0951d..255eeee 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -38,17 +38,12 @@ under the License.
        <packaging>jar</packaging>
 
        <dependencies>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-runtime_2.10</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-clients_2.10</artifactId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
                        <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
                </dependency>
 
                <!-- Needed for the streaming wordcount example -->
@@ -66,12 +61,6 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils_2.10</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
                        <artifactId>${shading-artifact.name}</artifactId>
                        <version>${project.version}</version>
                </dependency>
@@ -87,20 +76,7 @@ under the License.
                                </exclusion>
                        </exclusions>
                </dependency>
-                               
-               <dependency>
-                       <groupId>junit</groupId>
-                       <artifactId>junit</artifactId>
-                       <version>4.11</version>
-                       <type>jar</type>
-               </dependency>
-
-               <dependency>
-                       <groupId>com.google.guava</groupId>
-                       <artifactId>guava</artifactId>
-                       <version>${guava.version}</version>
-               </dependency>
-
+               
                <dependency>
                        <groupId>com.typesafe.akka</groupId>
                        
<artifactId>akka-testkit_${scala.binary.version}</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
deleted file mode 100644
index 30116af..0000000
--- 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.test.util.TestBaseUtils;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class FlinkYarnSessionCliTest {
-
-       @Rule
-       public TemporaryFolder tmp = new TemporaryFolder();
-
-       @Test
-       public void testDynamicProperties() throws IOException {
-
-               Map<String, String> map = new HashMap<String, 
String>(System.getenv());
-               File tmpFolder = tmp.newFolder();
-               File fakeConf = new File(tmpFolder, "flink-conf.yaml");
-               fakeConf.createNewFile();
-               map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
-               TestBaseUtils.setEnv(map);
-               Options options = new Options();
-               FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "");
-               cli.getYARNSessionCLIOptions(options);
-
-               CommandLineParser parser = new PosixParser();
-               CommandLine cmd = null;
-               try {
-                       cmd = parser.parse(options, new String[]{"run", "-j", 
"fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min"});
-               } catch(Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("Parsing failed with " + e.getMessage());
-               }
-
-               AbstractFlinkYarnClient flinkYarnClient = 
cli.createFlinkYarnClient(cmd);
-
-               Assert.assertNotNull(flinkYarnClient);
-
-               Map<String, String> dynProperties = 
CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded());
-               Assert.assertEquals(1, dynProperties.size());
-               Assert.assertEquals("5 min", 
dynProperties.get("akka.ask.timeout"));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
deleted file mode 100644
index b0757f5..0000000
--- 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
-import org.apache.flink.runtime.testutils.TestingResourceManager;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.SignalHandler;
-
-/**
- * Yarn application master which starts the {@link TestingYarnJobManager},
- * {@link TestingResourceManager}, and the {@link TestingMemoryArchivist}.
- */
-public class TestingApplicationMaster extends YarnApplicationMasterRunner {
-
-       @Override
-       public Class<? extends JobManager> getJobManagerClass() {
-               return TestingYarnJobManager.class;
-       }
-
-       @Override
-       public Class<? extends MemoryArchivist> getArchivistClass() {
-               return TestingMemoryArchivist.class;
-       }
-
-       @Override
-       protected Class<? extends TaskManager> getTaskManagerClass() {
-               return TestingYarnTaskManager.class;
-       }
-
-       @Override
-       public Class<? extends YarnFlinkResourceManager> 
getResourceManagerClass() {
-               return TestingYarnFlinkResourceManager.class;
-       }
-
-       public static void main(String[] args) {
-               EnvironmentInformation.logEnvironmentInfo(LOG, "YARN 
ApplicationMaster / JobManager", args);
-               SignalHandler.register(LOG);
-
-               // run and exit with the proper return code
-               int returnCode = new TestingApplicationMaster().run(args);
-               System.exit(returnCode);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
deleted file mode 100644
index 1efc336..0000000
--- 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import com.google.common.base.Preconditions;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Yarn client which starts a {@link TestingApplicationMaster}. Additionally 
the client adds the
- * flink-yarn-tests-XXX-tests.jar and the flink-runtime-XXX-tests.jar to the 
set of files which
- * are shipped to the yarn cluster. This is necessary to load the testing 
classes.
- */
-public class TestingFlinkYarnClient extends FlinkYarnClientBase {
-
-       public TestingFlinkYarnClient() {
-               List<File> filesToShip = new ArrayList<>();
-
-               File testingJar = YarnTestBase.findFile("..", new 
TestJarFinder("flink-yarn-tests"));
-               Preconditions.checkNotNull(testingJar, "Could not find the 
flink-yarn-tests tests jar. " +
-                       "Make sure to package the flink-yarn-tests module.");
-
-               File testingRuntimeJar = YarnTestBase.findFile("..", new 
TestJarFinder("flink-runtime"));
-               Preconditions.checkNotNull(testingRuntimeJar, "Could not find 
the flink-runtime tests " +
-                       "jar. Make sure to package the flink-runtime module.");
-
-               filesToShip.add(testingJar);
-               filesToShip.add(testingRuntimeJar);
-
-               setShipFiles(filesToShip);
-       }
-
-       @Override
-       protected Class<?> getApplicationMasterClass() {
-               return TestingApplicationMaster.class;
-       }
-
-       public static class TestJarFinder implements FilenameFilter {
-
-               private final String jarName;
-
-               public TestJarFinder(final String jarName) {
-                       this.jarName = jarName;
-               }
-
-               @Override
-               public boolean accept(File dir, String name) {
-                       return name.startsWith(jarName) && 
name.endsWith("-tests.jar") &&
-                               dir.getAbsolutePath().contains(dir.separator + 
jarName + dir.separator);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
deleted file mode 100644
index 5a61b8f..0000000
--- 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-/**
- * Flink's testing resource manager for Yarn.
- */
-public class TestingYarnFlinkResourceManager extends YarnFlinkResourceManager {
-
-       public TestingYarnFlinkResourceManager(
-               Configuration flinkConfig,
-               YarnConfiguration yarnConfig,
-               LeaderRetrievalService leaderRetrievalService,
-               String applicationMasterHostName,
-               String webInterfaceURL,
-               ContaineredTaskManagerParameters taskManagerParameters,
-               ContainerLaunchContext taskManagerLaunchContext,
-               int yarnHeartbeatIntervalMillis,
-               int maxFailedContainers,
-               int numInitialTaskManagers) {
-
-               super(
-                       flinkConfig,
-                       yarnConfig,
-                       leaderRetrievalService,
-                       applicationMasterHostName,
-                       webInterfaceURL,
-                       taskManagerParameters,
-                       taskManagerLaunchContext,
-                       yarnHeartbeatIntervalMillis,
-                       maxFailedContainers,
-                       numInitialTaskManagers);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
deleted file mode 100644
index 8586a77..0000000
--- 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import java.io.IOException;
-
-/**
- * Yarn TaskManager runner which starts a {@link TestingYarnTaskManager}.
- */
-public class TestingYarnTaskManagerRunner {
-       public static void main(String[] args) throws IOException {
-               YarnTaskManagerRunner.runYarnTaskManager(args, 
TestingYarnTaskManager.class);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
deleted file mode 100644
index 784bf24..0000000
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.Level;
-import org.apache.log4j.spi.LoggingEvent;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-public class UtilsTest {
-       private static final Logger LOG = 
LoggerFactory.getLogger(UtilsTest.class);
-
-       @Test
-       public void testUberjarLocator() {
-               File dir = YarnTestBase.findFile("..", new 
YarnTestBase.RootDirFilenameFilter());
-               Assert.assertNotNull(dir);
-               Assert.assertTrue(dir.getName().endsWith(".jar"));
-               dir = dir.getParentFile().getParentFile(); // from uberjar to 
lib to root
-               Assert.assertTrue(dir.exists());
-               Assert.assertTrue(dir.isDirectory());
-               List<String> files = Arrays.asList(dir.list());
-               Assert.assertTrue(files.contains("lib"));
-               Assert.assertTrue(files.contains("bin"));
-               Assert.assertTrue(files.contains("conf"));
-       }
-
-       /**
-        * Remove 15% of the heap, at least 384MB.
-        *
-        */
-       @Test
-       public void testHeapCutoff() {
-               Configuration conf = new Configuration();
-               conf.setDouble(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 
0.15);
-               conf.setInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, 
384);
-
-               Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
-               Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) 
);
-
-               // test different configuration
-               Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));
-
-               conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, 
"1000");
-               conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 
"0.1");
-               Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));
-
-               conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 
"0.5");
-               Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));
-
-               conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 
"1");
-               Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
-
-               // test also deprecated keys
-               conf = new Configuration();
-               conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15);
-               conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384);
-
-               Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
-               Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) 
);
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void illegalArgument() {
-               Configuration conf = new Configuration();
-               conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 
"1.1");
-               Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void illegalArgumentNegative() {
-               Configuration conf = new Configuration();
-               conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 
"-0.01");
-               Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void tooMuchCutoff() {
-               Configuration conf = new Configuration();
-               conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 
"6000");
-               Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
-       }
-
-       @Test
-       public void testGetEnvironmentVariables() {
-               Configuration testConf = new Configuration();
-               
testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", 
"/usr/lib/native");
-
-               Map<String, String> res = 
Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
-
-               Assert.assertEquals(1, res.size());
-               Map.Entry<String, String> entry = 
res.entrySet().iterator().next();
-               Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey());
-               Assert.assertEquals("/usr/lib/native", entry.getValue());
-       }
-
-       @Test
-       public void testGetEnvironmentVariablesErroneous() {
-               Configuration testConf = new Configuration();
-               testConf.setString("yarn.application-master.env.", 
"/usr/lib/native");
-
-               Map<String, String> res = 
Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
-
-               Assert.assertEquals(0, res.size());
-       }
-
-       //
-       // --------------- Tools to test if a certain string has been logged 
with Log4j. -------------
-       // See :  
http://stackoverflow.com/questions/3717402/how-to-test-w-junit-that-warning-was-logged-w-log4j
-       //
-       private static TestAppender testAppender;
-       public static void addTestAppender(Class target, Level level) {
-               testAppender = new TestAppender();
-               testAppender.setThreshold(level);
-               org.apache.log4j.Logger lg = 
org.apache.log4j.Logger.getLogger(target);
-               lg.setLevel(level);
-               lg.addAppender(testAppender);
-               
//org.apache.log4j.Logger.getRootLogger().addAppender(testAppender);
-       }
-
-       public static void checkForLogString(String expected) {
-               LoggingEvent found = getEventContainingString(expected);
-               if(found != null) {
-                       LOG.info("Found expected string '"+expected+"' in log 
message "+found);
-                       return;
-               }
-               Assert.fail("Unable to find expected string '" + expected + "' 
in log messages");
-       }
-
-       public static LoggingEvent getEventContainingString(String expected) {
-               if(testAppender == null) {
-                       throw new NullPointerException("Initialize test 
appender first");
-               }
-               LoggingEvent found = null;
-               // make sure that different threads are not logging while the 
logs are checked
-               synchronized (testAppender.events) {
-                       for (LoggingEvent event : testAppender.events) {
-                               if 
(event.getMessage().toString().contains(expected)) {
-                                       found = event;
-                                       break;
-                               }
-                       }
-               }
-               return found;
-       }
-
-       public static class TestAppender extends AppenderSkeleton {
-               public final List<LoggingEvent> events = new ArrayList<>();
-               public void close() {}
-               public boolean requiresLayout() {return false;}
-               @Override
-               protected void append(LoggingEvent event) {
-                       synchronized (events){
-                               events.add(event);
-                       }
-               }
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
deleted file mode 100644
index a93abf0..0000000
--- 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.testkit.JavaTestKit;
-import org.apache.curator.test.TestingServer;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-public class YARNHighAvailabilityITCase extends YarnTestBase {
-
-       private static TestingServer zkServer;
-
-       private static ActorSystem actorSystem;
-
-       private static final int numberApplicationAttempts = 10;
-
-       @Rule
-       public TemporaryFolder tmp = new TemporaryFolder();
-
-       @BeforeClass
-       public static void setup() {
-               actorSystem = AkkaUtils.createDefaultActorSystem();
-
-               try {
-                       zkServer = new TestingServer();
-                       zkServer.start();
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("Could not start ZooKeeper testing 
cluster.");
-               }
-
-               yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, 
"flink-yarn-tests-ha");
-               yarnConfiguration.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" 
+ numberApplicationAttempts);
-
-               startYARNWithConfig(yarnConfiguration);
-       }
-
-       @AfterClass
-       public static void teardown() throws IOException {
-               if(zkServer != null) {
-                       zkServer.stop();
-               }
-
-               JavaTestKit.shutdownActorSystem(actorSystem);
-               actorSystem = null;
-       }
-
-       /**
-        * Tests that the application master can be killed multiple times and 
that the surviving
-        * TaskManager succesfully reconnects to the newly started JobManager.
-        * @throws Exception
-        */
-       @Test
-       public void testMultipleAMKill() throws Exception {
-               final int numberKillingAttempts = numberApplicationAttempts - 1;
-
-               TestingFlinkYarnClient flinkYarnClient = new 
TestingFlinkYarnClient();
-
-               Assert.assertNotNull("unable to get yarn client", 
flinkYarnClient);
-               flinkYarnClient.setTaskManagerCount(1);
-               flinkYarnClient.setJobManagerMemory(768);
-               flinkYarnClient.setTaskManagerMemory(1024);
-               flinkYarnClient.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
-               
flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-
-               String confDirPath = System.getenv("FLINK_CONF_DIR");
-               flinkYarnClient.setConfigurationDirectory(confDirPath);
-
-               String fsStateHandlePath = tmp.getRoot().getPath();
-
-               
flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
-               
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum="
 +
-                       zkServer.getConnectString() + 
"@@yarn.application-attempts=" + numberApplicationAttempts +
-                       "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
-                       "@@" + 
FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + 
fsStateHandlePath + "/checkpoints" +
-                       "@@" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "=" + 
fsStateHandlePath + "/recovery");
-               flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + 
File.separator + "flink-conf.yaml"));
-
-               AbstractFlinkYarnCluster yarnCluster = null;
-
-               final FiniteDuration timeout = new FiniteDuration(2, 
TimeUnit.MINUTES);
-
-               try {
-                       yarnCluster = flinkYarnClient.deploy();
-                       yarnCluster.connectToCluster();
-                       final Configuration config = 
yarnCluster.getFlinkConfiguration();
-
-                       new JavaTestKit(actorSystem) {{
-                               for (int attempt = 0; attempt < 
numberKillingAttempts; attempt++) {
-                                       new Within(timeout) {
-                                               @Override
-                                               protected void run() {
-                                                       try {
-                                                               
LeaderRetrievalService lrs = 
LeaderRetrievalUtils.createLeaderRetrievalService(config);
-                                                               ActorGateway 
gateway = LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout);
-                                                               ActorGateway 
selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
-
-                                                               
gateway.tell(new 
TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), 
selfGateway);
-
-                                                               
expectMsgEquals(Messages.getAcknowledge());
-
-                                                               
gateway.tell(PoisonPill.getInstance());
-                                                       } catch (Exception e) {
-                                                               throw new 
AssertionError("Could not complete test.", e);
-                                                       }
-                                               }
-                                       };
-                               }
-
-                               new Within(timeout) {
-                                       @Override
-                                       protected void run() {
-                                               try {
-                                                       LeaderRetrievalService 
lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);
-                                                       ActorGateway gateway2 = 
LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout);
-                                                       ActorGateway 
selfGateway = new AkkaActorGateway(getRef(), gateway2.leaderSessionID());
-                                                       gateway2.tell(new 
TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), 
selfGateway);
-
-                                                       
expectMsgEquals(Messages.getAcknowledge());
-                                               } catch (Exception e) {
-                                                       throw new 
AssertionError("Could not complete test.", e);
-                                               }
-                                       }
-                               };
-
-                       }};
-               } finally {
-                       if (yarnCluster != null) {
-                               yarnCluster.shutdown(false);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
deleted file mode 100644
index 38e17a5..0000000
--- 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ /dev/null
@@ -1,539 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.google.common.base.Joiner;
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.client.JobClient;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.security.NMTokenIdentifier;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.log4j.Level;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Arrays;
-import java.util.concurrent.ConcurrentMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.flink.yarn.UtilsTest.addTestAppender;
-import static org.apache.flink.yarn.UtilsTest.checkForLogString;
-
-
-/**
- * This test starts a MiniYARNCluster with a CapacityScheduler.
- * Is has, by default a queue called "default". The configuration here adds 
another queue: "qa-team".
- */
-public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
-       private static final Logger LOG = 
LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);
-
-       @BeforeClass
-       public static void setup() {
-               yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, 
CapacityScheduler.class, ResourceScheduler.class);
-               yarnConfiguration.set("yarn.scheduler.capacity.root.queues", 
"default,qa-team");
-               
yarnConfiguration.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
-               
yarnConfiguration.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
-               yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, 
"flink-yarn-tests-capacityscheduler");
-               startYARNWithConfig(yarnConfiguration);
-       }
-
-       /**
-        * Test regular operation, including command line parameter parsing.
-        */
-       @Test
-       public void testClientStartup() {
-               LOG.info("Starting testClientStartup()");
-               runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), 
"-t", flinkLibFolder.getAbsolutePath(),
-                                               "-n", "1",
-                                               "-jm", "768",
-                                               "-tm", "1024", "-qu", 
"qa-team"},
-                               "Number of connected TaskManagers changed to 1. 
Slots available: 1", null, RunTypes.YARN_SESSION, 0);
-               LOG.info("Finished testClientStartup()");
-       }
-
-       /**
-        * Test per-job yarn cluster
-        *
-        * This also tests the prefixed CliFrontend options for the YARN case
-        * We also test if the requested parallelism of 2 is passed through.
-        * The parallelism is requested at the YARN client (-ys).
-        */
-       @Test
-       public void perJobYarnCluster() {
-               LOG.info("Starting perJobYarnCluster()");
-               addTestAppender(JobClient.class, Level.INFO);
-               File exampleJarLocation = YarnTestBase.findFile("..", new 
ContainsName(new String[] {"-WordCount.jar"} , "streaming")); // exclude 
streaming wordcount here.
-               Assert.assertNotNull("Could not find wordcount jar", 
exampleJarLocation);
-               runWithArgs(new String[]{"run", "-m", "yarn-cluster",
-                               "-yj", flinkUberjar.getAbsolutePath(), "-yt", 
flinkLibFolder.getAbsolutePath(),
-                               "-yn", "1",
-                               "-ys", "2", //test that the job is executed 
with a DOP of 2
-                               "-yjm", "768",
-                               "-ytm", "1024", 
exampleJarLocation.getAbsolutePath()},
-                               /* test succeeded after this string */
-                       "Job execution complete",
-                               /* prohibited strings: (we want to see (2/2)) */
-                       new String[]{"System.out)(1/1) switched to FINISHED "},
-                       RunTypes.CLI_FRONTEND, 0, true);
-               LOG.info("Finished perJobYarnCluster()");
-       }
-
-
-       /**
-        * Test TaskManager failure and also if the vcores are set correctly 
(see issue FLINK-2213).
-        */
-       @Test(timeout=100000) // timeout after 100 seconds
-       public void testTaskManagerFailure() {
-               LOG.info("Starting testTaskManagerFailure()");
-               Runner runner = startWithArgs(new String[]{"-j", 
flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
-                               "-n", "1",
-                               "-jm", "768",
-                               "-tm", "1024",
-                               "-s", "3", // set the slots 3 to check if the 
vCores are set properly!
-                               "-nm", "customName",
-                               "-Dfancy-configuration-value=veryFancy",
-                               "-Dyarn.maximum-failed-containers=3",
-                               "-D" + ConfigConstants.YARN_VCORES + "=2"},
-                       "Number of connected TaskManagers changed to 1. Slots 
available: 3",
-                       RunTypes.YARN_SESSION);
-
-               Assert.assertEquals(2, getRunningContainers());
-
-               // ------------------------ Test if JobManager web interface is 
accessible -------
-
-               YarnClient yc = null;
-               try {
-                       yc = YarnClient.createYarnClient();
-                       yc.init(yarnConfiguration);
-                       yc.start();
-
-                       List<ApplicationReport> apps = 
yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
-                       Assert.assertEquals(1, apps.size()); // Only one running
-                       ApplicationReport app = apps.get(0);
-                       Assert.assertEquals("customName", app.getName());
-                       String url = app.getTrackingUrl();
-                       if(!url.endsWith("/")) {
-                               url += "/";
-                       }
-                       if(!url.startsWith("http://")) {
-                               url = "http://" + url;
-                       }
-                       LOG.info("Got application URL from YARN {}", url);
-
-                       String response = TestBaseUtils.getFromHTTP(url + 
"taskmanagers/");
-
-                       JsonNode parsedTMs = new 
ObjectMapper().readTree(response);
-                       ArrayNode taskManagers = (ArrayNode) 
parsedTMs.get("taskmanagers");
-                       Assert.assertNotNull(taskManagers);
-                       Assert.assertEquals(1, taskManagers.size());
-                       Assert.assertEquals(3, 
taskManagers.get(0).get("slotsNumber").asInt());
-
-                       // get the configuration from webinterface & check if 
the dynamic properties from YARN show up there.
-                       String jsonConfig = TestBaseUtils.getFromHTTP(url + 
"jobmanager/config");
-                       Map<String, String> parsedConfig = 
WebMonitorUtils.fromKeyValueJsonArray(jsonConfig);
-
-                       Assert.assertEquals("veryFancy", 
parsedConfig.get("fancy-configuration-value"));
-                       Assert.assertEquals("3", 
parsedConfig.get("yarn.maximum-failed-containers"));
-                       Assert.assertEquals("2", 
parsedConfig.get(ConfigConstants.YARN_VCORES));
-
-                       // -------------- FLINK-1902: check if jobmanager 
hostname/port are shown in web interface
-                       // first, get the hostname/port
-                       String oC = outContent.toString();
-                       Pattern p = Pattern.compile("Flink JobManager is now 
running on ([a-zA-Z0-9.-]+):([0-9]+)");
-                       Matcher matches = p.matcher(oC);
-                       String hostname = null;
-                       String port = null;
-                       while(matches.find()) {
-                               hostname = matches.group(1).toLowerCase();
-                               port = matches.group(2);
-                       }
-                       LOG.info("Extracted hostname:port: {} {}", hostname, 
port);
-
-                       Assert.assertEquals("unable to find hostname in " + 
jsonConfig, hostname,
-                               
parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY));
-                       Assert.assertEquals("unable to find port in " + 
jsonConfig, port,
-                               
parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY));
-
-                       // test logfile access
-                       String logs = TestBaseUtils.getFromHTTP(url + 
"jobmanager/log");
-                       Assert.assertTrue(logs.contains("Starting YARN 
ApplicationMaster"));
-                       Assert.assertTrue(logs.contains("Starting JobManager"));
-                       Assert.assertTrue(logs.contains("Starting JobManager 
Web Frontend"));
-               } catch(Throwable e) {
-                       LOG.warn("Error while running test",e);
-                       Assert.fail(e.getMessage());
-               }
-
-               // ------------------------ Kill container with TaskManager and 
check if vcores are set correctly -------
-
-               // find container id of taskManager:
-               ContainerId taskManagerContainer = null;
-               NodeManager nodeManager = null;
-               UserGroupInformation remoteUgi = null;
-               NMTokenIdentifier nmIdent = null;
-               try {
-                       remoteUgi = UserGroupInformation.getCurrentUser();
-               } catch (IOException e) {
-                       LOG.warn("Unable to get curr user", e);
-                       Assert.fail();
-               }
-               for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
-                       NodeManager nm = yarnCluster.getNodeManager(nmId);
-                       ConcurrentMap<ContainerId, Container> containers = 
nm.getNMContext().getContainers();
-                       for(Map.Entry<ContainerId, Container> entry : 
containers.entrySet()) {
-                               String command = Joiner.on(" 
").join(entry.getValue().getLaunchContext().getCommands());
-                               
if(command.contains(YarnTaskManager.class.getSimpleName())) {
-                                       taskManagerContainer = entry.getKey();
-                                       nodeManager = nm;
-                                       nmIdent = new 
NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "",0);
-                                       // allow myself to do stuff with the 
container
-                                       // 
remoteUgi.addCredentials(entry.getValue().getCredentials());
-                                       remoteUgi.addTokenIdentifier(nmIdent);
-                               }
-                       }
-                       sleep(500);
-               }
-
-               Assert.assertNotNull("Unable to find container with 
TaskManager", taskManagerContainer);
-               Assert.assertNotNull("Illegal state", nodeManager);
-
-               try {
-                       List<NodeReport> nodeReports = 
yc.getNodeReports(NodeState.RUNNING);
-
-                       // we asked for one node with 2 vcores so we expect 2 
vcores
-                       int userVcores = 0;
-                       for (NodeReport rep: nodeReports) {
-                               userVcores += rep.getUsed().getVirtualCores();
-                       }
-                       Assert.assertEquals(2, userVcores);
-               } catch (Exception e) {
-                       Assert.fail("Test failed: " + e.getMessage());
-               }
-
-               yc.stop();
-
-               List<ContainerId> toStop = new LinkedList<ContainerId>();
-               toStop.add(taskManagerContainer);
-               StopContainersRequest scr = 
StopContainersRequest.newInstance(toStop);
-
-               try {
-                       
nodeManager.getNMContext().getContainerManager().stopContainers(scr);
-               } catch (Throwable e) {
-                       LOG.warn("Error stopping container", e);
-                       Assert.fail("Error stopping container: 
"+e.getMessage());
-               }
-
-               // stateful termination check:
-               // wait until we saw a container being killed and AFTERWARDS a 
new one launched
-               boolean ok = false;
-               do {
-                       LOG.debug("Waiting for correct order of events. Output: 
{}", errContent.toString());
-
-                       String o = errContent.toString();
-                       int killedOff = o.indexOf("Container killed by the 
ApplicationMaster");
-                       if (killedOff != -1) {
-                               o = o.substring(killedOff);
-                               ok = o.indexOf("Launching TaskManager") > 0;
-                       }
-                       sleep(1000);
-               } while(!ok);
-
-
-               // send "stop" command to command line interface
-               runner.sendStop();
-               // wait for the thread to stop
-               try {
-                       runner.join(1000);
-               } catch (InterruptedException e) {
-                       LOG.warn("Interrupted while stopping runner", e);
-               }
-               LOG.warn("stopped");
-
-               // ----------- Send output to logger
-               System.setOut(originalStdout);
-               System.setErr(originalStderr);
-               String oC = outContent.toString();
-               String eC = errContent.toString();
-               LOG.info("Sending stdout content through logger: \n\n{}\n\n", 
oC);
-               LOG.info("Sending stderr content through logger: \n\n{}\n\n", 
eC);
-
-               // ------ Check if everything happened correctly
-               Assert.assertTrue("Expect to see failed container",
-                       eC.contains("New messages from the YARN cluster"));
-
-               Assert.assertTrue("Expect to see failed container",
-                       eC.contains("Container killed by the 
ApplicationMaster"));
-
-               Assert.assertTrue("Expect to see new container started",
-                       eC.contains("Launching TaskManager") && eC.contains("on 
host"));
-
-               // cleanup auth for the subsequent tests.
-               remoteUgi.getTokenIdentifiers().remove(nmIdent);
-
-               LOG.info("Finished testTaskManagerFailure()");
-       }
-
-       /**
-        * Test deployment to non-existing queue. (user-reported error)
-        * Deployment to the queue is possible because there are no queues, so 
we don't check.
-        */
-       @Test
-       public void testNonexistingQueue() {
-               LOG.info("Starting testNonexistingQueue()");
-               addTestAppender(FlinkYarnClient.class, Level.WARN);
-               runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
-                               "-t", flinkLibFolder.getAbsolutePath(),
-                               "-n", "1",
-                               "-jm", "768",
-                               "-tm", "1024",
-                               "-qu", "doesntExist"}, "to unknown queue: 
doesntExist", null, RunTypes.YARN_SESSION, 1);
-               checkForLogString("The specified queue 'doesntExist' does not 
exist. Available queues: default, qa-team");
-               LOG.info("Finished testNonexistingQueue()");
-       }
-
-       /**
-        * Test per-job yarn cluster with the parallelism set at the 
CliFrontend instead of the YARN client.
-        */
-       @Test
-       public void perJobYarnClusterWithParallelism() {
-               LOG.info("Starting perJobYarnClusterWithParallelism()");
-               // write log messages to stdout as well, so that the 
runWithArgs() method
-               // is catching the log output
-               addTestAppender(JobClient.class, Level.INFO);
-               File exampleJarLocation = YarnTestBase.findFile("..", new 
ContainsName(new String[] {"-WordCount.jar"}, "streaming")); // exclude 
streaming wordcount here.
-               Assert.assertNotNull("Could not find wordcount jar", 
exampleJarLocation);
-               runWithArgs(new String[]{"run",
-                               "-p", "2", //test that the job is executed with 
a DOP of 2
-                               "-m", "yarn-cluster",
-                               "-yj", flinkUberjar.getAbsolutePath(),
-                               "-yt", flinkLibFolder.getAbsolutePath(),
-                               "-yn", "1",
-                               "-yjm", "768",
-                               "-ytm", "1024", 
exampleJarLocation.getAbsolutePath()},
-                               /* test succeeded after this string */
-                       "Job execution complete",
-                               /* prohibited strings: (we want to see (2/2)) */
-                       new String[]{"System.out)(1/1) switched to FINISHED "},
-                       RunTypes.CLI_FRONTEND, 0, true);
-               LOG.info("Finished perJobYarnClusterWithParallelism()");
-       }
-
-       /**
-        * Test a fire-and-forget job submission to a YARN cluster.
-        */
-       @Test(timeout=60000)
-       public void testDetachedPerJobYarnCluster() {
-               LOG.info("Starting testDetachedPerJobYarnCluster()");
-
-               File exampleJarLocation = YarnTestBase.findFile(
-                       ".." + File.separator + "flink-examples" + 
File.separator + "flink-examples-batch",
-                       new ContainsName(new String[] {"-WordCount.jar"}));
-
-               Assert.assertNotNull("Could not find batch wordcount jar", 
exampleJarLocation);
-
-               
testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
-
-               LOG.info("Finished testDetachedPerJobYarnCluster()");
-       }
-
-       /**
-        * Test a fire-and-forget job submission to a YARN cluster.
-        */
-       @Test(timeout=60000)
-       public void testDetachedPerJobYarnClusterWithStreamingJob() {
-               LOG.info("Starting 
testDetachedPerJobYarnClusterWithStreamingJob()");
-
-               File exampleJarLocation = YarnTestBase.findFile(
-                       ".." + File.separator + "flink-examples" + 
File.separator + "flink-examples-streaming",
-                       new ContainsName(new String[] {"-WordCount.jar"}));
-               Assert.assertNotNull("Could not find streaming wordcount jar", 
exampleJarLocation);
-
-               
testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
-
-               LOG.info("Finished 
testDetachedPerJobYarnClusterWithStreamingJob()");
-       }
-
-       private void testDetachedPerJobYarnClusterInternal(String job) {
-               YarnClient yc = YarnClient.createYarnClient();
-               yc.init(yarnConfiguration);
-               yc.start();
-
-               // get temporary folder for writing output of wordcount example
-               File tmpOutFolder = null;
-               try{
-                       tmpOutFolder = tmp.newFolder();
-               }
-               catch(IOException e) {
-                       throw new RuntimeException(e);
-               }
-
-               // get temporary file for reading input data for wordcount 
example
-               File tmpInFile;
-               try{
-                       tmpInFile = tmp.newFile();
-                       FileUtils.writeStringToFile(tmpInFile, 
WordCountData.TEXT);
-               }
-               catch(IOException e) {
-                       throw new RuntimeException(e);
-               }
-
-               Runner runner = startWithArgs(new String[]{"run", "-m", 
"yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
-                               "-yt", flinkLibFolder.getAbsolutePath(),
-                               "-yn", "1",
-                               "-yjm", "768",
-                               "-yD", "yarn.heap-cutoff-ratio=0.5", // test if 
the cutoff is passed correctly
-                               "-ytm", "1024",
-                               "-ys", "2", // test requesting slots from YARN.
-                               "--yarndetached", job, "--input", 
tmpInFile.getAbsoluteFile().toString(), "--output", 
tmpOutFolder.getAbsoluteFile().toString()},
-                       "Job has been submitted with JobID",
-                       RunTypes.CLI_FRONTEND);
-
-               // it should usually be 2, but on slow machines, the number 
varies
-               Assert.assertTrue("There should be at most 2 containers 
running", getRunningContainers() <= 2);
-               // give the runner some time to detach
-               for (int attempt = 0; runner.isAlive() && attempt < 5; 
attempt++) {
-                       try {
-                               Thread.sleep(500);
-                       } catch (InterruptedException e) {
-                       }
-               }
-               Assert.assertFalse("The runner should detach.", 
runner.isAlive());
-               LOG.info("CLI Frontend has returned, so the job is running");
-
-               // find out the application id and wait until it has finished.
-               try {
-                       List<ApplicationReport> apps = 
yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
-
-                       ApplicationId tmpAppId;
-                       if (apps.size() == 1) {
-                               // Better method to find the right appId. But 
sometimes the app is shutting down very fast
-                               // Only one running
-                               tmpAppId = apps.get(0).getApplicationId();
-
-                               LOG.info("waiting for the job with appId {} to 
finish", tmpAppId);
-                               // wait until the app has finished
-                               
while(yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) {
-                                       sleep(500);
-                               }
-                       } else {
-                               // get appId by finding the latest finished 
appid
-                               apps = yc.getApplications();
-                               Collections.sort(apps, new 
Comparator<ApplicationReport>() {
-                                       @Override
-                                       public int compare(ApplicationReport 
o1, ApplicationReport o2) {
-                                               return 
o1.getApplicationId().compareTo(o2.getApplicationId())*-1;
-                                       }
-                               });
-                               tmpAppId = apps.get(0).getApplicationId();
-                               LOG.info("Selected {} as the last appId from 
{}", tmpAppId, Arrays.toString(apps.toArray()));
-                       }
-                       final ApplicationId id = tmpAppId;
-
-                       // now it has finished.
-                       // check the output files.
-                       File[] listOfOutputFiles = tmpOutFolder.listFiles();
-
-
-                       Assert.assertNotNull("Taskmanager output not found", 
listOfOutputFiles);
-                       LOG.info("The job has finished. TaskManager output 
files found in {}", tmpOutFolder );
-
-                       // read all output files in output folder to one output 
string
-                       String content = "";
-                       for(File f:listOfOutputFiles)
-                       {
-                               if(f.isFile())
-                               {
-                                       content += 
FileUtils.readFileToString(f) + "\n";
-                               }
-                       }
-                       //String content = 
FileUtils.readFileToString(taskmanagerOut);
-                       // check for some of the wordcount outputs.
-                       Assert.assertTrue("Expected string 'da 5' or '(all,2)' 
not found in string '"+content+"'", content.contains("da 5") || 
content.contains("(da,5)") || content.contains("(all,2)"));
-                       Assert.assertTrue("Expected string 'der 29' or 
'(mind,1)' not found in string'"+content+"'",content.contains("der 29") || 
content.contains("(der,29)") || content.contains("(mind,1)"));
-
-                       // check if the heap size for the TaskManager was set 
correctly
-                       File jobmanagerLog = YarnTestBase.findFile("..", new 
FilenameFilter() {
-                               @Override
-                               public boolean accept(File dir, String name) {
-                                       return name.contains("jobmanager.log") 
&& dir.getAbsolutePath().contains(id.toString());
-                               }
-                       });
-                       Assert.assertNotNull("Unable to locate JobManager log", 
jobmanagerLog);
-                       content = FileUtils.readFileToString(jobmanagerLog);
-                       // TM was started with 1024 but we cut off 50% (NOT THE 
DEFAULT VALUE)
-                       String expected = "Starting TaskManagers with command: 
$JAVA_HOME/bin/java -Xms424m -Xmx424m";
-                       Assert.assertTrue("Expected string '" + expected + "' 
not found in JobManager log: '"+jobmanagerLog+"'",
-                               content.contains(expected));
-                       expected = " (2/2) (attempt #0) to ";
-                       Assert.assertTrue("Expected string '" + expected + "' 
not found in JobManager log." +
-                                       "This string checks that the job has 
been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'",
-                               content.contains(expected));
-
-                       // make sure the detached app is really finished.
-                       LOG.info("Checking again that app has finished");
-                       ApplicationReport rep;
-                       do {
-                               sleep(500);
-                               rep = yc.getApplicationReport(id);
-                               LOG.info("Got report {}", rep);
-                       } while(rep.getYarnApplicationState() == 
YarnApplicationState.RUNNING);
-
-               } catch(Throwable t) {
-                       LOG.warn("Error while detached yarn session was 
running", t);
-                       Assert.fail(t.getMessage());
-               }
-       }
-
-       @After
-       public void checkForProhibitedLogContents() {
-               ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, 
WHITELISTED_STRINGS);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
deleted file mode 100644
index cb402a3..0000000
--- 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.flink.configuration.GlobalConfiguration;
-import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-
-import org.apache.log4j.Level;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.List;
-
-import static org.apache.flink.yarn.UtilsTest.addTestAppender;
-import static org.apache.flink.yarn.UtilsTest.checkForLogString;
-
-
-/**
- * This test starts a MiniYARNCluster with a FIFO scheduler.
- * There are no queues for that scheduler.
- */
-public class YARNSessionFIFOITCase extends YarnTestBase {
-       private static final Logger LOG = 
LoggerFactory.getLogger(YARNSessionFIFOITCase.class);
-
-       /*
-       Override init with FIFO scheduler.
-        */
-       @BeforeClass
-       public static void setup() {
-               yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, 
FifoScheduler.class, ResourceScheduler.class);
-               yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);
-               
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 
512);
-               yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, 
"flink-yarn-tests-fifo");
-               startYARNWithConfig(yarnConfiguration);
-       }
-
-       @After
-       public void checkForProhibitedLogContents() {
-               ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, 
WHITELISTED_STRINGS);
-       }
-
-       /**
-        * Test regular operation, including command line parameter parsing.
-        */
-       @Test(timeout=60000) // timeout after a minute.
-       public void testDetachedMode() {
-               LOG.info("Starting testDetachedMode()");
-               addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
-               Runner runner = startWithArgs(new String[]{"-j", 
flinkUberjar.getAbsolutePath(),
-                                               "-t", 
flinkLibFolder.getAbsolutePath(),
-                                               "-n", "1",
-                                               "-jm", "768",
-                                               "-tm", "1024",
-                                               "--name", "MyCustomName", // 
test setting a custom name
-                                               "--detached"},
-                               "Flink JobManager is now running on", 
RunTypes.YARN_SESSION);
-
-               checkForLogString("The Flink YARN client has been started in 
detached mode");
-
-               Assert.assertFalse("The runner should detach.", 
runner.isAlive());
-
-               LOG.info("Waiting until two containers are running");
-               // wait until two containers are running
-               while(getRunningContainers() < 2) {
-                       sleep(500);
-               }
-               LOG.info("Two containers are running. Killing the application");
-
-               // kill application "externally".
-               try {
-                       YarnClient yc = YarnClient.createYarnClient();
-                       yc.init(yarnConfiguration);
-                       yc.start();
-                       List<ApplicationReport> apps = 
yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
-                       Assert.assertEquals(1, apps.size()); // Only one running
-                       ApplicationReport app = apps.get(0);
-
-                       Assert.assertEquals("MyCustomName", app.getName());
-                       ApplicationId id = app.getApplicationId();
-                       yc.killApplication(id);
-
-                       
while(yc.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0) {
-                               sleep(500);
-                       }
-               } catch(Throwable t) {
-                       LOG.warn("Killing failed", t);
-                       Assert.fail();
-               }
-
-               LOG.info("Finished testDetachedMode()");
-       }
-
-       /**
-        * Test querying the YARN cluster.
-        *
-        * This test validates through 666*2 cores in the "cluster".
-        */
-       @Test
-       public void testQueryCluster() {
-               LOG.info("Starting testQueryCluster()");
-               runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 
totalCores 1332",null, RunTypes.YARN_SESSION, 0); // we have 666*2 cores.
-               LOG.info("Finished testQueryCluster()");
-       }
-
-       /**
-        * Test deployment to non-existing queue. (user-reported error)
-        * Deployment to the queue is possible because there are no queues, so 
we don't check.
-        */
-       @Test
-       public void testNonexistingQueue() {
-               LOG.info("Starting testNonexistingQueue()");
-               runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
-                               "-t", flinkLibFolder.getAbsolutePath(),
-                               "-n", "1",
-                               "-jm", "768",
-                               "-tm", "1024",
-                               "-qu", "doesntExist"}, "Number of connected 
TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0);
-               LOG.info("Finished testNonexistingQueue()");
-       }
-
-       /**
-        * The test cluster has the following resources:
-        * - 2 Nodes with 4096 MB each.
-        * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
-        *
-        * We allocate:
-        * 1 JobManager with 256 MB (will be automatically upgraded to 512 due 
to min alloc mb)
-        * 5 TaskManagers with 1585 MB
-        *
-        * user sees a total request of: 8181 MB (fits)
-        * system sees a total request of: 8437 (doesn't fit due to min alloc 
mb)
-        */
-       @Ignore("The test is too resource consuming (8.5 GB of memory)")
-       @Test
-       public void testResourceComputation() {
-               addTestAppender(FlinkYarnClient.class, Level.WARN);
-               LOG.info("Starting testResourceComputation()");
-               runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), 
"-t", flinkLibFolder.getAbsolutePath(),
-                               "-n", "5",
-                               "-jm", "256",
-                               "-tm", "1585"}, "Number of connected 
TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
-               LOG.info("Finished testResourceComputation()");
-               checkForLogString("This YARN session requires 8437MB of memory 
in the cluster. There are currently only 8192MB available.");
-       }
-
-       /**
-        * The test cluster has the following resources:
-        * - 2 Nodes with 4096 MB each.
-        * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
-        *
-        * We allocate:
-        * 1 JobManager with 256 MB (will be automatically upgraded to 512 due 
to min alloc mb)
-        * 2 TaskManagers with 3840 MB
-        *
-        * the user sees a total request of: 7936 MB (fits)
-        * the system sees a request of: 8192 MB (fits)
-        * HOWEVER: one machine is going to need 3840 + 512 = 4352 MB, which 
doesn't fit.
-        *
-        * --> check if the system properly rejects allocating this session.
-        */
-       @Ignore("The test is too resource consuming (8 GB of memory)")
-       @Test
-       public void testfullAlloc() {
-               addTestAppender(FlinkYarnClient.class, Level.WARN);
-               LOG.info("Starting testfullAlloc()");
-               runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), 
"-t", flinkLibFolder.getAbsolutePath(),
-                               "-n", "2",
-                               "-jm", "256",
-                               "-tm", "3840"}, "Number of connected 
TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
-               LOG.info("Finished testfullAlloc()");
-               checkForLogString("There is not enough memory available in the 
YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: 
[4096, 4096]\n" +
-                               "After allocating the JobManager (512MB) and 
(1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");
-       }
-
-       /**
-        * Test the YARN Java API
-        */
-       @Test
-       public void testJavaAPI() {
-               final int WAIT_TIME = 15;
-               LOG.info("Starting testJavaAPI()");
-
-               AbstractFlinkYarnClient flinkYarnClient = 
FlinkYarnSessionCli.getFlinkYarnClient();
-               Assert.assertNotNull("unable to get yarn client", 
flinkYarnClient);
-               flinkYarnClient.setTaskManagerCount(1);
-               flinkYarnClient.setJobManagerMemory(768);
-               flinkYarnClient.setTaskManagerMemory(1024);
-               flinkYarnClient.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
-               
flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-               String confDirPath = System.getenv("FLINK_CONF_DIR");
-               flinkYarnClient.setConfigurationDirectory(confDirPath);
-               
flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
-               flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + 
File.separator + "flink-conf.yaml"));
-
-               // deploy
-               AbstractFlinkYarnCluster yarnCluster = null;
-               try {
-                       yarnCluster = flinkYarnClient.deploy();
-                       yarnCluster.connectToCluster();
-               } catch (Exception e) {
-                       LOG.warn("Failing test", e);
-                       Assert.fail("Error while deploying YARN cluster: 
"+e.getMessage());
-               }
-               GetClusterStatusResponse expectedStatus = new 
GetClusterStatusResponse(1, 1);
-               for(int second = 0; second < WAIT_TIME * 2; second++) { // run 
"forever"
-                       try {
-                               Thread.sleep(1000);
-                       } catch (InterruptedException e) {
-                               LOG.warn("Interrupted", e);
-                       }
-                       GetClusterStatusResponse status = 
yarnCluster.getClusterStatus();
-                       if(status != null && status.equals(expectedStatus)) {
-                               LOG.info("Cluster reached status " + status);
-                               break; // all good, cluster started
-                       }
-                       if(second > WAIT_TIME) {
-                               // we waited for 15 seconds. cluster didn't 
come up correctly
-                               Assert.fail("The custer didn't start after " + 
WAIT_TIME + " seconds");
-                       }
-               }
-
-               // use the cluster
-               Assert.assertNotNull(yarnCluster.getJobManagerAddress());
-               Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
-
-               LOG.info("Shutting down cluster. All tests passed");
-               // shutdown cluster
-               yarnCluster.shutdown(false);
-               LOG.info("Finished testJavaAPI()");
-       }
-}

Reply via email to