http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
deleted file mode 100644
index fc1e5bc..0000000
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ /dev/null
@@ -1,619 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.util.TestLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.log4j.spi.LoggingEvent;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Marker;
-import org.slf4j.MarkerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Scanner;
-import java.util.concurrent.ConcurrentMap;
-
-
-/**
- * This base class allows to use the MiniYARNCluster.
- * The cluster is re-used for all tests.
- *
- * This class is located in a different package which is build after 
flink-dist. This way,
- * we can use the YARN uberjar of flink to start a Flink YARN session.
- *
- * The test is not thread-safe. Parallel execution of tests is not possible!
- */
-public abstract class YarnTestBase extends TestLogger {
-       private static final Logger LOG = 
LoggerFactory.getLogger(YarnTestBase.class);
-
-       protected final static PrintStream originalStdout = System.out;
-       protected final static PrintStream originalStderr = System.err;
-
-       protected static String TEST_CLUSTER_NAME_KEY = 
"flink-yarn-minicluster-name";
-
-       protected final static int NUM_NODEMANAGERS = 2;
-
-       /** The tests are scanning for these strings in the final output. */
-       protected final static String[] PROHIBITED_STRINGS = {
-                       "Exception", // we don't want any exceptions to happen
-                       "Started [email protected]:8081" // Jetty 
should start on a random port in YARN mode.
-       };
-
-       /** These strings are white-listed, overriding teh prohibited strings */
-       protected final static String[] WHITELISTED_STRINGS = {
-                       "akka.remote.RemoteTransportExceptionNoStackTrace",
-                       // workaround for annoying InterruptedException logging:
-                   // https://issues.apache.org/jira/browse/YARN-1022
-                       "java.lang.InterruptedException"
-       };
-
-       // Temp directory which is deleted after the unit test.
-       @ClassRule
-       public static TemporaryFolder tmp = new TemporaryFolder();
-
-       protected static MiniYARNCluster yarnCluster = null;
-
-       /**
-        * Uberjar (fat jar) file of Flink
-        */
-       protected static File flinkUberjar;
-
-       protected static final Configuration yarnConfiguration;
-
-       /**
-        * lib/ folder of the flink distribution.
-        */
-       protected static File flinkLibFolder;
-
-       static {
-               yarnConfiguration = new YarnConfiguration();
-               
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 
512);
-               
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 
4096); // 4096 is the available memory anyways
-               
yarnConfiguration.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, 
true);
-               
yarnConfiguration.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
 true);
-               yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 
2);
-               
yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
-               
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
 4);
-               
yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
-               
yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
-               yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // 
memory is overwritten in the MiniYARNCluster.
-               // so we have to change the number of cores for testing.
-               
yarnConfiguration.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 
20 seconds expiry (to ensure we properly heartbeat with YARN).
-       }
-
-
-
-       /**
-        * Sleep a bit between the tests (we are re-using the YARN cluster for 
the tests)
-        */
-       @After
-       public void sleep() {
-               try {
-                       Thread.sleep(500);
-               } catch (InterruptedException e) {
-                       Assert.fail("Should not happen");
-               }
-       }
-
-       private YarnClient yarnClient = null;
-       @Before
-       public void checkClusterEmpty() throws IOException, YarnException {
-               if(yarnClient == null) {
-                       yarnClient = YarnClient.createYarnClient();
-                       yarnClient.init(yarnConfiguration);
-                       yarnClient.start();
-               }
-
-               List<ApplicationReport> apps = yarnClient.getApplications();
-               for(ApplicationReport app : apps) {
-                       if(app.getYarnApplicationState() != 
YarnApplicationState.FINISHED
-                                       && app.getYarnApplicationState() != 
YarnApplicationState.KILLED
-                                       && app.getYarnApplicationState() != 
YarnApplicationState.FAILED) {
-                               Assert.fail("There is at least one application 
on the cluster is not finished." +
-                                               "App "+app.getApplicationId()+" 
is in state "+app.getYarnApplicationState());
-                       }
-               }
-       }
-
-       /**
-        * Locate a file or directory
-        */
-       public static File findFile(String startAt, FilenameFilter fnf) {
-               File root = new File(startAt);
-               String[] files = root.list();
-               if(files == null) {
-                       return null;
-               }
-               for(String file : files) {
-                       File f = new File(startAt + File.separator + file);
-                       if(f.isDirectory()) {
-                               File r = findFile(f.getAbsolutePath(), fnf);
-                               if(r != null) {
-                                       return r;
-                               }
-                       } else if (fnf.accept(f.getParentFile(), f.getName())) {
-                               return f;
-                       }
-               }
-               return null;
-       }
-
-       /**
-        * Filter to find root dir of the flink-yarn dist.
-        */
-       public static class RootDirFilenameFilter implements FilenameFilter {
-               @Override
-               public boolean accept(File dir, String name) {
-                       return name.startsWith("flink-dist") && 
name.endsWith(".jar") && dir.toString().contains("/lib");
-               }
-       }
-
-       public static class ContainsName implements FilenameFilter {
-               private String[] names;
-               private String excludeInPath = null;
-
-               /**
-                * @param names which have to be included in the filename.
-                */
-               public ContainsName(String[] names) {
-                       this.names = names;
-               }
-
-               public ContainsName(String[] names, String excludeInPath) {
-                       this.names = names;
-                       this.excludeInPath = excludeInPath;
-               }
-
-               @Override
-               public boolean accept(File dir, String name) {
-                       if(excludeInPath == null) {
-                               for(String n: names) {
-                                       if(!name.contains(n)) {
-                                               return false;
-                                       }
-                               }
-                               return true;
-                       } else {
-                               for(String n: names) {
-                                       if(!name.contains(n)) {
-                                               return false;
-                                       }
-                               }
-                               return !dir.toString().contains(excludeInPath);
-                       }
-               }
-       }
-
-       public static File writeYarnSiteConfigXML(Configuration yarnConf) 
throws IOException {
-               tmp.create();
-               File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + 
"/yarn-site.xml");
-
-               FileWriter writer = new FileWriter(yarnSiteXML);
-               yarnConf.writeXml(writer);
-               writer.flush();
-               writer.close();
-               return yarnSiteXML;
-       }
-
-       /**
-        * This method checks the written TaskManager and JobManager log files
-        * for exceptions.
-        *
-        * WARN: Please make sure the tool doesn't find old logfiles from 
previous test runs.
-        * So always run "mvn clean" before running the tests here.
-        *
-        */
-       public static void ensureNoProhibitedStringInLogFiles(final String[] 
prohibited, final String[] whitelisted) {
-               File cwd = new File("target/" + 
yarnConfiguration.get(TEST_CLUSTER_NAME_KEY));
-               Assert.assertTrue("Expecting directory " + 
cwd.getAbsolutePath() + " to exist", cwd.exists());
-               Assert.assertTrue("Expecting directory " + 
cwd.getAbsolutePath() + " to be a directory", cwd.isDirectory());
-               
-               File foundFile = findFile(cwd.getAbsolutePath(), new 
FilenameFilter() {
-                       @Override
-                       public boolean accept(File dir, String name) {
-                       // scan each file for prohibited strings.
-                       File f = new File(dir.getAbsolutePath()+ "/" + name);
-                       try {
-                               Scanner scanner = new Scanner(f);
-                               while (scanner.hasNextLine()) {
-                                       final String lineFromFile = 
scanner.nextLine();
-                                       for (String aProhibited : prohibited) {
-                                               if 
(lineFromFile.contains(aProhibited)) {
-                                                       
-                                                       boolean 
whitelistedFound = false;
-                                                       for (String white : 
whitelisted) {
-                                                               if 
(lineFromFile.contains(white)) {
-                                                                       
whitelistedFound = true;
-                                                                       break;
-                                                               }
-                                                       }
-                                                       
-                                                       if (!whitelistedFound) {
-                                                               // logging in 
FATAL to see the actual message in TRAVIS tests.
-                                                               Marker fatal = 
MarkerFactory.getMarker("FATAL");
-                                                               
LOG.error(fatal, "Prohibited String '{}' in line '{}'", aProhibited, 
lineFromFile);
-                                                               return true;
-                                                       }
-                                               }
-                                       }
-
-                               }
-                       } catch (FileNotFoundException e) {
-                               LOG.warn("Unable to locate file: 
"+e.getMessage()+" file: "+f.getAbsolutePath());
-                       }
-
-                       return false;
-                       }
-               });
-               if(foundFile != null) {
-                       Scanner scanner =  null;
-                       try {
-                               scanner = new Scanner(foundFile);
-                       } catch (FileNotFoundException e) {
-                               Assert.fail("Unable to locate file: 
"+e.getMessage()+" file: "+foundFile.getAbsolutePath());
-                       }
-                       LOG.warn("Found a file with a prohibited string. 
Printing contents:");
-                       while (scanner.hasNextLine()) {
-                               LOG.warn("LINE: "+scanner.nextLine());
-                       }
-                       Assert.fail("Found a file "+foundFile+" with a 
prohibited string: "+Arrays.toString(prohibited));
-               }
-       }
-
-       public static void sleep(int time) {
-               try {
-                       Thread.sleep(time);
-               } catch (InterruptedException e) {
-                       LOG.warn("Interruped",e);
-               }
-       }
-
-       public static int getRunningContainers() {
-               int count = 0;
-               for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
-                       NodeManager nm = yarnCluster.getNodeManager(nmId);
-                       ConcurrentMap<ContainerId, Container> containers = 
nm.getNMContext().getContainers();
-                       count += containers.size();
-               }
-               return count;
-       }
-
-       public static void startYARNWithConfig(Configuration conf) {
-               // set the home directory to a tmp directory. Flink on YARN is 
using the home dir to distribute the file
-               File homeDir = null;
-               try {
-                       homeDir = tmp.newFolder();
-               } catch (IOException e) {
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-               System.setProperty("user.home", homeDir.getAbsolutePath());
-               String uberjarStartLoc = "..";
-               LOG.info("Trying to locate uberjar in {}", new 
File(uberjarStartLoc));
-               flinkUberjar = findFile(uberjarStartLoc, new 
RootDirFilenameFilter());
-               Assert.assertNotNull("Flink uberjar not found", flinkUberjar);
-               String flinkDistRootDir = 
flinkUberjar.getParentFile().getParent();
-               flinkLibFolder = flinkUberjar.getParentFile(); // the uberjar 
is located in lib/
-               Assert.assertNotNull("Flink flinkLibFolder not found", 
flinkLibFolder);
-               Assert.assertTrue("lib folder not found", 
flinkLibFolder.exists());
-               Assert.assertTrue("lib folder not found", 
flinkLibFolder.isDirectory());
-
-               if (!flinkUberjar.exists()) {
-                       Assert.fail("Unable to locate yarn-uberjar.jar");
-               }
-
-               try {
-                       LOG.info("Starting up MiniYARNCluster");
-                       if (yarnCluster == null) {
-                               yarnCluster = new 
MiniYARNCluster(conf.get(YarnTestBase.TEST_CLUSTER_NAME_KEY), NUM_NODEMANAGERS, 
1, 1);
-
-                               yarnCluster.init(conf);
-                               yarnCluster.start();
-                       }
-
-                       Map<String, String> map = new HashMap<String, 
String>(System.getenv());
-
-                       File flinkConfDirPath = findFile(flinkDistRootDir, new 
ContainsName(new String[]{"flink-conf.yaml"}));
-                       Assert.assertNotNull(flinkConfDirPath);
-
-                       map.put("FLINK_CONF_DIR", flinkConfDirPath.getParent());
-
-                       File yarnConfFile = writeYarnSiteConfigXML(conf);
-                       map.put("YARN_CONF_DIR", 
yarnConfFile.getParentFile().getAbsolutePath());
-                       map.put("IN_TESTS", "yes we are in tests"); // see 
FlinkYarnClient() for more infos
-                       TestBaseUtils.setEnv(map);
-
-                       Assert.assertTrue(yarnCluster.getServiceState() == 
Service.STATE.STARTED);
-
-                       // wait for the nodeManagers to connect
-                       while(!yarnCluster.waitForNodeManagersToConnect(500)) {
-                               LOG.info("Waiting for Nodemanagers to connect");
-                       }
-               } catch (Exception ex) {
-                       ex.printStackTrace();
-                       LOG.error("setup failure", ex);
-                       Assert.fail();
-               }
-       }
-
-       /**
-        * Default @BeforeClass impl. Overwrite this for passing a different 
configuration
-        */
-       @BeforeClass
-       public static void setup() {
-               startYARNWithConfig(yarnConfiguration);
-       }
-
-       // -------------------------- Runner -------------------------- //
-
-       protected static ByteArrayOutputStream outContent;
-       protected static ByteArrayOutputStream errContent;
-       enum RunTypes {
-               YARN_SESSION, CLI_FRONTEND
-       }
-
-       /**
-        * This method returns once the "startedAfterString" has been seen.
-        */
-       protected Runner startWithArgs(String[] args, String 
startedAfterString, RunTypes type) {
-               LOG.info("Running with args {}", Arrays.toString(args));
-
-               outContent = new ByteArrayOutputStream();
-               errContent = new ByteArrayOutputStream();
-               System.setOut(new PrintStream(outContent));
-               System.setErr(new PrintStream(errContent));
-
-
-               final int START_TIMEOUT_SECONDS = 60;
-
-               Runner runner = new Runner(args, type);
-               runner.setName("Frontend (CLI/YARN Client) runner thread 
(startWithArgs()).");
-               runner.start();
-
-               for(int second = 0; second <  START_TIMEOUT_SECONDS; second++) {
-                       sleep(1000);
-                       // check output for correct TaskManager startup.
-                       if(outContent.toString().contains(startedAfterString)
-                                       || 
errContent.toString().contains(startedAfterString) ) {
-                               LOG.info("Found expected output in redirected 
streams");
-                               return runner;
-                       }
-                       // check if thread died
-                       if(!runner.isAlive()) {
-                               sendOutput();
-                               Assert.fail("Runner thread died before the test 
was finished. Return value = "+runner.getReturnValue());
-                       }
-               }
-
-               sendOutput();
-               Assert.fail("During the timeout period of " + 
START_TIMEOUT_SECONDS + " seconds the " +
-                               "expected string did not show up");
-               return null;
-       }
-
-       protected void runWithArgs(String[] args, String terminateAfterString, 
String[] failOnStrings, RunTypes type, int returnCode) {
-               runWithArgs(args,terminateAfterString, failOnStrings, type, 
returnCode, false);
-       }
-       /**
-        * The test has been passed once the "terminateAfterString" has been 
seen.
-        * @param args Command line arguments for the runner
-        * @param terminateAfterString the runner is searching the stdout and 
stderr for this string. as soon as it appears, the test has passed
-        * @param failOnStrings The runner is searching stdout and stderr for 
the strings specified here. If one appears, the test has failed
-        * @param type Set the type of the runner
-        * @param returnCode Expected return code from the runner.
-        * @param checkLogForTerminateString  If true, the runner checks also 
the log4j logger for the terminate string
-        */
-       protected void runWithArgs(String[] args, String terminateAfterString, 
String[] failOnStrings, RunTypes type, int returnCode, boolean 
checkLogForTerminateString) {
-               LOG.info("Running with args {}", Arrays.toString(args));
-
-               outContent = new ByteArrayOutputStream();
-               errContent = new ByteArrayOutputStream();
-               System.setOut(new PrintStream(outContent));
-               System.setErr(new PrintStream(errContent));
-
-
-               // we wait for at most three minutes
-               final int START_TIMEOUT_SECONDS = 180;
-               final long deadline = System.currentTimeMillis() + 
(START_TIMEOUT_SECONDS * 1000);
-               
-               Runner runner = new Runner(args, type);
-               runner.start();
-
-               boolean expectedStringSeen = false;
-               boolean testPassedFromLog4j = false;
-               do {
-                       sleep(1000);
-                       String outContentString = outContent.toString();
-                       String errContentString = errContent.toString();
-                       if(failOnStrings != null) {
-                               for (String failOnString : failOnStrings) {
-                                       if 
(outContentString.contains(failOnString)
-                                                       || 
errContentString.contains(failOnString)) {
-                                               LOG.warn("Failing test. Output 
contained illegal string '" + failOnString + "'");
-                                               sendOutput();
-                                               // stopping runner.
-                                               runner.sendStop();
-                                               Assert.fail("Output contained 
illegal string '" + failOnString + "'");
-                                       }
-                               }
-                       }
-                       // check output for the expected terminateAfterString.
-                       if(checkLogForTerminateString) {
-                               LoggingEvent matchedEvent = 
UtilsTest.getEventContainingString(terminateAfterString);
-                               if(matchedEvent != null) {
-                                       testPassedFromLog4j = true;
-                                       LOG.info("Found expected output in 
logging event {}", matchedEvent);
-                               }
-
-                       }
-
-                       if (outContentString.contains(terminateAfterString) || 
errContentString.contains(terminateAfterString) || testPassedFromLog4j ) {
-                               expectedStringSeen = true;
-                               LOG.info("Found expected output in redirected 
streams");
-                               // send "stop" command to command line interface
-                               LOG.info("RunWithArgs: request runner to stop");
-                               runner.sendStop();
-                               // wait for the thread to stop
-                               try {
-                                       runner.join(30000);
-                               }
-                               catch (InterruptedException e) {
-                                       LOG.warn("Interrupted while stopping 
runner", e);
-                               }
-                               LOG.warn("RunWithArgs runner stopped.");
-                       }
-                       else {
-                               // check if thread died
-                               if (!runner.isAlive()) {
-                                       if (runner.getReturnValue() != 0) {
-                                               Assert.fail("Runner thread died 
before the test was finished. Return value = "
-                                                               + 
runner.getReturnValue());
-                                       } else {
-                                               LOG.info("Runner stopped 
earlier than expected with return value = 0");
-                                       }
-                                       // leave loop: the runner died, so we 
can not expect new strings to show up.
-                                       break;
-                               }
-                       }
-               }
-               while (!expectedStringSeen && System.currentTimeMillis() < 
deadline);
-               
-               sendOutput();
-               Assert.assertTrue("During the timeout period of " + 
START_TIMEOUT_SECONDS + " seconds the " +
-                               "expected string did not show up", 
expectedStringSeen);
-
-               // check for 0 return code
-               Assert.assertEquals("Expected return value", returnCode, 
runner.getReturnValue());
-               LOG.info("Test was successful");
-       }
-
-       protected static void sendOutput() {
-               System.setOut(originalStdout);
-               System.setErr(originalStderr);
-
-               LOG.info("Sending stdout content through logger: \n\n{}\n\n", 
outContent.toString());
-               LOG.info("Sending stderr content through logger: \n\n{}\n\n", 
errContent.toString());
-       }
-
-       public static class Runner extends Thread {
-               private final String[] args;
-               private int returnValue;
-               private RunTypes type;
-               private FlinkYarnSessionCli yCli;
-
-               public Runner(String[] args, RunTypes type) {
-                       this.args = args;
-                       this.type = type;
-               }
-
-               public int getReturnValue() {
-                       return returnValue;
-               }
-
-               @Override
-               public void run() {
-                       switch(type) {
-                               case YARN_SESSION:
-                                       yCli = new FlinkYarnSessionCli("", "");
-                                       returnValue = yCli.run(args);
-                                       break;
-                               case CLI_FRONTEND:
-                                       try {
-                                               CliFrontend cli = new 
CliFrontend();
-                                               returnValue = 
cli.parseParameters(args);
-                                       } catch (Exception e) {
-                                               throw new RuntimeException(e);
-                                       }
-                                       break;
-                               default:
-                                       throw new RuntimeException("Unknown 
type " + type);
-                       }
-
-                       if(returnValue != 0) {
-                               Assert.fail("The YARN session returned with 
non-null value="+returnValue);
-                       }
-               }
-
-               public void sendStop() {
-                       if(yCli != null) {
-                               yCli.stop();
-                       }
-               }
-       }
-
-       // -------------------------- Tear down -------------------------- //
-
-       @AfterClass
-       public static void copyOnTravis() {
-               // When we are on travis, we copy the tmp files of JUnit 
(containing the MiniYARNCluster log files)
-               // to <flinkRoot>/target/flink-yarn-tests-*.
-               // The files from there are picked up by the 
./tools/travis_watchdog.sh script
-               // to upload them to Amazon S3.
-               if(isOnTravis()) {
-                       File target = new File("../target" + 
yarnConfiguration.get(TEST_CLUSTER_NAME_KEY));
-                       if(!target.mkdirs()) {
-                               LOG.warn("Error creating dirs to {}", target);
-                       }
-                       File src = tmp.getRoot();
-                       LOG.info("copying the final files from {} to {}", 
src.getAbsolutePath(), target.getAbsolutePath());
-                       try {
-                               FileUtils.copyDirectoryToDirectory(src, target);
-                       } catch (IOException e) {
-                               LOG.warn("Error copying the final files from {} 
to {}: msg: {}", src.getAbsolutePath(), target.getAbsolutePath(), 
e.getMessage(), e);
-                       }
-               }
-       }
-
-       public static boolean isOnTravis() {
-               return System.getenv("TRAVIS") != null && 
System.getenv("TRAVIS").equals("true");
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/resources/log4j-test.properties 
b/flink-yarn-tests/src/main/resources/log4j-test.properties
deleted file mode 100644
index ebe0d37..0000000
--- a/flink-yarn-tests/src/main/resources/log4j-test.properties
+++ /dev/null
@@ -1,35 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
-
-# log whats going on between the tests
-log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO
-log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO
-log4j.logger.org.apache.flink.yarn.YarnHighAvailability=INFO
-log4j.logger.org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch=INFO
-log4j.logger.org.apache.flink.runtime.leaderelection=INFO
-log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
 
b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
deleted file mode 100644
index 2f93785..0000000
--- 
a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn
-
-import java.util.concurrent.ExecutorService
-
-import akka.actor.ActorRef
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.checkpoint.{SavepointStore, 
CheckpointRecoveryFactory}
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
-import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.testingUtils.TestingJobManagerLike
-
-import scala.concurrent.duration.FiniteDuration
-
-/** [[YarnJobManager]] implementation which mixes in the 
[[TestingJobManagerLike]] mixin.
-  *
-  * This actor class is used for testing purposes on Yarn. Here we use an 
explicit class definition
-  * instead of an anonymous class with the respective mixin to obtain a more 
readable logger name.
-  *
-  * @param flinkConfiguration Configuration object for the actor
-  * @param executorService Execution context which is used to execute 
concurrent tasks in the
-  *                         
[[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
-  * @param instanceManager Instance manager to manage the registered
-  *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
-  * @param scheduler Scheduler to schedule Flink jobs
-  * @param libraryCacheManager Manager to manage uploaded jar files
-  * @param archive Archive for finished Flink jobs
-  * @param restartStrategyFactory Default restart strategy for job restarts
-  * @param timeout Timeout for futures
-  * @param leaderElectionService LeaderElectionService to participate in the 
leader election
-  */
-class TestingYarnJobManager(
-    flinkConfiguration: Configuration,
-    executorService: ExecutorService,
-    instanceManager: InstanceManager,
-    scheduler: Scheduler,
-    libraryCacheManager: BlobLibraryCacheManager,
-    archive: ActorRef,
-    restartStrategyFactory: RestartStrategyFactory,
-    timeout: FiniteDuration,
-    leaderElectionService: LeaderElectionService,
-    submittedJobGraphs : SubmittedJobGraphStore,
-    checkpointRecoveryFactory : CheckpointRecoveryFactory,
-    savepointStore: SavepointStore,
-    jobRecoveryTimeout: FiniteDuration)
-  extends YarnJobManager(
-    flinkConfiguration,
-    executorService,
-    instanceManager,
-    scheduler,
-    libraryCacheManager,
-    archive,
-    restartStrategyFactory,
-    timeout,
-    leaderElectionService,
-    submittedJobGraphs,
-    checkpointRecoveryFactory,
-    savepointStore,
-    jobRecoveryTimeout)
-  with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
 
b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
deleted file mode 100644
index 73ab7eb..0000000
--- 
a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.instance.InstanceConnectionInfo
-import org.apache.flink.runtime.io.disk.iomanager.IOManager
-import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
-import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.TaskManagerConfiguration
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
-
-/** [[YarnTaskManager]] implementation which mixes in the 
[[TestingTaskManagerLike]] mixin.
-  *
-  * This actor class is used for testing purposes on Yarn. Here we use an 
explicit class definition
-  * instead of an anonymous class with the respective mixin to obtain a more 
readable logger name.
-  *
-  * @param config Configuration object for the actor
-  * @param resourceID The Yarn container id
-  * @param connectionInfo Connection information of this actor
-  * @param memoryManager MemoryManager which is responsibel for Flink's 
managed memory allocation
-  * @param ioManager IOManager responsible for I/O
-  * @param network NetworkEnvironment for this actor
-  * @param numberOfSlots Number of slots for this TaskManager
-  * @param leaderRetrievalService [[LeaderRetrievalService]] to retrieve the 
current leading
-  *                              JobManager
-  */
-class TestingYarnTaskManager(
-    config: TaskManagerConfiguration,
-    resourceID: ResourceID,
-    connectionInfo: InstanceConnectionInfo,
-    memoryManager: MemoryManager,
-    ioManager: IOManager,
-    network: NetworkEnvironment,
-    numberOfSlots: Int,
-    leaderRetrievalService: LeaderRetrievalService)
-  extends YarnTaskManager(
-    config,
-    resourceID,
-    connectionInfo,
-    memoryManager,
-    ioManager,
-    network,
-    numberOfSlots,
-    leaderRetrievalService)
-  with TestingTaskManagerLike {
-
-  object YarnTaskManager {
-
-    /** Entry point (main method) to run the TaskManager on YARN.
-      * @param args The command line arguments.
-      */
-    def main(args: Array[String]): Unit = {
-      YarnTaskManagerRunner.runYarnTaskManager(args, 
classOf[TestingYarnTaskManager])
-    }
-
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
new file mode 100644
index 0000000..30116af
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.test.util.TestBaseUtils;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class FlinkYarnSessionCliTest {
+
+       @Rule
+       public TemporaryFolder tmp = new TemporaryFolder();
+
+       @Test
+       public void testDynamicProperties() throws IOException {
+
+               Map<String, String> map = new HashMap<String, 
String>(System.getenv());
+               File tmpFolder = tmp.newFolder();
+               File fakeConf = new File(tmpFolder, "flink-conf.yaml");
+               fakeConf.createNewFile();
+               map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
+               TestBaseUtils.setEnv(map);
+               Options options = new Options();
+               FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "");
+               cli.getYARNSessionCLIOptions(options);
+
+               CommandLineParser parser = new PosixParser();
+               CommandLine cmd = null;
+               try {
+                       cmd = parser.parse(options, new String[]{"run", "-j", 
"fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min"});
+               } catch(Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("Parsing failed with " + e.getMessage());
+               }
+
+               AbstractFlinkYarnClient flinkYarnClient = 
cli.createFlinkYarnClient(cmd);
+
+               Assert.assertNotNull(flinkYarnClient);
+
+               Map<String, String> dynProperties = 
CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded());
+               Assert.assertEquals(1, dynProperties.size());
+               Assert.assertEquals("5 min", 
dynProperties.get("akka.ask.timeout"));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
new file mode 100644
index 0000000..b0757f5
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.SignalHandler;
+
+/**
+ * Yarn application master which starts the {@link TestingYarnJobManager},
+ * {@link TestingResourceManager}, and the {@link TestingMemoryArchivist}.
+ */
+public class TestingApplicationMaster extends YarnApplicationMasterRunner {
+
+       @Override
+       public Class<? extends JobManager> getJobManagerClass() {
+               return TestingYarnJobManager.class;
+       }
+
+       @Override
+       public Class<? extends MemoryArchivist> getArchivistClass() {
+               return TestingMemoryArchivist.class;
+       }
+
+       @Override
+       protected Class<? extends TaskManager> getTaskManagerClass() {
+               return TestingYarnTaskManager.class;
+       }
+
+       @Override
+       public Class<? extends YarnFlinkResourceManager> 
getResourceManagerClass() {
+               return TestingYarnFlinkResourceManager.class;
+       }
+
+       public static void main(String[] args) {
+               EnvironmentInformation.logEnvironmentInfo(LOG, "YARN 
ApplicationMaster / JobManager", args);
+               SignalHandler.register(LOG);
+
+               // run and exit with the proper return code
+               int returnCode = new TestingApplicationMaster().run(args);
+               System.exit(returnCode);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
new file mode 100644
index 0000000..1efc336
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import com.google.common.base.Preconditions;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Yarn client which starts a {@link TestingApplicationMaster}. Additionally 
the client adds the
+ * flink-yarn-tests-XXX-tests.jar and the flink-runtime-XXX-tests.jar to the 
set of files which
+ * are shipped to the yarn cluster. This is necessary to load the testing 
classes.
+ */
+public class TestingFlinkYarnClient extends FlinkYarnClientBase {
+
+       public TestingFlinkYarnClient() {
+               List<File> filesToShip = new ArrayList<>();
+
+               File testingJar = YarnTestBase.findFile("..", new 
TestJarFinder("flink-yarn-tests"));
+               Preconditions.checkNotNull(testingJar, "Could not find the 
flink-yarn-tests tests jar. " +
+                       "Make sure to package the flink-yarn-tests module.");
+
+               File testingRuntimeJar = YarnTestBase.findFile("..", new 
TestJarFinder("flink-runtime"));
+               Preconditions.checkNotNull(testingRuntimeJar, "Could not find 
the flink-runtime tests " +
+                       "jar. Make sure to package the flink-runtime module.");
+
+               filesToShip.add(testingJar);
+               filesToShip.add(testingRuntimeJar);
+
+               setShipFiles(filesToShip);
+       }
+
+       @Override
+       protected Class<?> getApplicationMasterClass() {
+               return TestingApplicationMaster.class;
+       }
+
+       public static class TestJarFinder implements FilenameFilter {
+
+               private final String jarName;
+
+               public TestJarFinder(final String jarName) {
+                       this.jarName = jarName;
+               }
+
+               @Override
+               public boolean accept(File dir, String name) {
+                       return name.startsWith(jarName) && 
name.endsWith("-tests.jar") &&
+                               dir.getAbsolutePath().contains(dir.separator + 
jarName + dir.separator);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
new file mode 100644
index 0000000..5a61b8f
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnFlinkResourceManager.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Flink's testing resource manager for Yarn.
+ */
+public class TestingYarnFlinkResourceManager extends YarnFlinkResourceManager {
+
+       public TestingYarnFlinkResourceManager(
+               Configuration flinkConfig,
+               YarnConfiguration yarnConfig,
+               LeaderRetrievalService leaderRetrievalService,
+               String applicationMasterHostName,
+               String webInterfaceURL,
+               ContaineredTaskManagerParameters taskManagerParameters,
+               ContainerLaunchContext taskManagerLaunchContext,
+               int yarnHeartbeatIntervalMillis,
+               int maxFailedContainers,
+               int numInitialTaskManagers) {
+
+               super(
+                       flinkConfig,
+                       yarnConfig,
+                       leaderRetrievalService,
+                       applicationMasterHostName,
+                       webInterfaceURL,
+                       taskManagerParameters,
+                       taskManagerLaunchContext,
+                       yarnHeartbeatIntervalMillis,
+                       maxFailedContainers,
+                       numInitialTaskManagers);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
new file mode 100644
index 0000000..8586a77
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import java.io.IOException;
+
+/**
+ * Yarn TaskManager runner which starts a {@link TestingYarnTaskManager}.
+ */
+public class TestingYarnTaskManagerRunner {
+       public static void main(String[] args) throws IOException {
+               YarnTaskManagerRunner.runYarnTaskManager(args, 
TestingYarnTaskManager.class);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
new file mode 100644
index 0000000..784bf24
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class UtilsTest {
+       private static final Logger LOG = 
LoggerFactory.getLogger(UtilsTest.class);
+
+       @Test
+       public void testUberjarLocator() {
+               File dir = YarnTestBase.findFile("..", new 
YarnTestBase.RootDirFilenameFilter());
+               Assert.assertNotNull(dir);
+               Assert.assertTrue(dir.getName().endsWith(".jar"));
+               dir = dir.getParentFile().getParentFile(); // from uberjar to 
lib to root
+               Assert.assertTrue(dir.exists());
+               Assert.assertTrue(dir.isDirectory());
+               List<String> files = Arrays.asList(dir.list());
+               Assert.assertTrue(files.contains("lib"));
+               Assert.assertTrue(files.contains("bin"));
+               Assert.assertTrue(files.contains("conf"));
+       }
+
+       /**
+        * Remove 15% of the heap, at least 384MB.
+        *
+        */
+       @Test
+       public void testHeapCutoff() {
+               Configuration conf = new Configuration();
+               conf.setDouble(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 
0.15);
+               conf.setInteger(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, 
384);
+
+               Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
+               Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) 
);
+
+               // test different configuration
+               Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));
+
+               conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_MIN, 
"1000");
+               conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 
"0.1");
+               Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));
+
+               conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 
"0.5");
+               Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));
+
+               conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 
"1");
+               Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
+
+               // test also deprecated keys
+               conf = new Configuration();
+               conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15);
+               conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384);
+
+               Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
+               Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) 
);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void illegalArgument() {
+               Configuration conf = new Configuration();
+               conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 
"1.1");
+               Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void illegalArgumentNegative() {
+               Configuration conf = new Configuration();
+               conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 
"-0.01");
+               Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void tooMuchCutoff() {
+               Configuration conf = new Configuration();
+               conf.setString(ConfigConstants.CONTAINERED_HEAP_CUTOFF_RATIO, 
"6000");
+               Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
+       }
+
+       @Test
+       public void testGetEnvironmentVariables() {
+               Configuration testConf = new Configuration();
+               
testConf.setString("yarn.application-master.env.LD_LIBRARY_PATH", 
"/usr/lib/native");
+
+               Map<String, String> res = 
Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
+
+               Assert.assertEquals(1, res.size());
+               Map.Entry<String, String> entry = 
res.entrySet().iterator().next();
+               Assert.assertEquals("LD_LIBRARY_PATH", entry.getKey());
+               Assert.assertEquals("/usr/lib/native", entry.getValue());
+       }
+
+       @Test
+       public void testGetEnvironmentVariablesErroneous() {
+               Configuration testConf = new Configuration();
+               testConf.setString("yarn.application-master.env.", 
"/usr/lib/native");
+
+               Map<String, String> res = 
Utils.getEnvironmentVariables("yarn.application-master.env.", testConf);
+
+               Assert.assertEquals(0, res.size());
+       }
+
+       //
+       // --------------- Tools to test if a certain string has been logged 
with Log4j. -------------
+       // See :  
http://stackoverflow.com/questions/3717402/how-to-test-w-junit-that-warning-was-logged-w-log4j
+       //
+       private static TestAppender testAppender;
+       public static void addTestAppender(Class target, Level level) {
+               testAppender = new TestAppender();
+               testAppender.setThreshold(level);
+               org.apache.log4j.Logger lg = 
org.apache.log4j.Logger.getLogger(target);
+               lg.setLevel(level);
+               lg.addAppender(testAppender);
+               
//org.apache.log4j.Logger.getRootLogger().addAppender(testAppender);
+       }
+
+       public static void checkForLogString(String expected) {
+               LoggingEvent found = getEventContainingString(expected);
+               if(found != null) {
+                       LOG.info("Found expected string '"+expected+"' in log 
message "+found);
+                       return;
+               }
+               Assert.fail("Unable to find expected string '" + expected + "' 
in log messages");
+       }
+
+       public static LoggingEvent getEventContainingString(String expected) {
+               if(testAppender == null) {
+                       throw new NullPointerException("Initialize test 
appender first");
+               }
+               LoggingEvent found = null;
+               // make sure that different threads are not logging while the 
logs are checked
+               synchronized (testAppender.events) {
+                       for (LoggingEvent event : testAppender.events) {
+                               if 
(event.getMessage().toString().contains(expected)) {
+                                       found = event;
+                                       break;
+                               }
+                       }
+               }
+               return found;
+       }
+
+       public static class TestAppender extends AppenderSkeleton {
+               public final List<LoggingEvent> events = new ArrayList<>();
+               public void close() {}
+               public boolean requiresLayout() {return false;}
+               @Override
+               protected void append(LoggingEvent event) {
+                       synchronized (events){
+                               events.add(event);
+                       }
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
new file mode 100644
index 0000000..a93abf0
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.testkit.JavaTestKit;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+public class YARNHighAvailabilityITCase extends YarnTestBase {
+
+       private static TestingServer zkServer;
+
+       private static ActorSystem actorSystem;
+
+       private static final int numberApplicationAttempts = 10;
+
+       @Rule
+       public TemporaryFolder tmp = new TemporaryFolder();
+
+       @BeforeClass
+       public static void setup() {
+               actorSystem = AkkaUtils.createDefaultActorSystem();
+
+               try {
+                       zkServer = new TestingServer();
+                       zkServer.start();
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail("Could not start ZooKeeper testing 
cluster.");
+               }
+
+               yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, 
"flink-yarn-tests-ha");
+               yarnConfiguration.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" 
+ numberApplicationAttempts);
+
+               startYARNWithConfig(yarnConfiguration);
+       }
+
+       @AfterClass
+       public static void teardown() throws IOException {
+               if(zkServer != null) {
+                       zkServer.stop();
+               }
+
+               JavaTestKit.shutdownActorSystem(actorSystem);
+               actorSystem = null;
+       }
+
+       /**
+        * Tests that the application master can be killed multiple times and 
that the surviving
+        * TaskManager succesfully reconnects to the newly started JobManager.
+        * @throws Exception
+        */
+       @Test
+       public void testMultipleAMKill() throws Exception {
+               final int numberKillingAttempts = numberApplicationAttempts - 1;
+
+               TestingFlinkYarnClient flinkYarnClient = new 
TestingFlinkYarnClient();
+
+               Assert.assertNotNull("unable to get yarn client", 
flinkYarnClient);
+               flinkYarnClient.setTaskManagerCount(1);
+               flinkYarnClient.setJobManagerMemory(768);
+               flinkYarnClient.setTaskManagerMemory(1024);
+               flinkYarnClient.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
+               
flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+
+               String confDirPath = System.getenv("FLINK_CONF_DIR");
+               flinkYarnClient.setConfigurationDirectory(confDirPath);
+
+               String fsStateHandlePath = tmp.getRoot().getPath();
+
+               
flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
+               
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum="
 +
+                       zkServer.getConnectString() + 
"@@yarn.application-attempts=" + numberApplicationAttempts +
+                       "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
+                       "@@" + 
FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + 
fsStateHandlePath + "/checkpoints" +
+                       "@@" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "=" + 
fsStateHandlePath + "/recovery");
+               flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + 
File.separator + "flink-conf.yaml"));
+
+               AbstractFlinkYarnCluster yarnCluster = null;
+
+               final FiniteDuration timeout = new FiniteDuration(2, 
TimeUnit.MINUTES);
+
+               try {
+                       yarnCluster = flinkYarnClient.deploy();
+                       yarnCluster.connectToCluster();
+                       final Configuration config = 
yarnCluster.getFlinkConfiguration();
+
+                       new JavaTestKit(actorSystem) {{
+                               for (int attempt = 0; attempt < 
numberKillingAttempts; attempt++) {
+                                       new Within(timeout) {
+                                               @Override
+                                               protected void run() {
+                                                       try {
+                                                               
LeaderRetrievalService lrs = 
LeaderRetrievalUtils.createLeaderRetrievalService(config);
+                                                               ActorGateway 
gateway = LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout);
+                                                               ActorGateway 
selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
+
+                                                               
gateway.tell(new 
TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), 
selfGateway);
+
+                                                               
expectMsgEquals(Messages.getAcknowledge());
+
+                                                               
gateway.tell(PoisonPill.getInstance());
+                                                       } catch (Exception e) {
+                                                               throw new 
AssertionError("Could not complete test.", e);
+                                                       }
+                                               }
+                                       };
+                               }
+
+                               new Within(timeout) {
+                                       @Override
+                                       protected void run() {
+                                               try {
+                                                       LeaderRetrievalService 
lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+                                                       ActorGateway gateway2 = 
LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout);
+                                                       ActorGateway 
selfGateway = new AkkaActorGateway(getRef(), gateway2.leaderSessionID());
+                                                       gateway2.tell(new 
TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), 
selfGateway);
+
+                                                       
expectMsgEquals(Messages.getAcknowledge());
+                                               } catch (Exception e) {
+                                                       throw new 
AssertionError("Could not complete test.", e);
+                                               }
+                                       }
+                               };
+
+                       }};
+               } finally {
+                       if (yarnCluster != null) {
+                               yarnCluster.shutdown(false);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70978f56/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
new file mode 100644
index 0000000..38e17a5
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -0,0 +1,539 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.google.common.base.Joiner;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.yarn.UtilsTest.addTestAppender;
+import static org.apache.flink.yarn.UtilsTest.checkForLogString;
+
+
+/**
+ * This test starts a MiniYARNCluster with a CapacityScheduler.
+ * Is has, by default a queue called "default". The configuration here adds 
another queue: "qa-team".
+ */
+public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
+       private static final Logger LOG = 
LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);
+
+       @BeforeClass
+       public static void setup() {
+               yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, 
CapacityScheduler.class, ResourceScheduler.class);
+               yarnConfiguration.set("yarn.scheduler.capacity.root.queues", 
"default,qa-team");
+               
yarnConfiguration.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
+               
yarnConfiguration.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
+               yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, 
"flink-yarn-tests-capacityscheduler");
+               startYARNWithConfig(yarnConfiguration);
+       }
+
+       /**
+        * Test regular operation, including command line parameter parsing.
+        */
+       @Test
+       public void testClientStartup() {
+               LOG.info("Starting testClientStartup()");
+               runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), 
"-t", flinkLibFolder.getAbsolutePath(),
+                                               "-n", "1",
+                                               "-jm", "768",
+                                               "-tm", "1024", "-qu", 
"qa-team"},
+                               "Number of connected TaskManagers changed to 1. 
Slots available: 1", null, RunTypes.YARN_SESSION, 0);
+               LOG.info("Finished testClientStartup()");
+       }
+
+       /**
+        * Test per-job yarn cluster
+        *
+        * This also tests the prefixed CliFrontend options for the YARN case
+        * We also test if the requested parallelism of 2 is passed through.
+        * The parallelism is requested at the YARN client (-ys).
+        */
+       @Test
+       public void perJobYarnCluster() {
+               LOG.info("Starting perJobYarnCluster()");
+               addTestAppender(JobClient.class, Level.INFO);
+               File exampleJarLocation = YarnTestBase.findFile("..", new 
ContainsName(new String[] {"-WordCount.jar"} , "streaming")); // exclude 
streaming wordcount here.
+               Assert.assertNotNull("Could not find wordcount jar", 
exampleJarLocation);
+               runWithArgs(new String[]{"run", "-m", "yarn-cluster",
+                               "-yj", flinkUberjar.getAbsolutePath(), "-yt", 
flinkLibFolder.getAbsolutePath(),
+                               "-yn", "1",
+                               "-ys", "2", //test that the job is executed 
with a DOP of 2
+                               "-yjm", "768",
+                               "-ytm", "1024", 
exampleJarLocation.getAbsolutePath()},
+                               /* test succeeded after this string */
+                       "Job execution complete",
+                               /* prohibited strings: (we want to see (2/2)) */
+                       new String[]{"System.out)(1/1) switched to FINISHED "},
+                       RunTypes.CLI_FRONTEND, 0, true);
+               LOG.info("Finished perJobYarnCluster()");
+       }
+
+
+       /**
+        * Test TaskManager failure and also if the vcores are set correctly 
(see issue FLINK-2213).
+        */
+       @Test(timeout=100000) // timeout after 100 seconds
+       public void testTaskManagerFailure() {
+               LOG.info("Starting testTaskManagerFailure()");
+               Runner runner = startWithArgs(new String[]{"-j", 
flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
+                               "-n", "1",
+                               "-jm", "768",
+                               "-tm", "1024",
+                               "-s", "3", // set the slots 3 to check if the 
vCores are set properly!
+                               "-nm", "customName",
+                               "-Dfancy-configuration-value=veryFancy",
+                               "-Dyarn.maximum-failed-containers=3",
+                               "-D" + ConfigConstants.YARN_VCORES + "=2"},
+                       "Number of connected TaskManagers changed to 1. Slots 
available: 3",
+                       RunTypes.YARN_SESSION);
+
+               Assert.assertEquals(2, getRunningContainers());
+
+               // ------------------------ Test if JobManager web interface is 
accessible -------
+
+               YarnClient yc = null;
+               try {
+                       yc = YarnClient.createYarnClient();
+                       yc.init(yarnConfiguration);
+                       yc.start();
+
+                       List<ApplicationReport> apps = 
yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
+                       Assert.assertEquals(1, apps.size()); // Only one running
+                       ApplicationReport app = apps.get(0);
+                       Assert.assertEquals("customName", app.getName());
+                       String url = app.getTrackingUrl();
+                       if(!url.endsWith("/")) {
+                               url += "/";
+                       }
+                       if(!url.startsWith("http://";)) {
+                               url = "http://"; + url;
+                       }
+                       LOG.info("Got application URL from YARN {}", url);
+
+                       String response = TestBaseUtils.getFromHTTP(url + 
"taskmanagers/");
+
+                       JsonNode parsedTMs = new 
ObjectMapper().readTree(response);
+                       ArrayNode taskManagers = (ArrayNode) 
parsedTMs.get("taskmanagers");
+                       Assert.assertNotNull(taskManagers);
+                       Assert.assertEquals(1, taskManagers.size());
+                       Assert.assertEquals(3, 
taskManagers.get(0).get("slotsNumber").asInt());
+
+                       // get the configuration from webinterface & check if 
the dynamic properties from YARN show up there.
+                       String jsonConfig = TestBaseUtils.getFromHTTP(url + 
"jobmanager/config");
+                       Map<String, String> parsedConfig = 
WebMonitorUtils.fromKeyValueJsonArray(jsonConfig);
+
+                       Assert.assertEquals("veryFancy", 
parsedConfig.get("fancy-configuration-value"));
+                       Assert.assertEquals("3", 
parsedConfig.get("yarn.maximum-failed-containers"));
+                       Assert.assertEquals("2", 
parsedConfig.get(ConfigConstants.YARN_VCORES));
+
+                       // -------------- FLINK-1902: check if jobmanager 
hostname/port are shown in web interface
+                       // first, get the hostname/port
+                       String oC = outContent.toString();
+                       Pattern p = Pattern.compile("Flink JobManager is now 
running on ([a-zA-Z0-9.-]+):([0-9]+)");
+                       Matcher matches = p.matcher(oC);
+                       String hostname = null;
+                       String port = null;
+                       while(matches.find()) {
+                               hostname = matches.group(1).toLowerCase();
+                               port = matches.group(2);
+                       }
+                       LOG.info("Extracted hostname:port: {} {}", hostname, 
port);
+
+                       Assert.assertEquals("unable to find hostname in " + 
jsonConfig, hostname,
+                               
parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY));
+                       Assert.assertEquals("unable to find port in " + 
jsonConfig, port,
+                               
parsedConfig.get(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY));
+
+                       // test logfile access
+                       String logs = TestBaseUtils.getFromHTTP(url + 
"jobmanager/log");
+                       Assert.assertTrue(logs.contains("Starting YARN 
ApplicationMaster"));
+                       Assert.assertTrue(logs.contains("Starting JobManager"));
+                       Assert.assertTrue(logs.contains("Starting JobManager 
Web Frontend"));
+               } catch(Throwable e) {
+                       LOG.warn("Error while running test",e);
+                       Assert.fail(e.getMessage());
+               }
+
+               // ------------------------ Kill container with TaskManager and 
check if vcores are set correctly -------
+
+               // find container id of taskManager:
+               ContainerId taskManagerContainer = null;
+               NodeManager nodeManager = null;
+               UserGroupInformation remoteUgi = null;
+               NMTokenIdentifier nmIdent = null;
+               try {
+                       remoteUgi = UserGroupInformation.getCurrentUser();
+               } catch (IOException e) {
+                       LOG.warn("Unable to get curr user", e);
+                       Assert.fail();
+               }
+               for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
+                       NodeManager nm = yarnCluster.getNodeManager(nmId);
+                       ConcurrentMap<ContainerId, Container> containers = 
nm.getNMContext().getContainers();
+                       for(Map.Entry<ContainerId, Container> entry : 
containers.entrySet()) {
+                               String command = Joiner.on(" 
").join(entry.getValue().getLaunchContext().getCommands());
+                               
if(command.contains(YarnTaskManager.class.getSimpleName())) {
+                                       taskManagerContainer = entry.getKey();
+                                       nodeManager = nm;
+                                       nmIdent = new 
NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "",0);
+                                       // allow myself to do stuff with the 
container
+                                       // 
remoteUgi.addCredentials(entry.getValue().getCredentials());
+                                       remoteUgi.addTokenIdentifier(nmIdent);
+                               }
+                       }
+                       sleep(500);
+               }
+
+               Assert.assertNotNull("Unable to find container with 
TaskManager", taskManagerContainer);
+               Assert.assertNotNull("Illegal state", nodeManager);
+
+               try {
+                       List<NodeReport> nodeReports = 
yc.getNodeReports(NodeState.RUNNING);
+
+                       // we asked for one node with 2 vcores so we expect 2 
vcores
+                       int userVcores = 0;
+                       for (NodeReport rep: nodeReports) {
+                               userVcores += rep.getUsed().getVirtualCores();
+                       }
+                       Assert.assertEquals(2, userVcores);
+               } catch (Exception e) {
+                       Assert.fail("Test failed: " + e.getMessage());
+               }
+
+               yc.stop();
+
+               List<ContainerId> toStop = new LinkedList<ContainerId>();
+               toStop.add(taskManagerContainer);
+               StopContainersRequest scr = 
StopContainersRequest.newInstance(toStop);
+
+               try {
+                       
nodeManager.getNMContext().getContainerManager().stopContainers(scr);
+               } catch (Throwable e) {
+                       LOG.warn("Error stopping container", e);
+                       Assert.fail("Error stopping container: 
"+e.getMessage());
+               }
+
+               // stateful termination check:
+               // wait until we saw a container being killed and AFTERWARDS a 
new one launched
+               boolean ok = false;
+               do {
+                       LOG.debug("Waiting for correct order of events. Output: 
{}", errContent.toString());
+
+                       String o = errContent.toString();
+                       int killedOff = o.indexOf("Container killed by the 
ApplicationMaster");
+                       if (killedOff != -1) {
+                               o = o.substring(killedOff);
+                               ok = o.indexOf("Launching TaskManager") > 0;
+                       }
+                       sleep(1000);
+               } while(!ok);
+
+
+               // send "stop" command to command line interface
+               runner.sendStop();
+               // wait for the thread to stop
+               try {
+                       runner.join(1000);
+               } catch (InterruptedException e) {
+                       LOG.warn("Interrupted while stopping runner", e);
+               }
+               LOG.warn("stopped");
+
+               // ----------- Send output to logger
+               System.setOut(originalStdout);
+               System.setErr(originalStderr);
+               String oC = outContent.toString();
+               String eC = errContent.toString();
+               LOG.info("Sending stdout content through logger: \n\n{}\n\n", 
oC);
+               LOG.info("Sending stderr content through logger: \n\n{}\n\n", 
eC);
+
+               // ------ Check if everything happened correctly
+               Assert.assertTrue("Expect to see failed container",
+                       eC.contains("New messages from the YARN cluster"));
+
+               Assert.assertTrue("Expect to see failed container",
+                       eC.contains("Container killed by the 
ApplicationMaster"));
+
+               Assert.assertTrue("Expect to see new container started",
+                       eC.contains("Launching TaskManager") && eC.contains("on 
host"));
+
+               // cleanup auth for the subsequent tests.
+               remoteUgi.getTokenIdentifiers().remove(nmIdent);
+
+               LOG.info("Finished testTaskManagerFailure()");
+       }
+
+       /**
+        * Test deployment to non-existing queue. (user-reported error)
+        * Deployment to the queue is possible because there are no queues, so 
we don't check.
+        */
+       @Test
+       public void testNonexistingQueue() {
+               LOG.info("Starting testNonexistingQueue()");
+               addTestAppender(FlinkYarnClient.class, Level.WARN);
+               runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+                               "-t", flinkLibFolder.getAbsolutePath(),
+                               "-n", "1",
+                               "-jm", "768",
+                               "-tm", "1024",
+                               "-qu", "doesntExist"}, "to unknown queue: 
doesntExist", null, RunTypes.YARN_SESSION, 1);
+               checkForLogString("The specified queue 'doesntExist' does not 
exist. Available queues: default, qa-team");
+               LOG.info("Finished testNonexistingQueue()");
+       }
+
+       /**
+        * Test per-job yarn cluster with the parallelism set at the 
CliFrontend instead of the YARN client.
+        */
+       @Test
+       public void perJobYarnClusterWithParallelism() {
+               LOG.info("Starting perJobYarnClusterWithParallelism()");
+               // write log messages to stdout as well, so that the 
runWithArgs() method
+               // is catching the log output
+               addTestAppender(JobClient.class, Level.INFO);
+               File exampleJarLocation = YarnTestBase.findFile("..", new 
ContainsName(new String[] {"-WordCount.jar"}, "streaming")); // exclude 
streaming wordcount here.
+               Assert.assertNotNull("Could not find wordcount jar", 
exampleJarLocation);
+               runWithArgs(new String[]{"run",
+                               "-p", "2", //test that the job is executed with 
a DOP of 2
+                               "-m", "yarn-cluster",
+                               "-yj", flinkUberjar.getAbsolutePath(),
+                               "-yt", flinkLibFolder.getAbsolutePath(),
+                               "-yn", "1",
+                               "-yjm", "768",
+                               "-ytm", "1024", 
exampleJarLocation.getAbsolutePath()},
+                               /* test succeeded after this string */
+                       "Job execution complete",
+                               /* prohibited strings: (we want to see (2/2)) */
+                       new String[]{"System.out)(1/1) switched to FINISHED "},
+                       RunTypes.CLI_FRONTEND, 0, true);
+               LOG.info("Finished perJobYarnClusterWithParallelism()");
+       }
+
+       /**
+        * Test a fire-and-forget job submission to a YARN cluster.
+        */
+       @Test(timeout=60000)
+       public void testDetachedPerJobYarnCluster() {
+               LOG.info("Starting testDetachedPerJobYarnCluster()");
+
+               File exampleJarLocation = YarnTestBase.findFile(
+                       ".." + File.separator + "flink-examples" + 
File.separator + "flink-examples-batch",
+                       new ContainsName(new String[] {"-WordCount.jar"}));
+
+               Assert.assertNotNull("Could not find batch wordcount jar", 
exampleJarLocation);
+
+               
testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
+
+               LOG.info("Finished testDetachedPerJobYarnCluster()");
+       }
+
+       /**
+        * Test a fire-and-forget job submission to a YARN cluster.
+        */
+       @Test(timeout=60000)
+       public void testDetachedPerJobYarnClusterWithStreamingJob() {
+               LOG.info("Starting 
testDetachedPerJobYarnClusterWithStreamingJob()");
+
+               File exampleJarLocation = YarnTestBase.findFile(
+                       ".." + File.separator + "flink-examples" + 
File.separator + "flink-examples-streaming",
+                       new ContainsName(new String[] {"-WordCount.jar"}));
+               Assert.assertNotNull("Could not find streaming wordcount jar", 
exampleJarLocation);
+
+               
testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
+
+               LOG.info("Finished 
testDetachedPerJobYarnClusterWithStreamingJob()");
+       }
+
+       private void testDetachedPerJobYarnClusterInternal(String job) {
+               YarnClient yc = YarnClient.createYarnClient();
+               yc.init(yarnConfiguration);
+               yc.start();
+
+               // get temporary folder for writing output of wordcount example
+               File tmpOutFolder = null;
+               try{
+                       tmpOutFolder = tmp.newFolder();
+               }
+               catch(IOException e) {
+                       throw new RuntimeException(e);
+               }
+
+               // get temporary file for reading input data for wordcount 
example
+               File tmpInFile;
+               try{
+                       tmpInFile = tmp.newFile();
+                       FileUtils.writeStringToFile(tmpInFile, 
WordCountData.TEXT);
+               }
+               catch(IOException e) {
+                       throw new RuntimeException(e);
+               }
+
+               Runner runner = startWithArgs(new String[]{"run", "-m", 
"yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
+                               "-yt", flinkLibFolder.getAbsolutePath(),
+                               "-yn", "1",
+                               "-yjm", "768",
+                               "-yD", "yarn.heap-cutoff-ratio=0.5", // test if 
the cutoff is passed correctly
+                               "-ytm", "1024",
+                               "-ys", "2", // test requesting slots from YARN.
+                               "--yarndetached", job, "--input", 
tmpInFile.getAbsoluteFile().toString(), "--output", 
tmpOutFolder.getAbsoluteFile().toString()},
+                       "Job has been submitted with JobID",
+                       RunTypes.CLI_FRONTEND);
+
+               // it should usually be 2, but on slow machines, the number 
varies
+               Assert.assertTrue("There should be at most 2 containers 
running", getRunningContainers() <= 2);
+               // give the runner some time to detach
+               for (int attempt = 0; runner.isAlive() && attempt < 5; 
attempt++) {
+                       try {
+                               Thread.sleep(500);
+                       } catch (InterruptedException e) {
+                       }
+               }
+               Assert.assertFalse("The runner should detach.", 
runner.isAlive());
+               LOG.info("CLI Frontend has returned, so the job is running");
+
+               // find out the application id and wait until it has finished.
+               try {
+                       List<ApplicationReport> apps = 
yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
+
+                       ApplicationId tmpAppId;
+                       if (apps.size() == 1) {
+                               // Better method to find the right appId. But 
sometimes the app is shutting down very fast
+                               // Only one running
+                               tmpAppId = apps.get(0).getApplicationId();
+
+                               LOG.info("waiting for the job with appId {} to 
finish", tmpAppId);
+                               // wait until the app has finished
+                               
while(yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size() > 0) {
+                                       sleep(500);
+                               }
+                       } else {
+                               // get appId by finding the latest finished 
appid
+                               apps = yc.getApplications();
+                               Collections.sort(apps, new 
Comparator<ApplicationReport>() {
+                                       @Override
+                                       public int compare(ApplicationReport 
o1, ApplicationReport o2) {
+                                               return 
o1.getApplicationId().compareTo(o2.getApplicationId())*-1;
+                                       }
+                               });
+                               tmpAppId = apps.get(0).getApplicationId();
+                               LOG.info("Selected {} as the last appId from 
{}", tmpAppId, Arrays.toString(apps.toArray()));
+                       }
+                       final ApplicationId id = tmpAppId;
+
+                       // now it has finished.
+                       // check the output files.
+                       File[] listOfOutputFiles = tmpOutFolder.listFiles();
+
+
+                       Assert.assertNotNull("Taskmanager output not found", 
listOfOutputFiles);
+                       LOG.info("The job has finished. TaskManager output 
files found in {}", tmpOutFolder );
+
+                       // read all output files in output folder to one output 
string
+                       String content = "";
+                       for(File f:listOfOutputFiles)
+                       {
+                               if(f.isFile())
+                               {
+                                       content += 
FileUtils.readFileToString(f) + "\n";
+                               }
+                       }
+                       //String content = 
FileUtils.readFileToString(taskmanagerOut);
+                       // check for some of the wordcount outputs.
+                       Assert.assertTrue("Expected string 'da 5' or '(all,2)' 
not found in string '"+content+"'", content.contains("da 5") || 
content.contains("(da,5)") || content.contains("(all,2)"));
+                       Assert.assertTrue("Expected string 'der 29' or 
'(mind,1)' not found in string'"+content+"'",content.contains("der 29") || 
content.contains("(der,29)") || content.contains("(mind,1)"));
+
+                       // check if the heap size for the TaskManager was set 
correctly
+                       File jobmanagerLog = YarnTestBase.findFile("..", new 
FilenameFilter() {
+                               @Override
+                               public boolean accept(File dir, String name) {
+                                       return name.contains("jobmanager.log") 
&& dir.getAbsolutePath().contains(id.toString());
+                               }
+                       });
+                       Assert.assertNotNull("Unable to locate JobManager log", 
jobmanagerLog);
+                       content = FileUtils.readFileToString(jobmanagerLog);
+                       // TM was started with 1024 but we cut off 50% (NOT THE 
DEFAULT VALUE)
+                       String expected = "Starting TaskManagers with command: 
$JAVA_HOME/bin/java -Xms424m -Xmx424m";
+                       Assert.assertTrue("Expected string '" + expected + "' 
not found in JobManager log: '"+jobmanagerLog+"'",
+                               content.contains(expected));
+                       expected = " (2/2) (attempt #0) to ";
+                       Assert.assertTrue("Expected string '" + expected + "' 
not found in JobManager log." +
+                                       "This string checks that the job has 
been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'",
+                               content.contains(expected));
+
+                       // make sure the detached app is really finished.
+                       LOG.info("Checking again that app has finished");
+                       ApplicationReport rep;
+                       do {
+                               sleep(500);
+                               rep = yc.getApplicationReport(id);
+                               LOG.info("Got report {}", rep);
+                       } while(rep.getYarnApplicationState() == 
YarnApplicationState.RUNNING);
+
+               } catch(Throwable t) {
+                       LOG.warn("Error while detached yarn session was 
running", t);
+                       Assert.fail(t.getMessage());
+               }
+       }
+
+       @After
+       public void checkForProhibitedLogContents() {
+               ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, 
WHITELISTED_STRINGS);
+       }
+}

Reply via email to