http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
 
b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
deleted file mode 100644
index 1652705..0000000
--- 
a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn
-
-import org.apache.flink.runtime.ActorLogMessages
-import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.yarn.Messages.StopYarnSession
-
-trait YarnTaskManager extends ActorLogMessages {
-  that: TaskManager =>
-
-  abstract override def receiveWithLogMessages: Receive = {
-    receiveYarnMessages orElse super.receiveWithLogMessages
-  }
-
-  def receiveYarnMessages: Receive = {
-    case StopYarnSession(status) => {
-      context.system.shutdown()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala 
b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
deleted file mode 100644
index 245651d..0000000
--- 
a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn
-
-import akka.actor.{Props, ActorRef, ActorSystem}
-import com.typesafe.config.ConfigFactory
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.taskmanager.TaskManager
-
-object YarnUtils {
-  def createActorSystem(hostname: String, port: Int, configuration: 
Configuration): ActorSystem = {
-    val akkaConfig = 
ConfigFactory.parseString(AkkaUtils.getConfigString(hostname, port,
-      configuration) + getConfigString)
-
-    AkkaUtils.createActorSystem(akkaConfig)
-  }
-
-  def createActorSystem(): ActorSystem = {
-    val akkaConfig = 
ConfigFactory.parseString(AkkaUtils.getDefaultActorSystemConfigString +
-      getConfigString)
-
-    AkkaUtils.createActorSystem(akkaConfig)
-  }
-
-  def getConfigString: String = {
-    """
-    |akka{
-    |  loglevel = "DEBUG"
-    |  stdout-loglevel = "DEBUG"
-    |  log-dead-letters-during-shutdown = off
-    |  log-dead-letters = off
-    |
-    |  actor {
-    |    provider = "akka.remote.RemoteActorRefProvider"
-    |  }
-    |
-    |  remote{
-    |    log-remote-lifecycle-events = off
-    |
-    |    netty{
-    |      tcp{
-    |        transport-class = "akka.remote.transport.netty.NettyTransport"
-    |        tcp-nodelay = on
-    |        maximum-frame-size = 1MB
-    |        execution-pool-size = 4
-    |      }
-    |    }
-    |  }
-    |}""".stripMargin
-  }
-
-  def startActorSystemAndTaskManager(args: Array[String]): (ActorSystem, 
ActorRef) = {
-    val (hostname, port, config) = TaskManager.parseArgs(args)
-
-    val actorSystem = createActorSystem(hostname, port, config)
-
-    val (connectionInfo, jobManagerURL, taskManagerConfig, 
networkConnectionConfiguration) =
-      TaskManager.parseConfiguration(hostname, config, false)
-
-    (actorSystem, TaskManager.startActor(Props(new TaskManager(connectionInfo, 
jobManagerURL,
-      taskManagerConfig, networkConnectionConfiguration) with 
YarnTaskManager))(actorSystem))
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-addons/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/pom.xml b/flink-addons/pom.xml
index a20a375..ea45cdb 100644
--- a/flink-addons/pom.xml
+++ b/flink-addons/pom.xml
@@ -59,19 +59,6 @@ under the License.
                                <module>flink-tachyon</module>
                        </modules>
                </profile>
-
-               <profile>
-                       <id>include-yarn</id>
-                       <activation>
-                               <property>
-                                       <!-- Please do not remove the 'hadoop2' 
comment. See ./tools/generate_specific_pom.sh -->
-                                       
<!--hadoop2--><name>!hadoop.profile</name>
-                               </property>
-                       </activation>
-                       <modules>
-                               <module>flink-yarn</module>
-                       </modules>
-               </profile>
        </profiles>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 85506b2..1a96c9c 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -37,11 +37,6 @@ under the License.
 
        <dependencies>
                <dependency>
-                       <groupId>commons-cli</groupId>
-                       <artifactId>commons-cli</artifactId>
-               </dependency>
-
-               <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-core</artifactId>
                        <version>${project.version}</version>
@@ -109,6 +104,11 @@ under the License.
                </dependency>
 
                <dependency>
+                       <groupId>commons-cli</groupId>
+                       <artifactId>commons-cli</artifactId>
+               </dependency>
+
+               <dependency>
                        <groupId>com.typesafe.akka</groupId>
                        <artifactId>akka-testkit_2.10</artifactId>
                </dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 8092513..358783a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -56,6 +56,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -63,7 +64,12 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
 import org.apache.flink.util.StringUtils;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.PatternLayout;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -71,34 +77,40 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class CliFrontend {
 
+       // run job by deploying Flink into a YARN cluster, if this string is 
specified as the jobmanager address
+       public static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster";
+
+       // command line interface of the YARN session, with a special 
initialization here to prefix all options with y/yarn.
+       private static FlinkYarnSessionCli yarnSessionCLi = new 
FlinkYarnSessionCli("y", "yarn");
+
        //actions
        private static final String ACTION_RUN = "run";
        private static final String ACTION_INFO = "info";
        private static final String ACTION_LIST = "list";
        private static final String ACTION_CANCEL = "cancel";
-       
+
        // general options
        private static final Option HELP_OPTION = new Option("h", "help", 
false, "Show the help for the CLI Frontend.");
        private static final Option VERBOSE_OPTION = new Option("v", "verbose", 
false, "Print more detailed error messages.");
-       
+
        // program (jar file) specific options
        private static final Option JAR_OPTION = new Option("j", "jarfile", 
true, "Flink program JAR file.");
        private static final Option CLASS_OPTION = new Option("c", "class", 
true, "Class with the program entry point (\"main\" method or \"getPlan()\" 
method. Only needed if the JAR file does not specify the class in its 
manifest.");
        private static final Option PARALLELISM_OPTION = new Option("p", 
"parallelism", true, "The parallelism with which to run the program. Optional 
flag to override the default value specified in the configuration.");
        private static final Option ARGS_OPTION = new Option("a", "arguments", 
true, "Program arguments. Arguments can also be added without -a, simply as 
trailing parameters.");
-       
-       private static final Option ADDRESS_OPTION = new Option("m", 
"jobmanager", true, "Address of the JobManager (master) to which to connect. 
Use this flag to connect to a different JobManager than the one specified in 
the configuration.");
-       
+
+       private static final Option ADDRESS_OPTION = new Option("m", 
"jobmanager", true, "Address of the JobManager (master) to which to connect. 
Specify '"+YARN_DEPLOY_JOBMANAGER+"' as the JobManager to deploy a YARN cluster 
for the job. Use this flag to connect to a different JobManager than the one 
specified in the configuration.");
+
        // info specific options
        private static final Option PLAN_OPTION = new Option("e", 
"executionplan", false, "Show optimized execution plan of the program (JSON)");
-       
+
        // list specific options
        private static final Option RUNNING_OPTION = new Option("r", "running", 
false, "Show running programs and their JobIDs");
        private static final Option SCHEDULED_OPTION = new Option("s", 
"scheduled", false, "Show scheduled prorgrams and their JobIDs");
-       
+
        // canceling
        private static final Option ID_OPTION = new Option("i", "jobid", true, 
"JobID of program to cancel");
-       
+
        static {
                initOptions();
        }
@@ -126,6 +138,8 @@ public class CliFrontend {
        public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = 
"dynamicPropertiesString";
        // this has to be a regex for String.split()
        public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@";
+       private static final String DEFAULT_LOG4J_PATTERN_LAYOUT = 
"%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n";
+
        
 
        private CommandLineParser parser;
@@ -139,6 +153,15 @@ public class CliFrontend {
        
        private Properties yarnProperties;
 
+       // this flag indicates if the given Job is executed using a YARN 
cluster,
+       // started for this purpose.
+       private boolean runInYarnCluster = false;
+
+       private AbstractFlinkYarnCluster yarnCluster = null;
+
+       protected String configurationDirectory = null;
+
+
        /**
         * Initializes the class
         */
@@ -193,6 +216,9 @@ public class CliFrontend {
                options.addOption(CLASS_OPTION);
                options.addOption(PARALLELISM_OPTION);
                options.addOption(ARGS_OPTION);
+
+               // also add the YARN options so that the parser can parse them
+               yarnSessionCLi.getYARNSessionCLIOptions(options);
                return options;
        }
        
@@ -309,7 +335,7 @@ public class CliFrontend {
                                return 1;
                        }
                        
-                       Client client = getClient(line, 
program.getUserCodeClassLoader());
+                       Client client = getClient(line, 
program.getUserCodeClassLoader(), program.getMainClassName());
                        if (client == null) {
                                printHelpForRun();
                                return 1;
@@ -332,8 +358,25 @@ public class CliFrontend {
                                        return 1;
                                }
                        }
-               
-                       return executeProgram(program, client, parallelism);
+                       int programResult = executeProgram(program, client, 
parallelism);
+                       // check if the program has been executed in a "job 
only" YARN cluster.
+                       if(runInYarnCluster) {
+                               List<String> msgs = 
yarnCluster.getNewMessages();
+                               if(msgs != null && msgs.size() > 1) {
+                                       System.out.println("The following 
messages were created by the YARN cluster while running the Job:");
+                                       for(String msg : msgs) {
+                                               System.out.println(msg);
+                                       }
+                               }
+                               if(yarnCluster.hasFailed()) {
+                                       System.out.println("YARN cluster is in 
failed state!");
+                                       System.out.println("YARN Diagnostics: " 
+ yarnCluster.getDiagnostics());
+                               }
+                               System.out.println("Shutting down YARN 
cluster");
+                               yarnCluster.shutdown();
+                       }
+
+                       return programResult;
                }
                catch (Throwable t) {
                        return handleError(t);
@@ -443,7 +486,7 @@ public class CliFrontend {
                try {
                        // check for json plan request
                        if (plan) {
-                               Client client = getClient(line, 
program.getUserCodeClassLoader());
+                               Client client = getClient(line, 
program.getUserCodeClassLoader(), program.getMainClassName());
                                String jsonPlan = 
client.getOptimizedPlanAsJson(program, parallelism);
                                
                                if (jsonPlan != null) {
@@ -693,14 +736,13 @@ public class CliFrontend {
                }
        }
        
-       protected InetSocketAddress getJobManagerAddress(CommandLine line) 
throws IOException {
+       protected String getJobManagerAddressString(CommandLine line) throws 
IOException {
                Configuration configuration = getGlobalConfiguration();
                
                // first, check if the address comes from the command line 
option
                if (line.hasOption(ADDRESS_OPTION.getOpt())) {
                        try {
-                               String address = 
line.getOptionValue(ADDRESS_OPTION.getOpt());
-                               return 
RemoteExecutor.getInetFromHostport(address);
+                               return 
line.getOptionValue(ADDRESS_OPTION.getOpt());
                        }
                        catch (Exception e) {
                                System.out.println("Error: The JobManager 
address has an invalid format. " + e.getMessage());
@@ -714,9 +756,9 @@ public class CliFrontend {
                                        String address = 
yarnProps.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
                                        System.out.println("Found a yarn 
properties file (" + YARN_PROPERTIES_FILE + ") file, "
                                                        + "using 
\""+address+"\" to connect to the JobManager");
-                                       return 
RemoteExecutor.getInetFromHostport(address);
+                                       return address;
                                } catch (Exception e) {
-                                       System.out.println("Found a yarn 
properties " + YARN_PROPERTIES_FILE + " file, but could not read the JobManager 
address from the file. " 
+                                       System.out.println("Found a yarn 
properties " + YARN_PROPERTIES_FILE + " file, but could not read the JobManager 
address from the file. "
                                                                + 
e.getMessage());
                                        return null;
                                }
@@ -726,7 +768,7 @@ public class CliFrontend {
                                
                                // verify that there is a jobmanager address 
and port in the configuration
                                if (jobManagerAddress == null) {
-                                       System.out.println("Error: Found no 
configuration in the config directory '" + 
+                                       System.out.println("Error: Found no 
configuration in the config directory '" +
                                                        
getConfigurationDirectory() + "' that specifies the JobManager address.");
                                        return null;
                                }
@@ -741,29 +783,37 @@ public class CliFrontend {
                                }
                                
                                if (jobManagerPort == -1) {
-                                       System.out.println("Error: Found no 
configuration in the config directory '" + 
+                                       System.out.println("Error: Found no 
configuration in the config directory '" +
                                                        
getConfigurationDirectory() + "' that specifies the JobManager port.");
                                        return null;
                                }
                                
-                               return new InetSocketAddress(jobManagerAddress, 
jobManagerPort);
+                               return jobManagerAddress + ":" + jobManagerPort;
                        }
                }
        }
        
        protected ActorRef getJobManager(CommandLine line) throws IOException {
-               InetSocketAddress jobManagerAddress = 
getJobManagerAddress(line);
-               if (jobManagerAddress == null) {
+               //TODO: Get ActorRef from YarnCluster if we are in YARN mode.
+               String jobManagerAddressStr = getJobManagerAddressString(line);
+               if (jobManagerAddressStr == null) {
                        return null;
                }
 
-               return JobManager.getJobManager(jobManagerAddress,
+               return 
JobManager.getJobManager(RemoteExecutor.getInetFromHostport(jobManagerAddressStr),
                                ActorSystem.create("CliFrontendActorSystem", 
AkkaUtils
                                                
.getDefaultActorSystemConfig()),getAkkaTimeout());
        }
        
-       
-       protected String getConfigurationDirectory() {
+
+       public String getConfigurationDirectory() {
+               if(configurationDirectory == null) {
+                       configurationDirectory = 
getConfigurationDirectoryFromEnv();
+               }
+               return configurationDirectory;
+       }
+
+       public static String getConfigurationDirectoryFromEnv() {
                String location = null;
                if (System.getenv(ENV_CONFIG_DIRECTORY) != null) {
                        location = System.getenv(ENV_CONFIG_DIRECTORY);
@@ -860,8 +910,50 @@ public class CliFrontend {
                return yarnProperties;
        }
        
-       protected Client getClient(CommandLine line, ClassLoader classLoader) 
throws IOException {
-               return new Client(getJobManagerAddress(line), 
getGlobalConfiguration(), classLoader);
+       protected Client getClient(CommandLine line, ClassLoader classLoader, 
String programName) throws IOException {
+               String jmAddrString = getJobManagerAddressString(line);
+               InetSocketAddress jobManagerAddress = null;
+               if(jmAddrString.equals(YARN_DEPLOY_JOBMANAGER)) {
+                       System.out.println("YARN cluster mode detected. 
Switching Log4j output to console");
+                       LogManager.getRootLogger().addAppender(new 
ConsoleAppender(new PatternLayout(DEFAULT_LOG4J_PATTERN_LAYOUT)));
+
+                       this.runInYarnCluster = true;
+                       // user wants to run Flink in YARN cluster.
+                       AbstractFlinkYarnClient flinkYarnClient = 
yarnSessionCLi.createFlinkYarnClient(line);
+                       if(flinkYarnClient == null) {
+                               throw new RuntimeException("Unable to create 
Flink YARN Client. Check previous log messages");
+                       }
+                       try {
+                               yarnCluster = flinkYarnClient.deploy("Flink 
Application: "+programName);
+                       } catch(Exception e) {
+                               throw new RuntimeException("Error deploying the 
YARN cluster", e);
+                       }
+                       jobManagerAddress = yarnCluster.getJobManagerAddress();
+                       System.out.println("YARN cluster started");
+                       System.out.println("JobManager web interface address 
"+yarnCluster.getWebInterfaceURL());
+                       System.out.println("Waiting until all TaskManagers have 
connected");
+                       while(true) {
+                               FlinkYarnClusterStatus status = 
yarnCluster.getClusterStatus();
+                               if(status != null) {
+                                       if (status.getNumberOfTaskManagers() < 
flinkYarnClient.getTaskManagerCount()) {
+                                               System.out.println("TaskManager 
status  (" + 
status.getNumberOfTaskManagers()+"/"+flinkYarnClient.getTaskManagerCount()+")");
+                                       } else {
+                                               System.out.println("Enough 
TaskManagers are connected");
+                                               break;
+                                       }
+                               } else {
+                                       System.out.println("No status updates 
from YARN cluster received so far. Waiting ...");
+                               }
+                               try {
+                                       Thread.sleep(500);
+                               } catch (InterruptedException e) {
+                                       System.err.println("Thread as 
interrupted"); Thread.currentThread().interrupt();
+                               }
+                       }
+               } else {
+                       jobManagerAddress = 
RemoteExecutor.getInetFromHostport(jmAddrString);
+               }
+               return new Client(jobManagerAddress, getGlobalConfiguration(), 
classLoader);
        }
 
        /**
@@ -891,6 +983,10 @@ public class CliFrontend {
                System.out.println("\n  Syntax: run [OPTIONS] <jar-file> 
<arguments>");
                formatter.setSyntaxPrefix("  \"run\" action arguments:");
                formatter.printHelp(" ", 
getRunOptionsWithoutDeprecatedOptions(new Options()));
+               formatter.setSyntaxPrefix("  additional arguments if -m 
"+YARN_DEPLOY_JOBMANAGER+" is set:");
+               Options yarnOpts = new Options();
+               yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
+               formatter.printHelp(" ", yarnOpts);
        }
        
        private void printHelpForInfo() {
@@ -990,14 +1086,15 @@ public class CliFrontend {
                }
        }
 
-       
 
        /**
         * Submits the job based on the arguments
         */
        public static void main(String[] args) throws ParseException {
+
                CliFrontend cli = new CliFrontend();
                int retCode = cli.parseParameters(args);
                System.exit(retCode);
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java 
b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
new file mode 100644
index 0000000..6546ef0
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Class handling the command line interface to the YARN session.
+ */
+public class FlinkYarnSessionCli {
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkYarnSessionCli.class);
+
+       //------------------------------------ Constants   
-------------------------
+
+       private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
+       public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
+       public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
+
+
+       private static final int CLIENT_POLLING_INTERVALL = 3;
+
+
+       //------------------------------------ Command Line argument options 
-------------------------
+       // the prefix transformation is used by the CliFrontend static 
constructor.
+       private final Option QUERY;
+       // --- or ---
+       private final Option QUEUE;
+       private final Option SHIP_PATH;
+       private final Option FLINK_JAR;
+       private final Option JM_MEMORY;
+       private final Option TM_MEMORY;
+       private final Option CONTAINER;
+       private final Option SLOTS;
+
+       /**
+        * Dynamic properties allow the user to specify additional 
configuration values with -D, such as
+        *  -Dfs.overwrite-files=true  
-Dtaskmanager.network.numberOfBuffers=16368
+        */
+       private final Option DYNAMIC_PROPERTIES;
+
+       private AbstractFlinkYarnCluster yarnCluster = null;
+
+       public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
+               QUERY = new Option(shortPrefix + "q", longPrefix + "query", 
false, "Display available YARN resources (memory, cores)");
+               QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", 
true, "Specify YARN queue.");
+               SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", 
true, "Ship files in the specified directory (t for transfer)");
+               FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", 
true, "Path to Flink jar file");
+               JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + 
"jobManagerMemory", true, "Memory for JobManager Container [in MB]");
+               TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + 
"taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
+               CONTAINER = new Option(shortPrefix + "n", longPrefix + 
"container", true, "Number of YARN container to allocate (=Number of Task 
Managers)");
+               SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", 
true, "Number of slots per TaskManager");
+               DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, 
"Dynamic properties");
+       }
+
+       public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
+
+               AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
+               if(flinkYarnClient == null) {
+                       return null;
+               }
+
+               if(!cmd.hasOption(CONTAINER.getOpt())) { // number of 
containers is required option!
+                       LOG.error("Missing required argument " + 
CONTAINER.getOpt());
+                       printUsage();
+                       return null;
+               }
+               
flinkYarnClient.setTaskManagerCount(Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt())));
+
+               // Jar Path
+               Path localJarPath;
+               if(cmd.hasOption(FLINK_JAR.getOpt())) {
+                       String userPath = 
cmd.getOptionValue(FLINK_JAR.getOpt());
+                       if(!userPath.startsWith("file://")) {
+                               userPath = "file://" + userPath;
+                       }
+                       localJarPath = new Path(userPath);
+               } else {
+                       LOG.info("No path for the flink jar passed. Using the 
location of "+flinkYarnClient.getClass()+" to locate the jar");
+                       localJarPath = new 
Path("file://"+flinkYarnClient.getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
+                       if(!localJarPath.toString().contains("uberjar")) {
+                               // we need to have a proper uberjar because 
otherwise we don't have the required classes available on the cluster.
+                               // most likely the user did try to start yarn 
in a regular hadoop2 flink build (not a yarn package) (using ./bin/flink -m 
yarn-cluster)
+                               LOG.error("The detected jar file 
'"+localJarPath+"' is not a uberjar.");
+                               return null;
+                       }
+               }
+
+               flinkYarnClient.setLocalJarPath(localJarPath);
+
+               // Conf Path
+               String confDirPath = 
CliFrontend.getConfigurationDirectoryFromEnv();
+               GlobalConfiguration.loadConfiguration(confDirPath);
+               flinkYarnClient.setConfigurationDirectory(confDirPath);
+               File confFile = new File(confDirPath + File.separator + 
CONFIG_FILE_NAME);
+               if(!confFile.exists()) {
+                       LOG.error("Unable to locate configuration file in 
"+confFile);
+                       return null;
+               }
+               Path confPath = new Path(confFile.getAbsolutePath());
+
+               flinkYarnClient.setConfigurationFilePath(confPath);
+
+               List<File> shipFiles = new ArrayList<File>();
+               // path to directory to ship
+               if(cmd.hasOption(SHIP_PATH.getOpt())) {
+                       String shipPath = 
cmd.getOptionValue(SHIP_PATH.getOpt());
+                       File shipDir = new File(shipPath);
+                       if(shipDir.isDirectory()) {
+                               shipFiles = new 
ArrayList<File>(Arrays.asList(shipDir.listFiles(new FilenameFilter() {
+                                       @Override
+                                       public boolean accept(File dir, String 
name) {
+                                               return !(name.equals(".") || 
name.equals(".."));
+                                       }
+                               })));
+                       } else {
+                               LOG.warn("Ship directory is not a directory. 
Ignoring it.");
+                       }
+               }
+
+               //check if there is a logback or log4j file
+               if(confDirPath.length() > 0) {
+                       File logback = new File(confDirPath + 
File.pathSeparator + CONFIG_FILE_LOGBACK_NAME);
+                       if(logback.exists()) {
+                               shipFiles.add(logback);
+                               flinkYarnClient.setConfigurationFilePath(new 
Path(logback.toURI()));
+                       }
+                       File log4j = new File(confDirPath + File.pathSeparator 
+ CONFIG_FILE_LOG4J_NAME);
+                       if(log4j.exists()) {
+                               shipFiles.add(log4j);
+                               
if(flinkYarnClient.getFlinkLoggingConfigurationPath() != null) {
+                                       // this means there is already a 
logback configuration file --> fail
+                                       LOG.error("The configuration directory 
('"+confDirPath+"') contains both LOG4J and Logback configuration files." +
+                                                       "Please delete or 
rename one of them.");
+                                       return null;
+                               } // else
+                               flinkYarnClient.setConfigurationFilePath(new 
Path(log4j.toURI()));
+                       }
+               }
+
+               flinkYarnClient.setShipFiles(shipFiles);
+
+               // queue
+               if(cmd.hasOption(QUEUE.getOpt())) {
+                       
flinkYarnClient.setQueue(cmd.getOptionValue(QUEUE.getOpt()));
+               }
+
+               // JobManager Memory
+               if(cmd.hasOption(JM_MEMORY.getOpt())) {
+                       int jmMemory = 
Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
+                       flinkYarnClient.setJobManagerMemory(jmMemory);
+               }
+
+               // Task Managers memory
+               if(cmd.hasOption(TM_MEMORY.getOpt())) {
+                       int tmMemory = 
Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
+                       flinkYarnClient.setTaskManagerMemory(tmMemory);
+               }
+
+               if(cmd.hasOption(SLOTS.getOpt())) {
+                       int slots = 
Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
+                       flinkYarnClient.setTaskManagerSlots(slots);
+               }
+
+               String[] dynamicProperties = null;
+               if(cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
+                       dynamicProperties = 
cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
+               }
+               String dynamicPropertiesEncoded = 
StringUtils.join(dynamicProperties, 
CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+
+               
flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
+
+               return flinkYarnClient;
+       }
+
+
+       private void printUsage() {
+               System.out.println("Usage:");
+               HelpFormatter formatter = new HelpFormatter();
+               formatter.setWidth(200);
+               formatter.setLeftPadding(5);
+               formatter.setSyntaxPrefix("   Required");
+               Options req = new Options();
+               req.addOption(CONTAINER);
+               formatter.printHelp(" ", req);
+
+               formatter.setSyntaxPrefix("   Optional");
+               Options opt = new Options();
+               opt.addOption(JM_MEMORY);
+               opt.addOption(TM_MEMORY);
+               opt.addOption(QUERY);
+               opt.addOption(QUEUE);
+               opt.addOption(SLOTS);
+               opt.addOption(DYNAMIC_PROPERTIES);
+               formatter.printHelp(" ", opt);
+       }
+
+       public static AbstractFlinkYarnClient getFlinkYarnClient() {
+               AbstractFlinkYarnClient yarnClient = null;
+               try {
+                       Class<AbstractFlinkYarnClient> yarnClientClass = 
(Class<AbstractFlinkYarnClient>) 
Class.forName("org.apache.flink.yarn.FlinkYarnClient");
+                       yarnClient = 
InstantiationUtil.instantiate(yarnClientClass, AbstractFlinkYarnClient.class);
+               } catch (ClassNotFoundException e) {
+                       System.err.println("Unable to locate the Flink YARN 
Client. Please ensure that you are using a Flink build with Hadoop2/YARN 
support. Message: "+e.getMessage());
+                       e.printStackTrace(System.err);
+                       return null; // make it obvious
+               }
+               return yarnClient;
+       }
+
+       private static void writeYarnProperties(Properties properties, File 
propertiesFile) {
+               try {
+                       OutputStream out = new FileOutputStream(propertiesFile);
+                       properties.store(out, "Generated YARN properties file");
+                       out.close();
+               } catch (IOException e) {
+                       throw new RuntimeException("Error writing the 
properties file", e);
+               }
+               propertiesFile.setReadable(true, false); // readable for all.
+       }
+
+       public static void runInteractiveCli(AbstractFlinkYarnCluster 
yarnCluster) {
+               final String HELP = "Available commands:\n" +
+                               "help - show these commands\n" +
+                               "stop - stop the YARN session";
+               int numTaskmanagers = 0;
+               try {
+                       BufferedReader in = new BufferedReader(new 
InputStreamReader(System.in));
+                       while (true) {
+                               // ------------------ check if there are 
updates by the cluster -----------
+
+                               FlinkYarnClusterStatus status = 
yarnCluster.getClusterStatus();
+                               if(status != null && numTaskmanagers != 
status.getNumberOfTaskManagers()) {
+                                       System.err.println("Number of connected 
TaskManagers changed to "+status.getNumberOfTaskManagers()+". "
+                                                       + "Slots available: 
"+status.getNumberOfSlots());
+                                       numTaskmanagers = 
status.getNumberOfTaskManagers();
+                               }
+
+                               List<String> messages = 
yarnCluster.getNewMessages();
+                               if(messages != null && messages.size() > 0) {
+                                       System.err.println("New messages from 
the YARN cluster: ");
+                                       for(String msg : messages) {
+                                               System.err.println(msg);
+                                       }
+                               }
+
+                               if(yarnCluster.hasFailed()) {
+                                       System.err.println("The YARN cluster 
has failed");
+                               }
+
+                               // wait until CLIENT_POLLING_INTERVALL is over 
or the user entered something.
+                               long startTime = System.currentTimeMillis();
+                               while ((System.currentTimeMillis() - startTime) 
< CLIENT_POLLING_INTERVALL * 1000
+                                               && !in.ready()) {
+                                       Thread.sleep(200);
+                               }
+                               //------------- handle interactive command by 
user. ----------------------
+
+                               if (in.ready()) {
+                                       String command = in.readLine();
+                                       if(command.equals("quit") || 
command.equals("stop")) {
+                                               break; // leave loop, cli will 
stop cluster.
+                                       } else if(command.equals("help"))  {
+                                               System.err.println(HELP);
+                                       } else {
+                                               System.err.println("Unknown 
command '"+command+"'. Showing help: \n"+HELP);
+                                       }
+                               }
+                               if(yarnCluster.hasBeenStopped()) {
+                                       LOG.info("Stopping interactive command 
line interface, YARN cluster has been stopped.");
+                                       break;
+                               }
+                       }
+               } catch(Exception e) {
+                       LOG.warn("Exception while running the interactive 
command line interface", e);
+                       return;
+               }
+       }
+
+       public static void main(String[] args) {
+               FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // 
no prefix for the YARN session
+               System.exit(cli.run(args));
+       }
+
+       public void getYARNSessionCLIOptions(Options options) {
+               options.addOption(FLINK_JAR);
+               options.addOption(JM_MEMORY);
+               options.addOption(TM_MEMORY);
+               options.addOption(CONTAINER);
+               options.addOption(QUEUE);
+               options.addOption(QUERY);
+               options.addOption(SHIP_PATH);
+               options.addOption(SLOTS);
+               options.addOption(DYNAMIC_PROPERTIES);
+       }
+
+       public int run(String[] args) {
+
+               //
+               //      Command Line Options
+               //
+               Options options = new Options();
+               getYARNSessionCLIOptions(options);
+
+               CommandLineParser parser = new PosixParser();
+               CommandLine cmd = null;
+               try {
+                       cmd = parser.parse(options, args);
+               } catch(Exception e) {
+                       System.out.println(e.getMessage());
+                       printUsage();
+                       return 1;
+               }
+
+               // Query cluster for metrics
+               if(cmd.hasOption(QUERY.getOpt())) {
+                       AbstractFlinkYarnClient flinkYarnClient = 
getFlinkYarnClient();
+                       String description = null;
+                       try {
+                               description = 
flinkYarnClient.getClusterDescription();
+                       } catch (Exception e) {
+                               System.err.println("Error while querying the 
YARN cluster for available resources: "+e.getMessage());
+                               e.printStackTrace(System.err);
+                               return 1;
+                       }
+                       System.out.println(description);
+                       return 0;
+               } else {
+                       AbstractFlinkYarnClient flinkYarnClient = 
createFlinkYarnClient(cmd);
+
+                       if(flinkYarnClient == null) {
+                               System.err.println("Error while starting the 
YARN Client. Please check log output!");
+                               return 1;
+                       }
+
+
+                       try {
+                               yarnCluster = flinkYarnClient.deploy(null);
+                       } catch (Exception e) {
+                               System.err.println("Error while deploying YARN 
cluster: "+e.getMessage());
+                               e.printStackTrace(System.err);
+                               return 1;
+                       }
+                       //------------------ Cluster deployed, handle 
connection details
+                       String jobManagerAddress = 
yarnCluster.getJobManagerAddress().getHostName() + ":" 
+yarnCluster.getJobManagerAddress().getPort();
+                       System.err.println("Flink JobManager is now running on 
" + jobManagerAddress);
+                       System.err.println("JobManager Web Interface: " + 
yarnCluster.getWebInterfaceURL());
+                       // file that we write into the conf/ dir containing the 
jobManager address and the dop.
+                       String confDirPath = 
CliFrontend.getConfigurationDirectoryFromEnv();
+                       File yarnPropertiesFile = new File(confDirPath + 
File.separator + CliFrontend.YARN_PROPERTIES_FILE);
+
+                       Properties yarnProps = new Properties();
+                       
yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, 
jobManagerAddress);
+                       if(flinkYarnClient.getTaskManagerSlots() != -1) {
+                               
yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DOP, 
Integer.toString(flinkYarnClient.getTaskManagerSlots() * 
flinkYarnClient.getTaskManagerCount()) );
+                       }
+                       // add dynamic properties
+                       if(flinkYarnClient.getDynamicPropertiesEncoded() != 
null) {
+                               
yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, 
flinkYarnClient.getDynamicPropertiesEncoded());
+                       }
+                       writeYarnProperties(yarnProps, yarnPropertiesFile);
+
+                       //------------------ Cluster running, let user control 
it ------------
+
+                       runInteractiveCli(yarnCluster);
+
+                       LOG.info("Command Line Interface requested session 
shutdown");
+                       yarnCluster.shutdown();
+
+                       try {
+                               yarnPropertiesFile.delete();
+                       } catch (Exception e) {
+                               LOG.warn("Exception while deleting the 
JobManager address file", e);
+                       }
+               }
+               return 0;
+       }
+
+       /**
+        * Utility method for tests.
+        */
+       public void stop() {
+               if(yarnCluster != null) {
+                       LOG.info("Command line interface is shutting down the 
yarnCluster");
+                       yarnCluster.shutdown();
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 00fba95..d8f1bf7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -317,7 +317,6 @@ public class Client {
                }
 
                try {
-
                        if (wait) {
                                return JobClient.submitJobAndWait(jobGraph, 
printStatusDuringExecution, client, timeout);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
index b6d4542..1bc533f 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
@@ -113,7 +113,7 @@ public class CliFrontendInfoTest {
                }
 
                @Override
-               protected Client getClient(CommandLine line, ClassLoader 
loader) throws IOException {
+               protected Client getClient(CommandLine line, ClassLoader 
loader, String programName) throws IOException {
                        try {
                                return new TestClient(expectedDop);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java
index 6a59019..ef7dff6 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java
@@ -29,7 +29,6 @@ import java.net.InetSocketAddress;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
-import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.CliFrontendTestUtils.TestingCliFrontend;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -55,7 +54,7 @@ public class CliFrontendJobManagerConnectionTest {
                                
                        TestingCliFrontend frontend = new 
TestingCliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
                        
-                       assertTrue(frontend.getJobManagerAddress(line) == null);
+                       assertTrue(frontend.getJobManagerAddressString(line) == 
null);
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -72,7 +71,7 @@ public class CliFrontendJobManagerConnectionTest {
                                
                        TestingCliFrontend frontend = new 
TestingCliFrontend(CliFrontendTestUtils.getConfigDir());
                        
-                       InetSocketAddress address = 
frontend.getJobManagerAddress(line);
+                       InetSocketAddress address = 
RemoteExecutor.getInetFromHostport(frontend.getJobManagerAddressString(line));
                        
                        assertNotNull(address);
                        
assertEquals(CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS, 
address.getAddress().getHostAddress());
@@ -93,7 +92,7 @@ public class CliFrontendJobManagerConnectionTest {
                                
                        TestingCliFrontend frontend = new 
TestingCliFrontend(CliFrontendTestUtils.getConfigDirWithYarnFile());
                        
-                       InetSocketAddress address = 
frontend.getJobManagerAddress(line);
+                       InetSocketAddress address = 
RemoteExecutor.getInetFromHostport(frontend.getJobManagerAddressString(line));
                        
                        assertNotNull(address);
                        
assertEquals(CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_ADDRESS, 
address.getAddress().getHostAddress());
@@ -114,7 +113,7 @@ public class CliFrontendJobManagerConnectionTest {
                                
                        TestingCliFrontend frontend = new 
TestingCliFrontend(CliFrontendTestUtils.getConfigDirWithInvalidYarnFile());
                        
-                       assertTrue(frontend.getJobManagerAddress(line) == null);
+                       assertTrue(frontend.getJobManagerAddressString(line) == 
null);
                }
                catch (Exception e) {
                        System.err.println(e.getMessage());
@@ -131,7 +130,7 @@ public class CliFrontendJobManagerConnectionTest {
                                
                        TestingCliFrontend frontend = new 
TestingCliFrontend(CliFrontendTestUtils.getConfigDir());
                        
-                       InetSocketAddress address = 
frontend.getJobManagerAddress(line);
+                       InetSocketAddress address = 
RemoteExecutor.getInetFromHostport(frontend.getJobManagerAddressString(line));
                        
                        assertNotNull(address);
                        assertEquals("10.221.130.22", 
address.getAddress().getHostAddress());
@@ -152,7 +151,7 @@ public class CliFrontendJobManagerConnectionTest {
                                
                        TestingCliFrontend frontend = new 
TestingCliFrontend(CliFrontendTestUtils.getConfigDirWithYarnFile());
                        
-                       InetSocketAddress address = 
frontend.getJobManagerAddress(line);
+                       InetSocketAddress address = 
RemoteExecutor.getInetFromHostport(frontend.getJobManagerAddressString(line));
                        
                        assertNotNull(address);
                        assertEquals("10.221.130.22", 
address.getAddress().getHostAddress());

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index b9af927..0cd7104 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -62,7 +62,7 @@ public class CliFrontendListCancelTest {
                        // test unrecognized option
                        {
                                String[] parameters = {"-v", "-l"};
-                               CliFrontend testFrontend = new CliFrontend();
+                               CliFrontend testFrontend = new 
CliFrontendTestUtils.TestingCliFrontend();
                                int retCode = testFrontend.cancel(parameters);
                                assertTrue(retCode == 2);
                        }
@@ -70,7 +70,7 @@ public class CliFrontendListCancelTest {
                        // test missing job id
                        {
                                String[] parameters = {};
-                               CliFrontend testFrontend = new CliFrontend();
+                               CliFrontend testFrontend = new 
CliFrontendTestUtils.TestingCliFrontend();
                                int retCode = testFrontend.cancel(parameters);
                                assertTrue(retCode != 0);
                        }
@@ -104,7 +104,7 @@ public class CliFrontendListCancelTest {
                        // test unrecognized option
                        {
                                String[] parameters = {"-v", "-k"};
-                               CliFrontend testFrontend = new CliFrontend();
+                               CliFrontend testFrontend = new 
CliFrontendTestUtils.TestingCliFrontend();
                                int retCode = testFrontend.list(parameters);
                                assertTrue(retCode == 2);
                        }
@@ -112,7 +112,7 @@ public class CliFrontendListCancelTest {
                        // test missing flags
                        {
                                String[] parameters = {};
-                               CliFrontend testFrontend = new CliFrontend();
+                               CliFrontend testFrontend = new 
CliFrontendTestUtils.TestingCliFrontend();
                                int retCode = testFrontend.list(parameters);
                                assertTrue(retCode != 0);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
index 9d4c6ae..95f6cb8 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
@@ -113,19 +113,13 @@ public class CliFrontendTestUtils {
        
        public static class TestingCliFrontend extends CliFrontend {
                
-               public final String configDir;
-               
+
                public TestingCliFrontend() {
                        this(getConfigDir());
                }
                
                public TestingCliFrontend(String configDir) {
-                       this.configDir = configDir;
-               }
-               
-               @Override
-               protected String getConfigurationDirectory() {
-                       return this.configDir;
+                       this.configurationDirectory = configDir;
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d482e3c..969329e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -193,6 +193,20 @@ public final class ConfigConstants {
         */
        public static final String JOBCLIENT_POLLING_INTERVAL_KEY = 
"jobclient.polling.interval";
 
+       // ------------------------ YARN Configuration ------------------------
+
+       /**
+        * Percentage of heap space to remove from containers started by YARN.
+        */
+       public static final String YARN_HEAP_CUTOFF_RATIO = 
"yarn.heap-cutoff-ratio";
+
+       /**
+        * Upper bound for heap cutoff on YARN.
+        * The "yarn.heap-cutoff-ratio" is removing a certain ratio from the 
heap.
+        * This value is limiting this cutoff to a absolute value.
+        */
+       public static final String YARN_HEAP_LIMIT_CAP = "yarn.heap-limit-cap";
+
        // ------------------------ Hadoop Configuration 
------------------------
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index fb45466..c3c7ae8 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -242,7 +242,6 @@ public abstract class FileSystem {
                                        // by now we know that the 
HadoopFileSystem wrapper can wrap the file system.
                                        fs = 
instantiateHadoopFileSystemWrapper(wrapperClass);
                                        fs.initialize(uri);
-                                       System.out.println("Initializing new 
instance of wrapper for "+wrapperClass);
                                        CACHE.put(wrappedKey, fs);
 
                                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index e753a05..91359c2 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -197,10 +197,12 @@ under the License.
                                                                                
                
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                                                                
        <resource>reference.conf</resource>
                                                                                
</transformer>
+                                                                               
<!-- The service transformer is needed to merge META-INF/services files -->
+                                                                               
<transformer 
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                                                                
<transformer
                                                                                
                
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                                                                
        <manifestEntries>
-                                                                               
                <Main-Class>org.apache.flink.yarn.Client</Main-Class>
+                                                                               
                <Main-Class>org.apache.flink.yarn.FlinkYarnClient</Main-Class>
                                                                                
        </manifestEntries>
                                                                                
</transformer>
                                                                        
</transformers>

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-dist/src/main/flink-bin/bin/flink
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/flink 
b/flink-dist/src/main/flink-bin/bin/flink
index e5dd3c6..12dd6b7 100755
--- a/flink-dist/src/main/flink-bin/bin/flink
+++ b/flink-dist/src/main/flink-bin/bin/flink
@@ -50,4 +50,5 @@ log_setting="-Dlog.file="$log" 
-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4
 
 export FLINK_CONF_DIR
 
-$JAVA_RUN $JVM_ARGS $log_setting -classpath $CC_CLASSPATH 
org.apache.flink.client.CliFrontend $*
+# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
+$JAVA_RUN $JVM_ARGS $log_setting -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH 
org.apache.flink.client.CliFrontend $*

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh 
b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
index 21da505..bf0775f 100644
--- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
+++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
@@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" 
-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4
 
 export FLINK_CONF_DIR
 
-$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH $log_setting 
org.apache.flink.yarn.Client -ship $bin/../ship/ -confDir $FLINK_CONF_DIR -j 
$FLINK_LIB_DIR/*yarn-uberjar.jar $*
+$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH $log_setting 
org.apache.flink.client.FlinkYarnSessionCli -ship $bin/../ship/ -j 
$FLINK_LIB_DIR/*yarn-uberjar.jar $*
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
index 9e0a55b..026758d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
@@ -34,7 +34,6 @@ import javax.servlet.http.HttpServletResponse;
 
 import akka.actor.ActorRef;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.Instance;
 
@@ -60,13 +59,13 @@ public class SetupInfoServlet extends HttpServlet {
        private static final Logger LOG = 
LoggerFactory.getLogger(SetupInfoServlet.class);
        
        
-       final private Configuration globalC;
+       final private Configuration configuration;
        final private ActorRef jobmanager;
        final private FiniteDuration timeout;
        
        
-       public SetupInfoServlet(ActorRef jm, FiniteDuration timeout) {
-               globalC = GlobalConfiguration.getConfiguration();
+       public SetupInfoServlet(Configuration conf, ActorRef jm, FiniteDuration 
timeout) {
+               configuration = conf;
                this.jobmanager = jm;
                this.timeout = timeout;
        }
@@ -74,7 +73,6 @@ public class SetupInfoServlet extends HttpServlet {
        @Override
        protected void doGet(HttpServletRequest req, HttpServletResponse resp)
                        throws ServletException, IOException {
-               
                resp.setStatus(HttpServletResponse.SC_OK);
                resp.setContentType("application/json");
                
@@ -86,15 +84,15 @@ public class SetupInfoServlet extends HttpServlet {
        }
        
        private void writeGlobalConfiguration(HttpServletResponse resp) throws 
IOException {
-               
-               Set<String> keys = globalC.keySet();
+               Set<String> keys = configuration.keySet();
                List<String> list = new ArrayList<String>(keys);
                Collections.sort(list);
                
                JSONObject obj = new JSONObject();
                for (String k : list) {
                        try {
-                               obj.put(k, globalC.getString(k, ""));
+
+                               obj.put(k, configuration.getString(k, ""));
                        } catch (JSONException e) {
                                LOG.warn("Json object creation failed", e);
                        }
@@ -151,7 +149,7 @@ public class SetupInfoServlet extends HttpServlet {
        private static final Comparator<Instance> INSTANCE_SORTER = new 
Comparator<Instance>() {
                @Override
                public int compare(Instance o1, Instance o2) {
-                       return 
o1.getInstanceConnectionInfo().compareTo(o2.getInstanceConnectionInfo());
+               return 
o1.getInstanceConnectionInfo().compareTo(o2.getInstanceConnectionInfo());
                }
        };
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 24dbaf7..2b92f9f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.eclipse.jetty.http.security.Constraint;
 import org.eclipse.jetty.security.ConstraintMapping;
 import org.eclipse.jetty.security.ConstraintSecurityHandler;
@@ -88,7 +87,7 @@ public class WebInfoServer {
                
                // if no explicit configuration is given, use the global 
configuration
                if (config == null) {
-                       config = GlobalConfiguration.getConfiguration();
+                       throw new IllegalArgumentException("No Configuration 
has been passed to the web server");
                }
                
                this.port = 
config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
@@ -133,7 +132,7 @@ public class WebInfoServer {
                servletContext.addServlet(new ServletHolder(new 
JobmanagerInfoServlet(jobmanager,
                                archive, timeout)), "/jobsInfo");
                servletContext.addServlet(new ServletHolder(new 
LogfileInfoServlet(logDirFiles)), "/logInfo");
-               servletContext.addServlet(new ServletHolder(new 
SetupInfoServlet(jobmanager, timeout)),
+               servletContext.addServlet(new ServletHolder(new 
SetupInfoServlet(config, jobmanager, timeout)),
                                "/setupInfo");
                servletContext.addServlet(new ServletHolder(new MenuServlet()), 
"/menu");
 
@@ -206,4 +205,8 @@ public class WebInfoServer {
                server.stop();
        }
 
+       public Server getServer() {
+               return server;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index ec2633c..5a5f515 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -76,7 +76,7 @@ public class NetUtils {
                                                case ADDRESS:
                                                        if 
(hasCommonPrefix(jobManagerAddress.getAddress().getAddress(), i.getAddress())) {
                                                                if 
(tryToConnect(i, jobManagerAddress, strategy.getTimeout())) {
-                                                                       
LOG.info("Determined " + i + " as the TaskTracker's own IP address");
+                                                                       
LOG.info("Determined " + i + " as the machine's own IP address");
                                                                        return 
i;
                                                                }
                                                        }
@@ -86,7 +86,7 @@ public class NetUtils {
                                                case SLOW_CONNECT:
                                                        boolean correct = 
tryToConnect(i, jobManagerAddress, strategy.getTimeout());
                                                        if (correct) {
-                                                               
LOG.info("Determined " + i + " as the TaskTracker's own IP address");
+                                                               
LOG.info("Determined " + i + " as the machine's own IP address");
                                                                return i;
                                                        }
                                                        break;

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
new file mode 100644
index 0000000..7f2b14e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.yarn;
+
+import org.apache.hadoop.fs.Path;
+import java.io.File;
+import java.util.List;
+
+public abstract class AbstractFlinkYarnClient {
+
+       // ---- Setter for YARN Cluster properties ----- //
+       public abstract void setJobManagerMemory(int memoryMB);
+       public abstract void setTaskManagerMemory(int memoryMB);
+       public abstract void setTaskManagerSlots(int slots);
+       public abstract int getTaskManagerSlots();
+       public abstract void setQueue(String queue);
+       public abstract void setLocalJarPath(Path localJarPath);
+       public abstract void setConfigurationFilePath(Path confPath);
+       public abstract void setFlinkLoggingConfigurationPath(Path logConfPath);
+       public abstract Path getFlinkLoggingConfigurationPath();
+       public abstract void setTaskManagerCount(int tmCount);
+       public abstract int getTaskManagerCount();
+       public abstract void setConfigurationDirectory(String confDirPath);
+       // List of files to transfer to the YARN containers.
+       public abstract void setShipFiles(List<File> shipFiles);
+       public abstract void setDynamicPropertiesEncoded(String 
dynamicPropertiesEncoded);
+       public abstract String getDynamicPropertiesEncoded();
+
+       // ---- Operations on the YARN cluster ----- //
+       public abstract String getClusterDescription() throws Exception;
+
+       public abstract AbstractFlinkYarnCluster deploy(String clusterName) 
throws Exception;
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
new file mode 100644
index 0000000..58eaf1d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.yarn;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+public abstract class AbstractFlinkYarnCluster {
+
+       public abstract InetSocketAddress getJobManagerAddress();
+
+       public abstract String getWebInterfaceURL();
+
+       public abstract void shutdown();
+
+       public abstract boolean hasBeenStopped();
+
+       public abstract FlinkYarnClusterStatus getClusterStatus();
+
+       public abstract boolean hasFailed();
+
+       /**
+        * @return Diagnostics if the Cluster is in "failed" state.
+        */
+       public abstract String getDiagnostics();
+
+       public abstract List<String> getNewMessages();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java
new file mode 100644
index 0000000..2aaaaa0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/FlinkYarnClusterStatus.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.yarn;
+
+import java.io.Serializable;
+
+
+public class FlinkYarnClusterStatus implements Serializable {
+       private int numberOfTaskManagers;
+       private int numberOfSlots;
+
+       public FlinkYarnClusterStatus() {
+       }
+
+       public FlinkYarnClusterStatus(int numberOfTaskManagers, int 
numberOfSlots) {
+               this.numberOfTaskManagers = numberOfTaskManagers;
+               this.numberOfSlots = numberOfSlots;
+       }
+
+       public int getNumberOfTaskManagers() {
+               return numberOfTaskManagers;
+       }
+
+       public void setNumberOfTaskManagers(int numberOfTaskManagers) {
+               this.numberOfTaskManagers = numberOfTaskManagers;
+       }
+
+       public int getNumberOfSlots() {
+               return numberOfSlots;
+       }
+
+       public void setNumberOfSlots(int numberOfSlots) {
+               this.numberOfSlots = numberOfSlots;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               FlinkYarnClusterStatus that = (FlinkYarnClusterStatus) o;
+
+               if (numberOfSlots != that.numberOfSlots) {
+                       return false;
+               }
+               if (numberOfTaskManagers != that.numberOfTaskManagers) {
+                       return false;
+               }
+
+               return true;
+       }
+
+       @Override
+       public int hashCode() {
+               int result = numberOfTaskManagers;
+               result = 31 * result + numberOfSlots;
+               return result;
+       }
+
+       @Override
+       public String toString() {
+               return "FlinkYarnClusterStatus{" +
+                               "numberOfTaskManagers=" + numberOfTaskManagers +
+                               ", numberOfSlots=" + numberOfSlots +
+                               '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index cf678b0..1f2791c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -53,6 +53,9 @@ object AkkaUtils {
   }
 
   def createActorSystem(akkaConfig: Config): ActorSystem = {
+    if(LOG.isDebugEnabled) {
+      LOG.debug(s"Using akka config to create actor system: $akkaConfig")
+    }
     ActorSystem.create("flink", akkaConfig)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index 6a4beed..195a0b6 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -76,7 +76,8 @@ class JobClientListener(client: ActorRef) extends Actor with 
ActorLogMessages wi
       client ! Failure(new JobExecutionException(msg, false))
       self ! PoisonPill
     case msg =>
-      println(msg.toString)
+      // we have to use System.out.println here to avoid erroneous behavior 
for output redirection
+      System.out.println(msg.toString)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index cd1119d..37a41a5 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -437,6 +437,13 @@ class JobManager(val configuration: Configuration)
     }
   }
 
+  /**
+   * Handle unmatched messages with an exception.
+   */
+  override def unhandled(message: Any): Unit = {
+    throw new RuntimeException("Received unknown message " + message)
+  }
+
   private def removeJob(jobID: JobID): Unit = {
     currentJobs.remove(jobID) match {
       case Some((eg, _)) => archive ! ArchiveExecutionGraph(jobID, eg)

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
new file mode 100644
index 0000000..0cccf3a
--- /dev/null
+++ b/flink-yarn-tests/pom.xml
@@ -0,0 +1,121 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-parent</artifactId>
+               <version>0.9-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <!--
+       There is a separate "flink-yarn-tests" package that expects the 
"flink-dist" package
+       to be build before.
+       We need the YARN fat jar build by flink-dist for the tests.
+       -->
+       
+       <artifactId>flink-yarn-tests</artifactId>
+       <name>flink-yarn-tests</name>
+       <packaging>jar</packaging>
+
+       <dependencies>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime</artifactId>
+                       <version>${project.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <artifactId>hadoop-core</artifactId>
+                                       <groupId>org.apache.hadoop</groupId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+               
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-yarn</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-yarn-client</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-yarn-common</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-yarn-server-tests</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-minicluster</artifactId>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-mapreduce-client-core</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <!-- Set the root directory for all tests to the 
project root.
+                       We need this to be able to locate the final build (in 
flink-dist)
+                       -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-failsafe-plugin</artifactId>
+                               <configuration>
+                                       <!-- Enforce single threaded execution 
due to port conflicts with the mini yarn cluster -->
+                                       <forkCount>1</forkCount>
+                                       <workingDirectory>../</workingDirectory>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-surefire-plugin</artifactId>
+                               <configuration>
+                                       <workingDirectory>../</workingDirectory>
+                               </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
new file mode 100644
index 0000000..9fd2541
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+
+public class UtilsTest {
+
+       @Test
+       public void testUberjarLocator() {
+               File dir = YarnTestBase.findFile(".", new 
YarnTestBase.RootDirFilenameFilter());
+               Assert.assertNotNull(dir);
+               dir = dir.getParentFile().getParentFile(); // from uberjar to 
lib to root
+               Assert.assertTrue(dir.exists());
+               Assert.assertTrue(dir.isDirectory());
+               Assert.assertTrue(dir.toString().contains("flink-dist"));
+               List<String> files = Arrays.asList(dir.list());
+               Assert.assertTrue(files.contains("lib"));
+               Assert.assertTrue(files.contains("bin"));
+               Assert.assertTrue(files.contains("conf"));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerIT.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerIT.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerIT.java
new file mode 100644
index 0000000..25e1aa2
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerIT.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This test starts a MiniYARNCluster with a CapacityScheduler.
+ * Is has, by default a queue called "default". The configuration here adds 
another queue: "qa-team".
+ */
+public class YARNSessionCapacitySchedulerIT extends YarnTestBase {
+       private static final Logger LOG = 
LoggerFactory.getLogger(YARNSessionCapacitySchedulerIT.class);
+
+       @BeforeClass
+       public static void setup() {
+               yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, 
CapacityScheduler.class, ResourceScheduler.class);
+               yarnConfiguration.set("yarn.scheduler.capacity.root.queues", 
"default,qa-team");
+               
yarnConfiguration.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
+               
yarnConfiguration.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
+               startYARNWithConfig(yarnConfiguration);
+       }
+
+       /**
+        * Test regular operation, including command line parameter parsing.
+        */
+       @Test
+       public void testClientStartup() {
+               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+                                               "-n", "1",
+                                               "-jm", "512",
+                                               "-tm", "1024", "-qu", 
"qa-team"},
+                               "Number of connected TaskManagers changed to 1. 
Slots available: 1", RunTypes.YARN_SESSION);
+       }
+
+
+       /**
+        * Test deployment to non-existing queue. (user-reported error)
+        * Deployment to the queue is possible because there are no queues, so 
we don't check.
+        */
+       @Test
+       public void testNonexistingQueue() {
+               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+                               "-n", "1",
+                               "-jm", "512",
+                               "-tm", "1024",
+                               "-qu", "doesntExist"}, "Error while deploying 
YARN cluster: The specified queue 'doesntExist' does not exist. Available 
queues: default, qa-team, ", RunTypes.YARN_SESSION);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2af65867/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOIT.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOIT.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOIT.java
new file mode 100644
index 0000000..5f8ae87
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOIT.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+
+/**
+ * This test starts a MiniYARNCluster with a FIFO scheudler.
+ * There are no queues for that scheduler.
+ */
+public class YARNSessionFIFOIT extends YarnTestBase {
+       private static final Logger LOG = 
LoggerFactory.getLogger(YARNSessionFIFOIT.class);
+
+       /*
+       Override init with FIFO scheduler.
+        */
+       @BeforeClass
+       public static void setup() {
+               yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, 
FifoScheduler.class, ResourceScheduler.class);
+               startYARNWithConfig(yarnConfiguration);
+       }
+       /**
+        * Test regular operation, including command line parameter parsing.
+        */
+       @Test
+       public void testClientStartup() {
+               LOG.info("Starting testClientStartup()");
+               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+                                               "-n", "1",
+                                               "-jm", "512",
+                                               "-tm", "1024"},
+                               "Number of connected TaskManagers changed to 1. 
Slots available: 1", RunTypes.YARN_SESSION);
+               LOG.info("Finished testClientStartup()");
+       }
+
+       /**
+        * Test querying the YARN cluster.
+        *
+        * This test validates through 666*2 cores in the "cluster".
+        */
+       @Test
+       public void testQueryCluster() {
+               LOG.info("Starting testQueryCluster()");
+               runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 
totalCores 1332", RunTypes.YARN_SESSION); // we have 666*2 cores.
+               LOG.info("Finished testQueryCluster()");
+       }
+
+       /**
+        * Test deployment to non-existing queue. (user-reported error)
+        * Deployment to the queue is possible because there are no queues, so 
we don't check.
+        */
+       @Test
+       public void testNonexistingQueue() {
+               LOG.info("Starting testNonexistingQueue()");
+               runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+                               "-n", "1",
+                               "-jm", "512",
+                               "-tm", "1024",
+                               "-qu", "doesntExist"}, "Number of connected 
TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
+               LOG.info("Finished testNonexistingQueue()");
+       }
+
+       /**
+        * Test requesting more resources than available.
+        */
+       @Test
+       public void testMoreNodesThanAvailable() {
+               LOG.info("Starting testMoreNodesThanAvailable()");
+               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+                               "-n", "10",
+                               "-jm", "512",
+                               "-tm", "1024"}, "Error while deploying YARN 
cluster: This YARN session requires 10752MB of memory in the cluster. There are 
currently only 8192MB available.", RunTypes.YARN_SESSION);
+               LOG.info("Finished testMoreNodesThanAvailable()");
+       }
+
+       /**
+        * The test cluster has the following resources:
+        * - 2 Nodes with 4096 MB each.
+        * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
+        *
+        * We allocate:
+        * 1 JobManager with 256 MB (will be automatically upgraded to 512 due 
to min alloc mb)
+        * 5 TaskManagers with 1585 MB
+        *
+        * user sees a total request of: 8181 MB (fits)
+        * system sees a total request of: 8437 (doesn't fit due to min alloc 
mb)
+        */
+       @Test
+       public void testResourceComputation() {
+               LOG.info("Starting testResourceComputation()");
+               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+                               "-n", "5",
+                               "-jm", "256",
+                               "-tm", "1585"}, "Error while deploying YARN 
cluster: This YARN session requires 8437MB of memory in the cluster. There are 
currently only 8192MB available.", RunTypes.YARN_SESSION);
+               LOG.info("Finished testResourceComputation()");
+       }
+
+       /**
+        * The test cluster has the following resources:
+        * - 2 Nodes with 4096 MB each.
+        * - RM_SCHEDULER_MINIMUM_ALLOCATION_MB is 512
+        *
+        * We allocate:
+        * 1 JobManager with 256 MB (will be automatically upgraded to 512 due 
to min alloc mb)
+        * 2 TaskManagers with 3840 MB
+        *
+        * the user sees a total request of: 7936 MB (fits)
+        * the system sees a request of: 8192 MB (fits)
+        * HOWEVER: one machine is going to need 3840 + 512 = 4352 MB, which 
doesn't fit.
+        *
+        * --> check if the system properly rejects allocating this session.
+        */
+       @Test
+       public void testfullAlloc() {
+               LOG.info("Starting testfullAlloc()");
+               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+                               "-n", "2",
+                               "-jm", "256",
+                               "-tm", "3840"}, "Error while deploying YARN 
cluster: There is not enough memory available in the YARN cluster. The 
TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" +
+                               "After allocating the JobManager (512MB) and 
(1/2) TaskManagers, the following NodeManagers are available: [3584, 256]", 
RunTypes.YARN_SESSION);
+               LOG.info("Finished testfullAlloc()");
+       }
+
+       /**
+        * Test per-job yarn cluster
+        *
+        * This also tests the prefixed CliFrontend options for the YARN case
+        */
+       @Test
+       public void perJobYarnCluster() {
+               LOG.info("Starting perJobYarnCluster()");
+               File exampleJarLocation = YarnTestBase.findFile(".", new 
ContainsName("-WordCount.jar", "streaming")); // exclude streaming wordcount 
here.
+               runWithArgs(new String[] {"run", "-m", "yarn-cluster",
+                               "-yj", flinkUberjar.getAbsolutePath(),
+                               "-yn", "1",
+                               "-yjm", "512",
+                               "-ytm", "1024", 
exampleJarLocation.getAbsolutePath()}, "Job execution switched to status 
FINISHED.", RunTypes.CLI_FRONTEND);
+               LOG.info("Finished perJobYarnCluster()");
+       }
+
+       /**
+        * Test the YARN Java API
+        */
+       @Test
+       public void testJavaAPI() {
+               final int WAIT_TIME = 15;
+               LOG.info("Starting testJavaAPI()");
+
+               AbstractFlinkYarnClient flinkYarnClient = 
FlinkYarnSessionCli.getFlinkYarnClient();
+               flinkYarnClient.setTaskManagerCount(1);
+               flinkYarnClient.setJobManagerMemory(512);
+               flinkYarnClient.setTaskManagerMemory(512);
+               flinkYarnClient.setLocalJarPath(new 
Path(flinkUberjar.getAbsolutePath()));
+               String confDirPath = System.getenv("FLINK_CONF_DIR");
+               flinkYarnClient.setConfigurationDirectory(confDirPath);
+               flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + 
File.separator + "flink-conf.yaml"));
+
+               // deploy
+               AbstractFlinkYarnCluster yarnCluster = null;
+               try {
+                       yarnCluster = flinkYarnClient.deploy(null);
+               } catch (Exception e) {
+                       System.err.println("Error while deploying YARN cluster: 
"+e.getMessage());
+                       e.printStackTrace(System.err);
+                       Assert.fail();
+               }
+               FlinkYarnClusterStatus expectedStatus = new 
FlinkYarnClusterStatus(1, 1);
+               for(int second = 0; second < WAIT_TIME * 2; second++) { // run 
"forever"
+                       try {
+                               Thread.sleep(1000);
+                       } catch (InterruptedException e) {
+                               LOG.warn("Interrupted", e);
+                               Thread.interrupted();
+                       }
+                       FlinkYarnClusterStatus status = 
yarnCluster.getClusterStatus();
+                       if(status != null && status.equals(expectedStatus)) {
+                               LOG.info("Cluster reached status " + status);
+                               break; // all good, cluster started
+                       }
+                       if(second > WAIT_TIME) {
+                               // we waited for 15 seconds. cluster didn't 
come up correctly
+                               Assert.fail("The custer didn't start after " + 
WAIT_TIME + " seconds");
+                       }
+               }
+
+               // use the cluster
+               Assert.assertNotNull(yarnCluster.getJobManagerAddress());
+               Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
+
+               LOG.info("Shutting down cluster. All tests passed");
+               // shutdown cluster
+               yarnCluster.shutdown();
+               LOG.info("Finished testJavaAPI()");
+       }
+}

Reply via email to