[FLINK-1629][FLINK-1630][FLINK-1547] Add option to start Flink on YARN in a 
detached mode. YARN container reallocation.

This commit changes:
[FLINK-1629]: Users can now "fire and forget" jobs or entire YARN sessions to 
the cluster. (Detached mode)
[FLINK-1630]: Flink now reallocates failed YARN containers during the 
lifetime of a YARN session.
[FLINK-1547]: Users can now specify whether they want the ApplicationMaster (= the 
JobManager = the entire YARN session) to restart on failure, and how often. 
After the first restart, the session behaves like a detached session. There 
is no backup of state between the old and the new AM.
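The recovery behavior from FLINK-1630 and FLINK-1547 is controlled through the new configuration keys this patch adds to ConfigConstants. A sketch of how a user might set them when starting a session (the flag and key names are taken from this patch; the concrete values are illustrative only):

```shell
# Illustrative values; the keys below are the ones added in ConfigConstants.
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096 \
  -Dyarn.reallocate-failed=true \
  -Dyarn.maximum-failed-containers=3 \
  -Dyarn.application-attempts=2
# yarn.reallocate-failed:          reallocate failed TaskManager containers (default: true)
# yarn.maximum-failed-containers:  failed containers tolerated before the session fails
#                                  (default: the initially requested container count, -n)
# yarn.application-attempts:       ApplicationMaster restarts allowed by YARN (default: 1)
```

The same keys can also be set in `conf/flink-conf.yaml` instead of being passed as `-D` dynamic properties.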

The whole resource negotiation process between the RM and the AM has been 
reworked.
Flink is now much more flexible when requesting new containers and giving 
back unneeded ones.

A new test case covers the container restart. It also verifies that the 
web frontend is properly started,
that the logfile access works and
that the configuration values the user specifies when starting the YARN session 
are visible in the web frontend.
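Put together, a detached "fire and forget" session would look like the following (the flags come from this patch; the application id is a hypothetical placeholder):

```shell
# Start a detached YARN session: the client only submits Flink
# to the cluster and then exits.
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096 --detached

# A detached client can no longer stop the session itself, so use
# YARN's own tooling (the application id below is a placeholder):
yarn application -kill application_1426000000000_0001
```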

This closes #468


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/13bb21b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/13bb21b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/13bb21b1

Branch: refs/heads/master
Commit: 13bb21b1bd83b4f9e434f735ba7517ffd03478d3
Parents: fd9ca4d
Author: Robert Metzger <rmetz...@apache.org>
Authored: Thu Mar 5 15:03:05 2015 +0100
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Thu Mar 12 11:33:37 2015 +0100

----------------------------------------------------------------------
 docs/yarn_setup.md                              |  51 +-
 .../flink/client/FlinkYarnSessionCli.java       |  38 +-
 .../flink/configuration/ConfigConstants.java    |  29 ++
 .../runtime/jobmanager/web/WebInfoServer.java   |  23 +-
 .../flink/runtime/util/SignalHandler.java       |  80 +++
 .../runtime/yarn/AbstractFlinkYarnClient.java   |   2 +
 .../runtime/yarn/AbstractFlinkYarnCluster.java  |   2 +
 .../runtime/taskmanager/TaskManagerTest.java    |   1 -
 flink-yarn-tests/pom.xml                        |   6 +
 .../java/org/apache/flink/yarn/UtilsTest.java   |  51 ++
 .../YARNSessionCapacitySchedulerITCase.java     |  16 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       | 293 +++++++++--
 .../org/apache/flink/yarn/YarnTestBase.java     | 141 +++--
 .../src/main/resources/log4j-test.properties    |  15 +-
 .../org/apache/flink/yarn/FlinkYarnClient.java  |  21 +-
 .../org/apache/flink/yarn/FlinkYarnCluster.java | 139 +++--
 .../yarn/appMaster/YarnTaskManagerRunner.java   |   1 +
 .../apache/flink/yarn/ApplicationClient.scala   |   9 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |  19 +-
 .../flink/yarn/ApplicationMasterActor.scala     | 509 +++++++++++++++++++
 .../scala/org/apache/flink/yarn/Messages.scala  |   4 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  | 330 ------------
 .../org/apache/flink/yarn/YarnTaskManager.scala |   7 +-
 tools/travis_mvn_watchdog.sh                    |  14 +
 24 files changed, 1291 insertions(+), 510 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/docs/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/yarn_setup.md b/docs/yarn_setup.md
index cbd6759..5c53f8b 100644
--- a/docs/yarn_setup.md
+++ b/docs/yarn_setup.md
@@ -23,19 +23,30 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-## In a Nutshell
+## Quickstart: Start a long-running Flink cluster on YARN
 
-Start YARN session with 4 Task Managers (each with 4 GB of Heapspace):
+Start a YARN session with 4 Task Managers (each with 4 GB of Heapspace):
 
 ~~~bash
 wget {{ site.FLINK_WGET_URL_YARN_STABLE }}
-tar xvzf flink-{{ site.FLINK_VERSION_SHORT }}-bin-hadoop2-yarn.tgz
-cd flink-yarn-{{ site.FLINK_VERSION_SHORT }}/
+tar xvzf flink-{{ site.FLINK_VERSION_SHORT }}-bin-hadoop2.tgz
+cd flink-{{ site.FLINK_VERSION_SHORT }}/
 ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
 ~~~
 
 Specify the `-s` flag for the number of processing slots per Task Manager. We 
recommend to set the number of slots to the number of processors per machine.
 
+Once the session has been started, you can submit jobs to the cluster using 
the `./bin/flink` tool.
+
+## Quickstart: Run a Flink job on YARN
+
+~~~bash
+wget {{ site.FLINK_WGET_URL_YARN_STABLE }}
+tar xvzf flink-{{ site.FLINK_VERSION_SHORT }}-bin-hadoop2.tgz
+cd flink-{{ site.FLINK_VERSION_SHORT }}/
+./bin/flink -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 
./examples/flink-java-examples-{{ site.FLINK_VERSION_SHORT }}-WordCount.jar
+~~~
+
 ## Apache Flink on Hadoop YARN using a YARN Session
 
 Apache [Hadoop YARN](http://hadoop.apache.org/) is a cluster resource 
management framework. It allows to run various distributed applications on top 
of a cluster. Flink runs on YARN next to other applications. Users do not have 
to setup or install anything if there is already a YARN setup.
@@ -60,11 +71,11 @@ Download the YARN tgz package on the [download 
page]({{site.baseurl}}/downloads.
 Extract the package using:
 
 ~~~bash
-tar xvzf flink-{{ site.FLINK_VERSION_SHORT }}-bin-hadoop2-yarn.tgz
-cd flink-yarn-{{site.FLINK_VERSION_SHORT }}/
+tar xvzf flink-{{ site.FLINK_VERSION_SHORT }}-bin-hadoop2.tgz
+cd flink-{{site.FLINK_VERSION_SHORT }}/
 ~~~
 
-If you want to build the YARN .tgz file from sources, follow the [build 
instructions](building.html). You can find the result of the build in 
`flink-dist/target/flink-{{ site.FLINK_VERSION_SHORT }}-bin/flink-yarn-{{ 
site.FLINK_VERSION_SHORT }}/` (*Note: The version might be different for you* ).
+If you want to build the YARN .tgz file from sources, follow the [build 
instructions](building.html). You can find the result of the build in 
`flink-dist/target/flink-{{ site.FLINK_VERSION_SHORT }}-bin/flink-{{ 
site.FLINK_VERSION_SHORT }}/` (*Note: The version might be different for you* ).
 
 
 #### Start a Session
@@ -83,6 +94,7 @@ Usage:
      -n,--container <arg>   Number of YARN container to allocate (=Number of 
Task Managers)
    Optional
      -D <arg>                        Dynamic properties
+     -d,--detached                   Start detached
      -jm,--jobManagerMemory <arg>    Memory for JobManager Container [in MB]
      -q,--query                      Display available YARN resources (memory, 
cores)
      -qu,--queue <arg>               Specify YARN queue.
@@ -102,18 +114,23 @@ The system will use the configuration in 
`conf/flink-config.yaml`. Please follow
 
 Flink on YARN will overwrite the following configuration parameters 
`jobmanager.rpc.address` (because the JobManager is always allocated at 
different machines), `taskmanager.tmp.dirs` (we are using the tmp directories 
given by YARN) and `parallelization.degree.default` if the number of slots has 
been specified.
 
-If you don't want to change the configuration file to pass configuration 
parameters, there is the option to pass dynamic properties via the `-D` flag. 
So you can pass parameters this way: `-Dfs.overwrite-files=true 
-Dtaskmanager.network.numberOfBuffers=16368`.
+If you don't want to change the configuration file to set configuration 
parameters, there is the option to pass dynamic properties via the `-D` flag. 
So you can pass parameters this way: `-Dfs.overwrite-files=true 
-Dtaskmanager.network.numberOfBuffers=16368`.
 
 The example invocation starts 11 containers, since there is one additional 
container for the ApplicationMaster and Job Manager.
 
 Once Flink is deployed in your YARN cluster, it will show you the connection 
details of the Job Manager.
 
-The client has to remain open to keep the deployment running. We suggest to 
use `screen`, which will start a detachable shell:
+Stop the YARN session by stopping the unix process (using CTRL+C) or by 
entering 'stop' into the client.
 
-1. Open `screen`,
-2. Start Flink on YARN,
-3. Use `CTRL+a`, then press `d` to detach the screen session,
-4. Use `screen -r` to resume again.
+#### Detached YARN session
+
+If you do not want to keep the Flink YARN client running all the time, its 
also possible to start a *detached* YARN session. 
+The parameter for that is called `-d` or `--detached`.
+
+In that case, the Flink YARN client will only submit Flink to the cluster and 
then close itself.
+Note that in this case its not possible to stop the YARN session using Flink.
+
+Use the YARN utilities (`yarn application -kill <appId`) to stop the YARN 
session.
 
 
 ### Submit Job to Flink
@@ -187,6 +204,14 @@ Please note that the client then expects the `-yn` value 
to be set (number of Ta
 The command line options of the YARN session are also available with the 
`./bin/flink` tool. They are prefixed with a `y` or `yarn` (for the long 
argument options).
 
 
+## Recovery behavior of Flink on YARN
+
+Flink's YARN client has the following configuration parameters to control how 
to behave in case of container failures. These parameters can be set either 
from the `conf/flink-conf.yaml` or when starting the YARN session, using `-D` 
parameters.
+
+- `yarn.reallocate-failed`: This parameter controls whether Flink should 
reallocate failed TaskManager containers. Default: true
+- `yarn.maximum-failed-containers`: The maximum number of failed containers 
the ApplicationMaster accepts until it fails the YARN session. Default: The 
number of initally requested TaskManagers (`-n`).
+- `yarn.application-attempts`: The number of ApplicationMaster (+ its 
TaskManager containers) attempts. If this value is set to 1 (default), the 
entire YARN session will fail when the Application master fails. Higher values 
specify the number of restarts of the ApplicationMaster by YARN.
+
 
 ## Debugging a failed YARN session
 

http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java 
b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index 8d5a3c5..8e632f1 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -58,7 +58,6 @@ public class FlinkYarnSessionCli {
        public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
        public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
 
-
        private static final int CLIENT_POLLING_INTERVALL = 3;
 
 
@@ -73,6 +72,7 @@ public class FlinkYarnSessionCli {
        private final Option TM_MEMORY;
        private final Option CONTAINER;
        private final Option SLOTS;
+       private final Option DETACHED;
 
        /**
         * Dynamic properties allow the user to specify additional 
configuration values with -D, such as
@@ -80,7 +80,9 @@ public class FlinkYarnSessionCli {
         */
        private final Option DYNAMIC_PROPERTIES;
 
+       //------------------------------------ Internal fields 
-------------------------
        private AbstractFlinkYarnCluster yarnCluster = null;
+       private boolean detachedMode = false;
 
        public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
                QUERY = new Option(shortPrefix + "q", longPrefix + "query", 
false, "Display available YARN resources (memory, cores)");
@@ -92,6 +94,7 @@ public class FlinkYarnSessionCli {
                CONTAINER = new Option(shortPrefix + "n", longPrefix + 
"container", true, "Number of YARN container to allocate (=Number of Task 
Managers)");
                SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", 
true, "Number of slots per TaskManager");
                DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, 
"Dynamic properties");
+               DETACHED = new Option(shortPrefix + "d", longPrefix + 
"detached", false, "Start detached");
        }
 
        public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
@@ -212,6 +215,10 @@ public class FlinkYarnSessionCli {
 
                
flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
 
+               if(cmd.hasOption(DETACHED.getOpt())) {
+                       detachedMode = true;
+                       flinkYarnClient.setDetachedMode(detachedMode);
+               }
                return flinkYarnClient;
        }
 
@@ -234,6 +241,7 @@ public class FlinkYarnSessionCli {
                opt.addOption(QUEUE);
                opt.addOption(SLOTS);
                opt.addOption(DYNAMIC_PROPERTIES);
+               opt.addOption(DETACHED);
                formatter.printHelp(" ", opt);
        }
 
@@ -289,6 +297,7 @@ public class FlinkYarnSessionCli {
 
                                if(yarnCluster.hasFailed()) {
                                        System.err.println("The YARN cluster 
has failed");
+                                       yarnCluster.shutdown();
                                }
 
                                // wait until CLIENT_POLLING_INTERVALL is over 
or the user entered something.
@@ -335,6 +344,7 @@ public class FlinkYarnSessionCli {
                options.addOption(SHIP_PATH);
                options.addOption(SLOTS);
                options.addOption(DYNAMIC_PROPERTIES);
+               options.addOption(DETACHED);
        }
 
        public int run(String[] args) {
@@ -405,17 +415,25 @@ public class FlinkYarnSessionCli {
 
                        //------------------ Cluster running, let user control 
it ------------
 
-                       runInteractiveCli(yarnCluster);
+                       if(detachedMode) {
+                               // print info and quit:
+                               LOG.info("The Flink YARN client has been 
started in detached mode. In order to stop" +
+                                               "Flink on YARN, use the 
following command or a YARN web interface to stop it:\n" +
+                                               "yarn application -kill 
"+yarnCluster.getApplicationId()+"\n" +
+                                               "Please also note that the 
temporary files of the YARN session in {} will not be removed.", 
flinkYarnClient.getSessionFilesDir());
+                       } else {
+                               runInteractiveCli(yarnCluster);
 
-                       if(!yarnCluster.hasBeenStopped()) {
-                               LOG.info("Command Line Interface requested 
session shutdown");
-                               yarnCluster.shutdown();
-                       }
+                               if (!yarnCluster.hasBeenStopped()) {
+                                       LOG.info("Command Line Interface 
requested session shutdown");
+                                       yarnCluster.shutdown();
+                               }
 
-                       try {
-                               yarnPropertiesFile.delete();
-                       } catch (Exception e) {
-                               LOG.warn("Exception while deleting the 
JobManager address file", e);
+                               try {
+                                       yarnPropertiesFile.delete();
+                               } catch (Exception e) {
+                                       LOG.warn("Exception while deleting the 
JobManager address file", e);
+                               }
                        }
                }
                return 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 028c258..dd920d7 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -188,6 +188,35 @@ public final class ConfigConstants {
         */
        public static final String YARN_HEAP_LIMIT_CAP = "yarn.heap-limit-cap";
 
+       /**
+        * Reallocate failed YARN containers.
+        */
+       public static final String YARN_REALLOCATE_FAILED_CONTAINERS = 
"yarn.reallocate-failed";
+
+       /**
+        * The maximum number of failed YARN containers before entirely stopping
+        * the YARN session / job on YARN.
+        *
+        * By default, we take the number of of initially requested containers.
+        */
+       public static final String YARN_MAX_FAILED_CONTAINERS = 
"yarn.maximum-failed-containers";
+
+       /**
+        * Set the number of retries for failed YARN 
ApplicationMasters/JobManagers.
+        * This value is usually limited by YARN.
+        *
+        * By default, its 1.
+        */
+       public static final String YARN_APPLICATION_ATTEMPTS = 
"yarn.application-attempts";
+
+       /**
+        * The heartbeat intervall between the Application Master and the YARN 
Resource Manager.
+        *
+        * The default value is 5 (seconds).
+        */
+       public static final String YARN_HEARTBEAT_DELAY_SECONDS = 
"yarn.heartbeat-delay";
+
+
        // ------------------------ Hadoop Configuration 
------------------------
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 287a273..8d79c45 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -26,6 +26,7 @@ import java.net.URL;
 
 import akka.actor.ActorRef;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.handler.ResourceHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,9 +65,9 @@ public class WebInfoServer {
        private final Server server;
 
        /**
-        * Port for info server
+        * The assigned port where jetty is running.
         */
-       private final int port;
+       private int assignedPort;
 
        /**
         * Creates a new web info server. The server runs the servlets that 
implement the logic
@@ -87,10 +88,11 @@ public class WebInfoServer {
                        throw new NullPointerException();
                }
 
-               this.port = 
config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
+               // if port == 0, jetty will assign an available port.
+               int port = 
config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
                                
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
-               if (this.port <= 0) {
-                       throw new IllegalArgumentException("Invalid port for 
the webserver: " + this.port);
+               if (port < 0) {
+                       throw new IllegalArgumentException("Invalid port for 
the webserver: " + port);
                }
 
                final FiniteDuration timeout = AkkaUtils.getTimeout(config);
@@ -190,7 +192,13 @@ public class WebInfoServer {
         */
        public void start() throws Exception {
                server.start();
-               LOG.info("Started web info server for JobManager on 
{}:{}",server.getConnectors()[0].getHost(), this.port);
+               final Connector connector = server.getConnectors()[0];
+               assignedPort = connector.getLocalPort(); // we have to use 
getLocalPort() instead of getPort() 
http://stackoverflow.com/questions/8884865/how-to-discover-jetty-7-running-port
+               String host = connector.getHost();
+               if(host == null) { // as per method documentation
+                       host = "0.0.0.0";
+               }
+               LOG.info("Started web info server for JobManager on {}:{}", 
host, assignedPort);
        }
 
        /**
@@ -198,9 +206,10 @@ public class WebInfoServer {
         */
        public void stop() throws Exception {
                server.stop();
+               assignedPort = 0;
        }
 
        public int getServerPort() {
-               return this.port;
+               return this.assignedPort;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java
new file mode 100644
index 0000000..546e142
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.util;
+
+import org.slf4j.Logger;
+import sun.misc.Signal;
+
+/**
+ * This signal handler / signal logger is based on Apache Hadoops 
org.apache.hadoop.util.SignalLogger.
+ */
+public class SignalHandler {
+       private static boolean registered = false;
+
+       /**
+        * Our signal handler.
+        */
+       private static class Handler implements sun.misc.SignalHandler {
+               final private Logger LOG;
+               final private sun.misc.SignalHandler prevHandler;
+
+               Handler(String name, Logger LOG) {
+                       this.LOG = LOG;
+                       prevHandler = Signal.handle(new Signal(name), this);
+               }
+
+               /**
+                * Handle an incoming signal.
+                *
+                * @param signal    The incoming signal
+                */
+               @Override
+               public void handle(Signal signal) {
+                       LOG.error("RECEIVED SIGNAL " + signal.getNumber() + ": 
SIG" + signal.getName());
+                       prevHandler.handle(signal);
+               }
+       }
+
+       /**
+        * Register some signal handlers.
+        *
+        * @param LOG The slf4j logger
+        */
+       public static void register(final Logger LOG) {
+               if (registered) {
+                       throw new IllegalStateException("Can't re-install the 
signal handlers.");
+               }
+               registered = true;
+               StringBuilder bld = new StringBuilder();
+               bld.append("registered UNIX signal handlers for [");
+               final String[] SIGNALS = { "TERM", "HUP", "INT" };
+               String separator = "";
+               for (String signalName : SIGNALS) {
+                       try {
+                               new Handler(signalName, LOG);
+                               bld.append(separator);
+                               bld.append(signalName);
+                               separator = ", ";
+                       } catch (Exception e) {
+                               LOG.debug("Error while registering signal 
handler", e);
+                       }
+               }
+               bld.append("]");
+               LOG.info(bld.toString());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
index 77f4301..e8b730d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
@@ -49,5 +49,7 @@ public abstract class AbstractFlinkYarnClient {
 
        public abstract AbstractFlinkYarnCluster deploy(String clusterName) 
throws Exception;
 
+       public abstract void setDetachedMode(boolean detachedMode);
 
+       public abstract String getSessionFilesDir();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
index 58eaf1d..4b4bd2d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
@@ -41,4 +41,6 @@ public abstract class AbstractFlinkYarnCluster {
        public abstract String getDiagnostics();
 
        public abstract List<String> getNewMessages();
+
+       public abstract String getApplicationId();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index d6724ee..dbc6e9d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -28,7 +28,6 @@ import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.blob.BlobKey;
 import 
org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;

http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index d93a82d..8ab6359 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -82,6 +82,12 @@ under the License.
                        <type>jar</type>
                </dependency>
 
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <version>${guava.version}</version>
+               </dependency>
+
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
index 7a4631d..25a1413 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
@@ -17,14 +17,21 @@
  */
 package org.apache.flink.yarn;
 
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 public class UtilsTest {
+       private static final Logger LOG = 
LoggerFactory.getLogger(UtilsTest.class);
 
        @Test
        public void testUberjarLocator() {
@@ -39,4 +46,48 @@ public class UtilsTest {
                Assert.assertTrue(files.contains("bin"));
                Assert.assertTrue(files.contains("conf"));
        }
+
+
+       //
+       // --------------- Tools to test if a certain string has been logged 
with Log4j. -------------
+       // See :  
http://stackoverflow.com/questions/3717402/how-to-test-w-junit-that-warning-was-logged-w-log4j
+       //
+       private static TestAppender testAppender;
+       public static void addTestAppender(Class target, Level level) {
+               testAppender = new TestAppender();
+               testAppender.setThreshold(level);
+               org.apache.log4j.Logger lg = 
org.apache.log4j.Logger.getLogger(target);
+               lg.setLevel(level);
+               lg.addAppender(testAppender);
+               
//org.apache.log4j.Logger.getRootLogger().addAppender(testAppender);
+       }
+
+       public static void checkForLogString(String expected) {
+               if(testAppender == null) {
+                       throw new NullPointerException("Initialize it first");
+               }
+               LoggingEvent found = null;
+               for(LoggingEvent event: testAppender.events) {
+                       if(event.getMessage().toString().contains(expected)) {
+                               found = event;
+                               break;
+                       }
+               }
+               if(found != null) {
+                       LOG.info("Found expected string '"+expected+"' in log 
message "+found);
+                       return;
+               }
+               Assert.fail("Unable to find expected string '" + expected + "' 
in log messages");
+       }
+
+       public static class TestAppender extends AppenderSkeleton {
+               public List<LoggingEvent> events = new 
ArrayList<LoggingEvent>();
+               public void close() {}
+               public boolean requiresLayout() {return false;}
+               @Override
+               protected void append(LoggingEvent event) {
+                       events.add(event);
+               }
+       }
 }
+

http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 7da355b..4179549 100644
--- 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -20,13 +20,12 @@ package org.apache.flink.yarn;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.log4j.Level;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import static org.apache.flink.yarn.YARNSessionFIFOITCase.addTestAppender;
-import static org.apache.flink.yarn.YARNSessionFIFOITCase.checkForLogString;
+import static org.apache.flink.yarn.UtilsTest.addTestAppender;
+import static org.apache.flink.yarn.UtilsTest.checkForLogString;
 
 
 /**
@@ -34,7 +33,7 @@ import static org.apache.flink.yarn.YARNSessionFIFOITCase.checkForLogString;
  * It has, by default, a queue called "default". The configuration here adds another queue: "qa-team".
  */
 public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
-       private static final Logger LOG = LoggerFactory.getLogger(YARNSessionCapacitySchedulerITCase.class);
+
 
        @BeforeClass
        public static void setup() {
@@ -42,6 +41,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
                yarnConfiguration.set("yarn.scheduler.capacity.root.queues", "default,qa-team");
                yarnConfiguration.setInt("yarn.scheduler.capacity.root.default.capacity", 40);
                yarnConfiguration.setInt("yarn.scheduler.capacity.root.qa-team.capacity", 60);
+               yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-capacityscheduler");
                startYARNWithConfig(yarnConfiguration);
        }
 
@@ -56,7 +56,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
                                                "-tm", "1024", "-qu", "qa-team"},
                                "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
 
-               ensureNoExceptionsInLogFiles();
+               ensureNoProhibitedStringInLogFiles(prohibtedStrings);
        }
 
 
@@ -66,7 +66,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
         */
        @Test
        public void testNonexistingQueue() {
-               addTestAppender();
+               addTestAppender(FlinkYarnClient.class, Level.WARN);
                runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
                                "-n", "1",
                                "-jm", "512",
@@ -74,6 +74,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
                                "-qu", "doesntExist"}, "to unknown queue: doesntExist", RunTypes.YARN_SESSION);
                checkForLogString("The specified queue 'doesntExist' does not exist. Available queues: default, qa-team");
 
-               ensureNoExceptionsInLogFiles();
+               ensureNoProhibitedStringInLogFiles(prohibtedStrings);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 5976799..dcbd80a 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -17,17 +17,30 @@
  */
 package org.apache.flink.yarn;
 
+import com.google.common.base.Joiner;
+import org.apache.commons.io.IOUtils;
 import org.apache.flink.client.FlinkYarnSessionCli;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
+import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.log4j.AppenderSkeleton;
-import org.apache.log4j.spi.LoggingEvent;
+import org.apache.log4j.Level;
+import org.codehaus.jettison.json.JSONObject;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -35,8 +48,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.ArrayList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.flink.yarn.UtilsTest.addTestAppender;
+import static org.apache.flink.yarn.UtilsTest.checkForLogString;
 
 
 /**
@@ -54,23 +77,204 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                yarnConfiguration.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
                yarnConfiguration.setInt(YarnConfiguration.NM_PMEM_MB, 768);
                yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+               yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo");
                startYARNWithConfig(yarnConfiguration);
        }
+
        /**
         * Test regular operation, including command line parameter parsing.
         */
        @Test
        public void testClientStartup() {
                LOG.info("Starting testClientStartup()");
-               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+               runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
                                                "-n", "1",
                                                "-jm", "512",
                                                "-tm", "1024"},
                                "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
                LOG.info("Finished testClientStartup()");
-               ensureNoExceptionsInLogFiles();
+               ensureNoProhibitedStringInLogFiles(prohibtedStrings);
+       }
+
+       /**
+        * Test regular operation, including command line parameter parsing.
+        */
+       @Test(timeout=60000) // timeout after a minute.
+       public void testDetachedMode() {
+               LOG.info("Starting testDetachedMode()");
+               addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
+               Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+                                               "-n", "1",
+                                               "-jm", "512",
+                                               "-tm", "1024",
+                                               "--detached"},
+                               "Flink JobManager is now running on", RunTypes.YARN_SESSION);
+
+               checkForLogString("The Flink YARN client has been started in detached mode");
+
+               Assert.assertFalse("The runner should detach.", runner.isAlive());
+
+               // kill application "externally".
+               try {
+                       YarnClient yc = YarnClient.createYarnClient();
+                       yc.init(yarnConfiguration);
+                       yc.start();
+                       List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
+                       Assert.assertEquals(1, apps.size()); // Only one running
+                       ApplicationId id = apps.get(0).getApplicationId();
+                       yc.killApplication(id);
+
+                       while(yc.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0) {
+                               sleep(500);
+                       }
+               } catch(Throwable t) {
+                       LOG.warn("Killing failed", t);
+                       Assert.fail();
+               }
+
+               LOG.info("Finished testDetachedMode()");
+               ensureNoProhibitedStringInLogFiles(prohibtedStrings);
        }
 
+       /**
+        * Test TaskManager failure
+        */
+       @Test(timeout=100000) // timeout after 100 seconds
+       public void testTaskManagerFailure() {
+               LOG.info("Starting testTaskManagerFailure()");
+               Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+                               "-n", "1",
+                               "-jm", "512",
+                               "-tm", "1024",
+                               "-Dfancy-configuration-value=veryFancy",
+                               "-Dyarn.maximum-failed-containers=3"},
+                               "Number of connected TaskManagers changed to 1. Slots available: 1",
+                               RunTypes.YARN_SESSION);
+
+               Assert.assertEquals(2, getRunningContainers());
+
+               // ------------------------ Test if JobManager web interface is accessible -------
+               try {
+                       YarnClient yc = YarnClient.createYarnClient();
+                       yc.init(yarnConfiguration);
+                       yc.start();
+                       List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
+                       Assert.assertEquals(1, apps.size()); // Only one running
+                       String url = apps.get(0).getTrackingUrl();
+                       if(!url.endsWith("/")) {
+                               url += "/";
+                       }
+                       if(!url.startsWith("http://")) {
+                               url = "http://" + url;
+                       }
+                       LOG.info("Got application URL from YARN {}", url);
+
+                       // get number of TaskManagers:
+                       Assert.assertEquals("{\"taskmanagers\": 1, \"slots\": 1}", getFromHTTP(url + "jobsInfo?get=taskmanagers"));
+
+                       // get the configuration from webinterface & check if the dynamic properties from YARN show up there.
+                       String config = getFromHTTP(url + "setupInfo?get=globalC");
+                       JSONObject parsed = new JSONObject(config);
+                       Assert.assertEquals("veryFancy", parsed.getString("fancy-configuration-value"));
+                       Assert.assertEquals("3", parsed.getString("yarn.maximum-failed-containers"));
+
+                       // test logfile access
+                       String logs = getFromHTTP(url + "logInfo");
+                       Assert.assertTrue(logs.contains("Starting YARN ApplicationMaster/JobManager (Version"));
+               } catch(Throwable e) {
+                       LOG.warn("Error while running test", e);
+                       Assert.fail(e.getMessage());
+               }
+
+               // ------------------------ Kill container with TaskManager -------
+
+               // find container id of taskManager:
+               ContainerId taskManagerContainer = null;
+               NodeManager nodeManager = null;
+               UserGroupInformation remoteUgi = null;
+               NMTokenIdentifier nmIdent = null;
+               try {
+                       remoteUgi = UserGroupInformation.getCurrentUser();
+               } catch (IOException e) {
+                       LOG.warn("Unable to get curr user", e);
+                       Assert.fail();
+               }
+               for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
+                       NodeManager nm = yarnCluster.getNodeManager(nmId);
+                       ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
+                       for(Map.Entry<ContainerId, Container> entry : containers.entrySet()) {
+                               String command = Joiner.on(" ").join(entry.getValue().getLaunchContext().getCommands());
+                               if(command.contains(YarnTaskManagerRunner.class.getSimpleName())) {
+                                       taskManagerContainer = entry.getKey();
+                                       nodeManager = nm;
+                                       nmIdent = new NMTokenIdentifier(taskManagerContainer.getApplicationAttemptId(), null, "", 0);
+                                       // allow myself to do stuff with the container
+                                       // remoteUgi.addCredentials(entry.getValue().getCredentials());
+                                       remoteUgi.addTokenIdentifier(nmIdent);
+                               }
+                       }
+                       sleep(500);
+               }
+
+               Assert.assertNotNull("Unable to find container with TaskManager", taskManagerContainer);
+               Assert.assertNotNull("Illegal state", nodeManager);
+
+               List<ContainerId> toStop = new LinkedList<ContainerId>();
+               toStop.add(taskManagerContainer);
+               StopContainersRequest scr = StopContainersRequest.newInstance(toStop);
+
+               try {
+                       nodeManager.getNMContext().getContainerManager().stopContainers(scr);
+               } catch (Throwable e) {
+                       LOG.warn("Error stopping container", e);
+                       Assert.fail("Error stopping container: " + e.getMessage());
+               }
+
+               // stateful termination check:
+               // wait until we see a container being killed and AFTERWARDS a new one launched
+               boolean ok = false;
+               do {
+                       LOG.debug("Waiting for correct order of events. Output: {}", errContent.toString());
+
+                       String o = errContent.toString();
+                       int killedOff = o.indexOf("Container killed by the ApplicationMaster");
+                       if(killedOff != -1) {
+                               o = o.substring(killedOff);
+                               ok = o.indexOf("Launching container") > 0;
+                       }
+                       sleep(1000);
+               } while(!ok);
+
+
+               // send "stop" command to command line interface
+               runner.sendStop();
+               // wait for the thread to stop
+               try {
+                       runner.join(1000);
+               } catch (InterruptedException e) {
+                       LOG.warn("Interrupted while stopping runner", e);
+               }
+               LOG.warn("stopped");
+
+               // ----------- Send output to logger
+               System.setOut(originalStdout);
+               System.setErr(originalStderr);
+               String oC = outContent.toString();
+               String eC = errContent.toString();
+               LOG.info("Sending stdout content through logger: \n\n{}\n\n", oC);
+               LOG.info("Sending stderr content through logger: \n\n{}\n\n", eC);
+
+               // ------ Check if everything happened correctly
+               Assert.assertTrue("Expect to see failed container", eC.contains("New messages from the YARN cluster"));
+               Assert.assertTrue("Expect to see failed container", eC.contains("Container killed by the ApplicationMaster"));
+               Assert.assertTrue("Expect to see new container started", eC.contains("Launching container") && eC.contains("on host"));
+
+               // cleanup auth for the subsequent tests.
+               remoteUgi.getTokenIdentifiers().remove(nmIdent);
+
+               LOG.info("Finished testTaskManagerFailure()");
+               ensureNoProhibitedStringInLogFiles(prohibtedStrings);
+       }
 
        /**
         * Test querying the YARN cluster.
@@ -82,7 +286,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                LOG.info("Starting testQueryCluster()");
                runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", RunTypes.YARN_SESSION); // we have 666*2 cores.
                LOG.info("Finished testQueryCluster()");
-               ensureNoExceptionsInLogFiles();
+               ensureNoProhibitedStringInLogFiles(prohibtedStrings);
        }
 
        /**
@@ -98,7 +302,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                                "-tm", "1024",
                                "-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
                LOG.info("Finished testNonexistingQueue()");
-               ensureNoExceptionsInLogFiles();
+               ensureNoProhibitedStringInLogFiles(prohibtedStrings);
        }
 
        /**
@@ -109,15 +313,15 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                if(ignoreOnTravis()) {
                        return;
                }
-               addTestAppender();
+               addTestAppender(FlinkYarnClient.class, Level.WARN);
                LOG.info("Starting testMoreNodesThanAvailable()");
-               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+               runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
                                "-n", "10",
                                "-jm", "512",
                                "-tm", "1024"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION); // the number of TMs depends on the speed of the test hardware
                LOG.info("Finished testMoreNodesThanAvailable()");
                checkForLogString("This YARN session requires 10752MB of memory in the cluster. There are currently only 8192MB available.");
-               ensureNoExceptionsInLogFiles();
+               ensureNoProhibitedStringInLogFiles(prohibtedStrings);
        }
 
        /**
@@ -137,9 +341,9 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                if(ignoreOnTravis()) {
                        return;
                }
-               addTestAppender();
+               addTestAppender(FlinkYarnClient.class, Level.WARN);
                LOG.info("Starting testResourceComputation()");
-               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+               runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
                                "-n", "5",
                                "-jm", "256",
                                "-tm", "1585"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION);
@@ -167,16 +371,16 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                if(ignoreOnTravis()) {
                        return;
                }
-               addTestAppender();
+               addTestAppender(FlinkYarnClient.class, Level.WARN);
                LOG.info("Starting testfullAlloc()");
-               runWithArgs(new String[] {"-j", flinkUberjar.getAbsolutePath(),
+               runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
                                "-n", "2",
                                "-jm", "256",
                                "-tm", "3840"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION);
                LOG.info("Finished testfullAlloc()");
                checkForLogString("There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" +
                                "After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");
-               ensureNoExceptionsInLogFiles();
+               ensureNoProhibitedStringInLogFiles(prohibtedStrings);
        }
 
        /**
@@ -195,7 +399,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                                "-yjm", "512",
                                "-ytm", "1024", exampleJarLocation.getAbsolutePath()}, "Job execution switched to status FINISHED.", RunTypes.CLI_FRONTEND);
                LOG.info("Finished perJobYarnCluster()");
-               ensureNoExceptionsInLogFiles();
+               ensureNoProhibitedStringInLogFiles(prohibtedStrings);
        }
 
        /**
@@ -222,7 +426,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                        yarnCluster = flinkYarnClient.deploy(null);
                } catch (Exception e) {
                        System.err.println("Error while deploying YARN cluster: "+e.getMessage());
-                       e.printStackTrace(System.err);
+                       LOG.warn("Failing test", e);
                        Assert.fail();
                }
                FlinkYarnClusterStatus expectedStatus = new FlinkYarnClusterStatus(1, 1);
@@ -253,7 +457,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                yarnCluster.shutdown();
                LOG.info("Finished testJavaAPI()");
 
-               ensureNoExceptionsInLogFiles();
+               ensureNoProhibitedStringInLogFiles(prohibtedStrings);
        }
 
        public boolean ignoreOnTravis() {
@@ -266,42 +470,29 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                return false;
        }
 
-       //
-       // --------------- Tools to test if a certain string has been logged with Log4j. -------------
-       // See : http://stackoverflow.com/questions/3717402/how-to-test-w-junit-that-warning-was-logged-w-log4j
-       //
-       private static TestAppender testAppender;
-       public static void addTestAppender() {
-               testAppender = new TestAppender();
-               org.apache.log4j.Logger.getRootLogger().addAppender(testAppender);
-       }
 
-       public static void checkForLogString(String expected) {
-               if(testAppender == null) {
-                       throw new NullPointerException("Initialize it first");
-               }
-               LoggingEvent found = null;
-               for(LoggingEvent event: testAppender.events) {
-                       if(event.getMessage().toString().contains(expected)) {
-                               found = event;
-                               break;
-                       }
-               }
-               if(found != null) {
-                       LOG.info("Found expected string '"+expected+"' in log message "+found);
-                       return;
-               }
-               Assert.fail("Unable to find expected string '"+expected+"' in log messages");
-       }
+       ///------------------------ Ported tool from: https://github.com/rmetzger/flink/blob/flink1501/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
 
-       public static class TestAppender extends AppenderSkeleton {
-               public List<LoggingEvent> events = new ArrayList<LoggingEvent>();
-               public void close() {}
-               public boolean requiresLayout() {return false;}
-               @Override
-               protected void append(LoggingEvent event) {
-                       events.add(event);
+       public static String getFromHTTP(String url) throws Exception {
+               URL u = new URL(url);
+               LOG.info("Accessing URL "+url+" as URL: "+u);
+               HttpURLConnection connection = (HttpURLConnection) u.openConnection();
+               connection.setConnectTimeout(100000);
+               connection.connect();
+               InputStream is = null;
+               if(connection.getResponseCode() >= 400) {
+                       // error!
+                       LOG.warn("HTTP Response code when connecting to {} was {}", url, connection.getResponseCode());
+                       is = connection.getErrorStream();
+               } else {
+                       is = connection.getInputStream();
                }
+
+               return IOUtils.toString(is, connection.getContentEncoding() != null ? connection.getContentEncoding() : "UTF-8");
        }
+
+
+
+
        
 }
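The "stateful termination check" in `testTaskManagerFailure()` above only succeeds once "Launching container" appears AFTER "Container killed by the ApplicationMaster" in the captured stderr. The ordering logic can be isolated as a small sketch (the class and method names here are hypothetical, not part of the commit):

```java
// Ordered-event check: the test must observe a kill first and a
// launch afterwards, so a launch that happened BEFORE the kill
// (i.e. the original container start) must not count.
class EventOrderCheck {
    static boolean killedThenLaunched(String output) {
        int killedOff = output.indexOf("Container killed by the ApplicationMaster");
        if (killedOff == -1) {
            return false; // kill not observed yet
        }
        // look for a launch strictly after the kill message
        return output.substring(killedOff).indexOf("Launching container") > 0;
    }

    public static void main(String[] args) {
        String ordered = "Container killed by the ApplicationMaster ... Launching container c_02 on host h1";
        String reversed = "Launching container c_01 ... Container killed by the ApplicationMaster";
        System.out.println(killedThenLaunched(ordered));  // true
        System.out.println(killedThenLaunched(reversed)); // false
    }
}
```

Taking the substring from the kill offset is what makes the check order-sensitive; a plain `contains()` on both strings would also pass when the only launch happened before the kill.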

http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
index 200205d..88b4772 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -23,11 +23,14 @@ import org.apache.flink.client.FlinkYarnSessionCli;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -51,6 +54,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Scanner;
+import java.util.concurrent.ConcurrentMap;
 
 
 /**
@@ -59,15 +63,24 @@ import java.util.Scanner;
  *
  * This class is located in a different package which is built after flink-dist. This way,
  * we can use the YARN uberjar of flink to start a Flink YARN session.
+ *
+ * The test is not thread-safe. Parallel execution of tests is not possible!
  */
 public abstract class YarnTestBase {
        private static final Logger LOG = LoggerFactory.getLogger(YarnTestBase.class);
 
-       private final static PrintStream originalStdout = System.out;
-       private final static PrintStream originalStderr = System.err;
+       protected final static PrintStream originalStdout = System.out;
+       protected final static PrintStream originalStderr = System.err;
+
+       protected static String TEST_CLUSTER_NAME_KEY = "flink-yarn-minicluster-name";
 
-       private final static String TEST_CLUSTER_NAME = "flink-yarn-tests";
+       protected final static int NUM_NODEMANAGERS = 2;
 
+       // The tests are scanning for these strings in the final output.
+       protected final static String[] prohibtedStrings = {
+                       "Exception", // we don't want any exceptions to happen
+                       "Started SelectChannelConnector@0.0.0.0:8081" // Jetty should start on a random port in YARN mode.
+       };
 
        // Temp directory which is deleted after the unit test.
        private static TemporaryFolder tmp = new TemporaryFolder();
@@ -87,9 +100,10 @@ public abstract class YarnTestBase {
                yarnConfiguration.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
                yarnConfiguration.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
                yarnConfiguration.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
-               yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+               yarnConfiguration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
                yarnConfiguration.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster.
                // so we have to change the number of cores for testing.
+               yarnConfiguration.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000); // 10 seconds expiry (to ensure we properly heartbeat with YARN).
        }
 
        // This code is taken from: http://stackoverflow.com/a/7201825/568695
@@ -147,14 +161,17 @@ public abstract class YarnTestBase {
                yarnClient.start();
                List<ApplicationReport> apps = yarnClient.getApplications();
                for(ApplicationReport app : apps) {
-                       if(app.getYarnApplicationState() != YarnApplicationState.FINISHED) {
-                               Assert.fail("There is at least one application on the cluster is not finished");
+                       if(app.getYarnApplicationState() != YarnApplicationState.FINISHED
+                                       && app.getYarnApplicationState() != YarnApplicationState.KILLED
+                                       && app.getYarnApplicationState() != YarnApplicationState.FAILED) {
+                               Assert.fail("There is at least one application on the cluster that is not finished." +
+                                               " App "+app.getApplicationId()+" is in state "+app.getYarnApplicationState());
                }
        }
 
        /**
-        * Locate a file or diretory directory
+        * Locate a file or directory
         */
        public static File findFile(String startAt, FilenameFilter fnf) {
                File root = new File(startAt);
@@ -163,7 +180,6 @@ public abstract class YarnTestBase {
                        return null;
                }
                for(String file : files) {
-
                        File f = new File(startAt + File.separator + file);
                        if(f.isDirectory()) {
                                File r = findFile(f.getAbsolutePath(), fnf);
@@ -173,7 +189,6 @@ public abstract class YarnTestBase {
                        } else if (fnf.accept(f.getParentFile(), f.getName())) {
                                return f;
                        }
-
                }
                return null;
        }
@@ -224,12 +239,15 @@ public abstract class YarnTestBase {
        /**
         * This method checks the written TaskManager and JobManager log files
         * for exceptions.
+        *
+        * WARN: Please make sure the tool doesn't pick up old logfiles from previous test runs,
+        * so always run "mvn clean" before running the tests here.
+        *
         */
-       public static void ensureNoExceptionsInLogFiles() {
-               File cwd = new File("target/"+TEST_CLUSTER_NAME);
+       public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited) {
+               File cwd = new File("target/"+yarnConfiguration.get(TEST_CLUSTER_NAME_KEY));
                Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to exist", cwd.exists());
                Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to be a directory", cwd.isDirectory());
-               System.out.println("cwd = "+cwd.getAbsolutePath());
                File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
                        @Override
                        public boolean accept(File dir, String name) {
@@ -243,9 +261,13 @@ public abstract class YarnTestBase {
                                }
                                while (scanner.hasNextLine()) {
                                        final String lineFromFile = 
scanner.nextLine();
-                                       if(lineFromFile.contains("Exception")) {
-                                               return true;
+                                       for(int i = 0; i < prohibited.length; 
i++) {
+                                               
if(lineFromFile.contains(prohibited[i])) {
+                                                       LOG.warn("Found prohibited string '{}' in line '{}'", prohibited[i], lineFromFile);
+                                                       return true;
+                                               }
                                        }
+
                                }
                                return false;
                        }
@@ -257,12 +279,30 @@ public abstract class YarnTestBase {
                        } catch (FileNotFoundException e) {
                                Assert.fail("Unable to locate file: 
"+e.getMessage()+" file: "+foundFile.getAbsolutePath());
                        }
-                       LOG.warn("Found a file with an exception. Printing 
contents:");
+                       LOG.warn("Found a file with a prohibited string. 
Printing contents:");
                        while (scanner.hasNextLine()) {
                                LOG.warn("LINE: "+scanner.nextLine());
                        }
-                       Assert.fail("Found a file "+foundFile+" with an 
exception");
+                       Assert.fail("Found a file "+foundFile+" with a 
prohibited string: "+Arrays.toString(prohibited));
+               }
+       }
+
+       public static void sleep(int time) {
+               try {
+                       Thread.sleep(time);
+               } catch (InterruptedException e) {
+                       LOG.warn("Interrupted", e);
+               }
+       }
+
+       public static int getRunningContainers() {
+               int count = 0;
+               for(int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
+                       NodeManager nm = yarnCluster.getNodeManager(nmId);
+                       ConcurrentMap<ContainerId, Container> containers = 
nm.getNMContext().getContainers();
+                       count += containers.size();
                }
+               return count;
        }
 
        public static void startYARNWithConfig(Configuration conf) {
@@ -275,9 +315,9 @@ public abstract class YarnTestBase {
                }
 
                try {
-                       LOG.info("Starting up MiniYARN cluster");
+                       LOG.info("Starting up MiniYARNCluster");
                        if (yarnCluster == null) {
-                               yarnCluster = new 
MiniYARNCluster(TEST_CLUSTER_NAME, 2, 1, 1);
+                               yarnCluster = new 
MiniYARNCluster(conf.get(YarnTestBase.TEST_CLUSTER_NAME_KEY), NUM_NODEMANAGERS, 
1, 1);
 
                                yarnCluster.init(conf);
                                yarnCluster.start();
@@ -310,13 +350,19 @@ public abstract class YarnTestBase {
 
        // -------------------------- Runner -------------------------- //
 
-       private static ByteArrayOutputStream outContent;
-       private static ByteArrayOutputStream errContent;
+       protected static ByteArrayOutputStream outContent;
+       protected static ByteArrayOutputStream errContent;
        enum RunTypes {
                YARN_SESSION, CLI_FRONTEND
        }
 
-       protected void runWithArgs(String[] args, String expect, RunTypes type) 
{
+       /**
+        * This method returns once the "startedAfterString" has been seen.
+        *
+        * @param args command line arguments passed to the runner
+        * @param startedAfterString the output string to wait for before returning
+        * @param type whether to start the YARN session CLI or the CLI frontend
+        */
+       protected Runner startWithArgs(String[] args, String 
startedAfterString, RunTypes type) {
                LOG.info("Running with args {}", Arrays.toString(args));
 
                outContent = new ByteArrayOutputStream();
@@ -330,16 +376,53 @@ public abstract class YarnTestBase {
                Runner runner = new Runner(args, type);
                runner.start();
 
-               boolean expectedStringSeen = false;
                for(int second = 0; second <  START_TIMEOUT_SECONDS; second++) {
-                       try {
-                               Thread.sleep(1000);
-                       } catch (InterruptedException e) {
-                               Assert.fail("Interruption not expected");
+                       sleep(1000);
+                       // check output for correct TaskManager startup.
+                       if(outContent.toString().contains(startedAfterString)
+                                       || 
errContent.toString().contains(startedAfterString) ) {
+                               LOG.info("Found expected output in redirected 
streams");
+                               return runner;
+                       }
+                       // check if thread died
+                       if(!runner.isAlive()) {
+                               sendOutput();
+                               Assert.fail("Runner thread died before the test 
was finished. Return value = "+runner.getReturnValue());
                        }
+               }
+
+               sendOutput();
+               Assert.fail("During the timeout period of " + 
START_TIMEOUT_SECONDS + " seconds the " +
+                               "expected string did not show up");
+               return null;
+       }
+
+       /**
+        * The test is considered passed once the "terminateAfterString" has been seen.
+        *
+        * @param args command line arguments passed to the runner
+        * @param terminateAfterString the output string that, once seen, ends the test successfully
+        * @param type whether to start the YARN session CLI or the CLI frontend
+        */
+       protected void runWithArgs(String[] args, String terminateAfterString, 
RunTypes type) {
+               LOG.info("Running with args {}", Arrays.toString(args));
+
+               outContent = new ByteArrayOutputStream();
+               errContent = new ByteArrayOutputStream();
+               System.setOut(new PrintStream(outContent));
+               System.setErr(new PrintStream(errContent));
+
+
+               final int START_TIMEOUT_SECONDS = 60;
+
+               Runner runner = new Runner(args, type);
+               runner.start();
+
+               boolean expectedStringSeen = false;
+               for(int second = 0; second <  START_TIMEOUT_SECONDS; second++) {
+                       sleep(1000);
                        // check output for correct TaskManager startup.
-                       if(outContent.toString().contains(expect)
-                                       || 
errContent.toString().contains(expect) ) {
+                       if(outContent.toString().contains(terminateAfterString)
+                                       || 
errContent.toString().contains(terminateAfterString) ) {
                                expectedStringSeen = true;
                                LOG.info("Found expected output in redirected 
streams");
                                // send "stop" command to command line interface
@@ -366,7 +449,7 @@ public abstract class YarnTestBase {
                LOG.info("Test was successful");
        }
 
-       private static void sendOutput() {
+       protected static void sendOutput() {
                System.setOut(originalStdout);
                System.setErr(originalStderr);
 

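The rewritten `ensureNoProhibitedStringInLogFiles` check above boils down to scanning every log line against a list of prohibited strings instead of only matching "Exception". A minimal, self-contained sketch of that core loop (hypothetical class and method names, not the actual test base, which walks the log directory via `findFile`):

```java
import java.util.Arrays;
import java.util.List;

public class ProhibitedStringScan {

    /**
     * Returns the first prohibited string found in any of the given lines,
     * or null if none matches. Simplified stand-in for the file-based scan
     * in YarnTestBase.
     */
    static String findProhibited(List<String> lines, String[] prohibited) {
        for (String line : lines) {
            for (String p : prohibited) {
                if (line.contains(p)) {
                    return p;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList(
                "INFO  TaskManager started",
                "ERROR java.lang.Exception: something failed");
        System.out.println(findProhibited(log, new String[]{"Exception", "Error"})); // prints Exception
    }
}
```

Generalizing the parameter is what lets the new container-restart test also forbid strings like specific error messages, not just any "Exception".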
http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-yarn-tests/src/main/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/resources/log4j-test.properties 
b/flink-yarn-tests/src/main/resources/log4j-test.properties
index b4dbbe0..3fee7d6 100644
--- a/flink-yarn-tests/src/main/resources/log4j-test.properties
+++ b/flink-yarn-tests/src/main/resources/log4j-test.properties
@@ -16,13 +16,16 @@
 # limitations under the License.
 
################################################################################
 
-log4j.rootLogger=WARN, file
+log4j.rootLogger=FATAL, console
 
 # Log all infos in the given file
-log4j.appender.file=org.apache.log4j.ConsoleAppender
-log4j.appender.file.append=false
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - 
%m%n
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
\ No newline at end of file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
+
+# log what's going on between the tests
+log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO, console
+log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO, 
console

http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index 16cb345..077ed11 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -136,6 +136,8 @@ public class FlinkYarnClient extends 
AbstractFlinkYarnClient {
        private List<File> shipFiles = new ArrayList<File>();
        private org.apache.flink.configuration.Configuration flinkConfiguration;
 
+       private boolean detached;
+
 
        public FlinkYarnClient() {
                conf = new YarnConfiguration();
@@ -317,6 +319,12 @@ public class FlinkYarnClient extends 
AbstractFlinkYarnClient {
                        return deployInternal(clusterName);
                }
        }
+
+       @Override
+       public void setDetachedMode(boolean detachedMode) {
+               this.detached = detachedMode;
+       }
+
        /**
         * This method will block until the ApplicationMaster/JobManager have 
been
         * deployed on YARN.
@@ -494,6 +502,8 @@ public class FlinkYarnClient extends 
AbstractFlinkYarnClient {
 
                // Set-up ApplicationSubmissionContext for the application
                ApplicationSubmissionContext appContext = 
yarnApplication.getApplicationSubmissionContext();
+               
appContext.setMaxAppAttempts(flinkConfiguration.getInteger(ConfigConstants.YARN_APPLICATION_ATTEMPTS,
 1));
+
                final ApplicationId appId = appContext.getApplicationId();
 
                // Setup jar for ApplicationMaster
@@ -562,6 +572,9 @@ public class FlinkYarnClient extends 
AbstractFlinkYarnClient {
                if(clusterName == null) {
                        clusterName = "Flink session with "+taskManagerCount+" 
TaskManagers";
                }
+               if(detached) {
+                       clusterName += " (detached)";
+               }
 
                appContext.setApplicationName(clusterName); // application name
                appContext.setApplicationType("Apache Flink");
@@ -603,7 +616,7 @@ public class FlinkYarnClient extends 
AbstractFlinkYarnClient {
                        Thread.sleep(1000);
                }
                // the Flink cluster is deployed in YARN. Represent cluster
-               return new FlinkYarnCluster(yarnClient, appId, conf, 
flinkConfiguration, sessionFilesDir);
+               return new FlinkYarnCluster(yarnClient, appId, conf, 
flinkConfiguration, sessionFilesDir, detached);
        }
 
        /**
@@ -656,8 +669,6 @@ public class FlinkYarnClient extends 
AbstractFlinkYarnClient {
                return new ClusterResourceDescription(totalFreeMemory, 
containerLimit, nodeManagersFree);
        }
 
-
-
        public String getClusterDescription() throws Exception {
 
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -692,6 +703,10 @@ public class FlinkYarnClient extends 
AbstractFlinkYarnClient {
                return baos.toString();
        }
 
+       public String getSessionFilesDir() {
+               return sessionFilesDir.toString();
+       }
+
        public static class YarnDeploymentException extends RuntimeException {
                public YarnDeploymentException() {
                }

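The new `setMaxAppAttempts` call reads `ConfigConstants.YARN_APPLICATION_ATTEMPTS` with a default of 1, i.e. the ApplicationMaster is not restarted unless the user explicitly configures more attempts. A tiny sketch of that defaulting lookup; the map-based `getInteger` is a stand-in for Flink's `Configuration#getInteger`, and the key string is illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class AttemptsConfig {

    // Minimal stand-in for Flink's Configuration#getInteger(key, defaultValue).
    static int getInteger(Map<String, String> conf, String key, int defaultValue) {
        String value = conf.get(key);
        return value == null ? defaultValue : Integer.parseInt(value);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // key unset: a single application attempt, i.e. the AM is not restarted
        System.out.println(getInteger(conf, "yarn.application-attempts", 1)); // prints 1
        conf.put("yarn.application-attempts", "3");
        System.out.println(getInteger(conf, "yarn.application-attempts", 1)); // prints 3
    }
}
```

The resolved value is then handed to YARN via `ApplicationSubmissionContext.setMaxAppAttempts`, which is what enables the AM restart behavior described in FLINK-1547.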
http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index 2f652a1..8d6b453 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -56,7 +56,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-
+/**
+ * Java representation of a running Flink cluster within YARN.
+ */
 public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
        private static final Logger LOG = 
LoggerFactory.getLogger(FlinkYarnCluster.class);
 
@@ -78,15 +80,32 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
        private ApplicationReport intialAppReport;
        private final FiniteDuration akkaDuration;
        private final Timeout akkaTimeout;
-
+       private final ApplicationId applicationId;
+       private final boolean detached;
+
+
+       /**
+        * Create a new Flink on YARN cluster.
+        *
+        * @param yarnClient the YARN client used to communicate with the YARN cluster
+        * @param appId the YARN application ID
+        * @param hadoopConfig the Hadoop configuration
+        * @param flinkConfig the Flink configuration
+        * @param sessionFilesDir directory containing the files of this YARN session
+        * @param detached Set to true if no actor system or RPC communication with the cluster should be established
+        * @throws IOException
+        * @throws YarnException
+        */
        public FlinkYarnCluster(final YarnClient yarnClient, final 
ApplicationId appId, Configuration hadoopConfig,
                                                        
org.apache.flink.configuration.Configuration flinkConfig,
-                                                       Path sessionFilesDir) 
throws IOException, YarnException {
+                                                       Path sessionFilesDir, 
boolean detached) throws IOException, YarnException {
                this.akkaDuration = AkkaUtils.getTimeout(flinkConfig);
                this.akkaTimeout = Timeout.durationToTimeout(akkaDuration);
                this.yarnClient = yarnClient;
                this.hadoopConfig = hadoopConfig;
                this.sessionFilesDir = sessionFilesDir;
+               this.applicationId = appId;
+               this.detached = detached;
 
                // get one application report manually
                intialAppReport = yarnClient.getApplicationReport(appId);
@@ -94,27 +113,28 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
                int jobManagerPort = intialAppReport.getRpcPort();
                this.jobManagerAddress = new InetSocketAddress(jobManagerHost, 
jobManagerPort);
 
-               // start actor system
-               LOG.info("Start actor system.");
-               InetAddress ownHostname = 
NetUtils.resolveAddress(jobManagerAddress); // find name of own public 
interface, able to connect to the JM
-               actorSystem = AkkaUtils.createActorSystem(flinkConfig,
-                               new Some(new Tuple2<String, 
Integer>(ownHostname.getCanonicalHostName(), 0)));
+               if(!detached) {
+                       // start actor system
+                       LOG.info("Start actor system.");
+                       InetAddress ownHostname = 
NetUtils.resolveAddress(jobManagerAddress); // find name of own public 
interface, able to connect to the JM
+                       actorSystem = AkkaUtils.createActorSystem(flinkConfig,
+                                       new Some(new Tuple2<String, 
Integer>(ownHostname.getCanonicalHostName(), 0)));
 
-               // start application client
-               LOG.info("Start application client.");
+                       // start application client
+                       LOG.info("Start application client.");
 
-               applicationClient = 
actorSystem.actorOf(Props.create(ApplicationClient.class), "applicationClient");
+                       applicationClient = 
actorSystem.actorOf(Props.create(ApplicationClient.class), "applicationClient");
 
-               // instruct ApplicationClient to start a periodical status 
polling
-               applicationClient.tell(new 
Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
+                       // instruct ApplicationClient to start a periodical 
status polling
+                       applicationClient.tell(new 
Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
 
 
-               // add hook to ensure proper shutdown
-               Runtime.getRuntime().addShutdownHook(clientShutdownHook);
+                       // add hook to ensure proper shutdown
+                       
Runtime.getRuntime().addShutdownHook(clientShutdownHook);
 
-               actorRunner = new Thread(new Runnable() {
-                       @Override
-                       public void run() {
+                       actorRunner = new Thread(new Runnable() {
+                               @Override
+                               public void run() {
                                // blocks until ApplicationMaster has been 
stopped
                                actorSystem.awaitTermination();
 
@@ -126,25 +146,26 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
                                                        
.getYarnApplicationState() + " and final state " + appReport
                                                        
.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
 
-                                       if(appReport.getYarnApplicationState() 
== YarnApplicationState.FAILED || appReport.getYarnApplicationState()
-                                                       == 
YarnApplicationState.KILLED  ) {
-                                               LOG.warn("Application failed. 
Diagnostics "+appReport.getDiagnostics());
+                                       if (appReport.getYarnApplicationState() 
== YarnApplicationState.FAILED || appReport.getYarnApplicationState()
+                                                       == 
YarnApplicationState.KILLED) {
+                                               LOG.warn("Application failed. 
Diagnostics " + appReport.getDiagnostics());
                                                LOG.warn("If log aggregation is 
activated in the Hadoop cluster, we recommend to retrieve "
                                                                + "the full 
application log using this command:\n"
-                                                               + "\tyarn logs 
-applicationId "+appReport.getApplicationId()+"\n"
+                                                               + "\tyarn logs 
-applicationId " + appReport.getApplicationId() + "\n"
                                                                + "(It 
sometimes takes a few seconds until the logs are aggregated)");
                                        }
-                               } catch(Exception e) {
+                               } catch (Exception e) {
                                        LOG.warn("Error while getting final 
application report", e);
                                }
-                       }
-               });
-               actorRunner.setDaemon(true);
-               actorRunner.start();
+                               }
+                       });
+                       actorRunner.setDaemon(true);
+                       actorRunner.start();
 
-               pollingRunner = new PollingThread(yarnClient, appId);
-               pollingRunner.setDaemon(true);
-               pollingRunner.start();
+                       pollingRunner = new PollingThread(yarnClient, appId);
+                       pollingRunner.setDaemon(true);
+                       pollingRunner.start();
+               }
        }
 
        // -------------------------- Interaction with the cluster 
------------------------
@@ -156,12 +177,28 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
 
        @Override
        public String getWebInterfaceURL() {
-               return this.intialAppReport.getTrackingUrl();
+               String url = this.intialAppReport.getTrackingUrl();
+               // there seems to be a difference between HD 2.2.0 and 2.6.0
+               if(!url.startsWith("http://")) {
+                       url = "http://" + url;
+               }
+               return url;
        }
 
+       @Override
+       public String getApplicationId() {
+               return applicationId.toString();
+       }
 
+       /**
+        * This method is only available if the cluster hasn't been started in 
detached mode.
+        */
        @Override
        public FlinkYarnClusterStatus getClusterStatus() {
+               if(detached) {
+                       throw new IllegalArgumentException("The cluster has been started in detached mode. " +
+                                       "Cannot request cluster status");
+               }
                if(hasBeenStopped()) {
                        throw new RuntimeException("The FlinkYarnCluster has 
alread been stopped");
                }
@@ -183,6 +220,10 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
 
        @Override
        public boolean hasFailed() {
+               if(detached) {
+                       throw new IllegalArgumentException("The cluster has been started in detached mode. " +
+                                       "Cannot request cluster status");
+               }
                if(pollingRunner == null) {
                        LOG.warn("FlinkYarnCluster.hasFailed() has been called 
on an uninitialized cluster." +
                                        "The system might be in an erroneous 
state");
@@ -193,13 +234,24 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
                                        "The system might be in an erroneous 
state");
                        return false;
                } else {
-                       return (lastReport.getYarnApplicationState() == 
YarnApplicationState.FAILED ||
-                                       lastReport.getYarnApplicationState() == 
YarnApplicationState.KILLED);
+                       YarnApplicationState appState = 
lastReport.getYarnApplicationState();
+                                       boolean status = (appState == YarnApplicationState.FAILED ||
+                                       appState == 
YarnApplicationState.KILLED);
+                       if(status) {
+                               LOG.warn("YARN reported application state {}", 
appState);
+                               LOG.warn("Diagnostics: {}", 
lastReport.getDiagnostics());
+                       }
+                       return status;
                }
        }
 
+
        @Override
        public String getDiagnostics() {
+               if(detached) {
+                       throw new IllegalArgumentException("The cluster has 
been started in detached mode." +
+                                       "Can not request cluster status");
+               }
                if (!hasFailed()) {
                        LOG.warn("getDiagnostics() called for cluster which is 
not in failed state");
                }
@@ -214,12 +266,16 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
 
        @Override
        public List<String> getNewMessages() {
+               if(detached) {
+                       throw new IllegalArgumentException("The cluster has been started in detached mode. " +
+                                       "Cannot request cluster status");
+               }
                if(hasBeenStopped()) {
-                       throw new RuntimeException("The FlinkYarnCluster has 
alread been stopped");
+                       throw new RuntimeException("The FlinkYarnCluster has 
already been stopped");
                }
                List<String> ret = new ArrayList<String>();
-               // get messages from ApplicationClient (locally)
 
+               // get messages from ApplicationClient (locally)
                while(true) {
                        Object result = null;
                        try {
@@ -228,7 +284,8 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
 
                                result = Await.result(response, akkaDuration);
                        } catch(Exception ioe) {
-                               LOG.warn("Error retrieving the yarn messages 
locally", ioe);
+                               LOG.warn("Error retrieving the YARN messages 
locally", ioe);
+                               break;
                        }
 
                        if(!(result instanceof Option)) {
@@ -236,7 +293,7 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
                                                "Option. Instead the response 
is of type " + result.getClass() + ".");
                        } else {
                                Option messageOption = (Option) result;
-
+                               LOG.debug("Received message option {}", 
messageOption);
                                if(messageOption.isEmpty()) {
                                        break;
                                } else {
@@ -263,6 +320,10 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
        }
 
        private void shutdownInternal(boolean removeShutdownHook) {
+               if(detached) {
+                       throw new IllegalArgumentException("The cluster has been started in detached mode. " +
+                                       "Cannot control a detached cluster");
+               }
                if(hasBeenShutDown.getAndSet(true)) {
                        return;
                }
@@ -275,7 +336,8 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
                        if(applicationClient != ActorRef.noSender()) {
                                try {
                                        Future<Object> response = 
Patterns.ask(applicationClient,
-                                                       new 
Messages.StopYarnSession(FinalApplicationStatus.SUCCEEDED),
+                                                       new 
Messages.StopYarnSession(FinalApplicationStatus.SUCCEEDED,
+                                                                       "Flink 
YARN Client requested shutdown"),
                                                        new 
Timeout(akkaDuration));
 
                                        Await.ready(response, akkaDuration);
@@ -372,7 +434,6 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
                                        }
                                } catch (Exception e) {
                                        LOG.warn("Error while getting 
application report", e);
-                                       // TODO: do more here.
                                }
                                try {
                                        
Thread.sleep(FlinkYarnCluster.POLLING_THREAD_INTERVAL_MS);

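The `getWebInterfaceURL` change above works around tracking URLs that come back without a scheme on some Hadoop versions (the comment notes a difference between 2.2.0 and 2.6.0). Extracted into a standalone helper (hypothetical names), the normalization is just:

```java
public class TrackingUrl {

    // Some Hadoop versions return the tracking URL without a scheme;
    // prefix "http://" only when it is missing.
    static String normalize(String url) {
        return url.startsWith("http://") ? url : "http://" + url;
    }

    public static void main(String[] args) {
        System.out.println(normalize("host:8088/proxy/application_1/"));        // prints http://host:8088/proxy/application_1/
        System.out.println(normalize("http://host:8088/proxy/application_1/")); // prints http://host:8088/proxy/application_1/
    }
}
```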
http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
index 214798c..829007e 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -48,6 +48,7 @@ public class YarnTaskManagerRunner {
 
                EnvironmentInformation.logEnvironmentInfo(LOG, "YARN 
TaskManager", args);
                EnvironmentInformation.checkJavaVersion();
+               org.apache.flink.runtime.util.SignalHandler.register(LOG);
 
                // try to parse the command line arguments
                final Configuration configuration;

http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index f7f6967..731f8e2 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -102,7 +102,7 @@ class ApplicationClient extends Actor with ActorLogMessages 
with ActorLogging {
         WAIT_FOR_YARN_INTERVAL, jm, PollYarnClusterStatus))
 
     case msg: StopYarnSession =>
-      log.info("Stop yarn session.")
+      log.info("Sending StopYarnSession request to ApplicationMaster.")
       stopMessageReceiver = Some(sender)
       yarnJobManager foreach {
         _ forward msg
@@ -130,11 +130,16 @@ class ApplicationClient extends Actor with 
ActorLogMessages with ActorLogging {
     // -----------------  handle messages from the cluster -------------------
     // receive remote messages
     case msg: YarnMessage =>
+      log.debug("Received new YarnMessage {}. Now {} messages in queue", msg, 
messagesQueue.size)
       messagesQueue.enqueue(msg)
 
     // locally forward messages
     case LocalGetYarnMessage =>
-      sender() ! messagesQueue.headOption
+      if(messagesQueue.size > 0) {
+        sender() ! Option(messagesQueue.dequeue)
+      } else {
+        sender() ! None
+      }
   }
 
   /**

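The `LocalGetYarnMessage` change above is subtle: the actor previously answered with `messagesQueue.headOption`, which never removes the head, so the same message would be returned on every request and the client-side drain loop in `getNewMessages` could never terminate normally. It now dequeues, so each message is delivered at most once and an empty `Option` signals "no more messages". The same pattern in Java (hypothetical names, `Optional` standing in for Scala's `Option`):

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

public class MessageQueuePoll {

    // One message per request: poll() removes the head, so a message is
    // delivered at most once; an empty Optional signals "no more messages".
    static Optional<String> nextMessage(Queue<String> queue) {
        return Optional.ofNullable(queue.poll());
    }

    public static void main(String[] args) {
        Queue<String> q = new ArrayDeque<>();
        q.add("Launching container container_01");
        System.out.println(nextMessage(q)); // prints Optional[Launching container container_01]
        System.out.println(nextMessage(q)); // prints Optional.empty
    }
}
```

This is also why the client's drain loop can safely `break` on an empty result: once the queue is exhausted, every subsequent poll stays empty until a new `YarnMessage` arrives.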
http://git-wip-us.apache.org/repos/asf/flink/blob/13bb21b1/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 966be7e..ce17d72 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.yarn
 
-import java.io.{File, PrintWriter, FileWriter, BufferedWriter}
+import java.io.{PrintWriter, FileWriter, BufferedWriter}
 import java.security.PrivilegedAction
 
 import akka.actor._
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.{GlobalConfiguration, 
Configuration, Confi
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
+import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.yarn.Messages.StartYarnSession
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
@@ -46,8 +47,12 @@ object ApplicationMaster {
 
   def main(args: Array[String]): Unit ={
     val yarnClientUsername = System.getenv(FlinkYarnClient.ENV_CLIENT_USERNAME)
-    LOG.info(s"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName}" +
-      s"' setting user to execute Flink ApplicationMaster/JobManager to $yarnClientUsername'")
+    LOG.info(s"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName} " +
+      s"setting user to execute Flink ApplicationMaster/JobManager to ${yarnClientUsername}")
+
+    EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster/JobManager", args)
+    EnvironmentInformation.checkJavaVersion()
+    org.apache.flink.runtime.util.SignalHandler.register(LOG)
 
     val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername)
 
@@ -96,11 +101,15 @@ object ApplicationMaster {
           if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) {
             LOG.info("Starting Job Manger web frontend.")
             config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs)
+            config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); // set port to 0.
             webserver = new WebInfoServer(config, jobManager, archiver)
             webserver.start()
           }
 
-          val jobManagerWebPort = if (webserver == null) -1 else webserver.getServerPort
+          val jobManagerWebPort = if (webserver == null) {
+            LOG.warn("Web server is null. It will not be accessible through YARN")
+            -1
+          } else webserver.getServerPort
 
           // generate configuration file for TaskManagers
           generateConfigurationFile(s"$currDir/$MODIFIED_CONF_FILE", currDir, ownHostname,
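The hunk above sets `JOB_MANAGER_WEB_PORT_KEY` to 0 before starting the `WebInfoServer` and then reads the concrete port back via `webserver.getServerPort`: binding to port 0 asks the OS for any free ephemeral port, which avoids port clashes when several AMs run on one YARN node. A minimal sketch of the same pattern with a plain `ServerSocket` (illustrative, not the Flink web server itself):

```scala
import java.net.ServerSocket

// Ephemeral-port pattern: bind to port 0 so the OS picks a free port,
// then read the actual port back before advertising it.
object EphemeralPortSketch {
  def bindToFreePort(): Int = {
    val socket = new ServerSocket(0) // 0 = let the OS choose
    try socket.getLocalPort          // the concrete port that was assigned
    finally socket.close()
  }
}
```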
@@ -227,7 +236,7 @@ object ApplicationMaster {
 
     val jobManagerProps = Props(new JobManager(configuration, instanceManager, scheduler,
       libraryCacheManager, archiver, accumulatorManager, profiler, executionRetries,
-      delayBetweenRetries, timeout) with YarnJobManager)
+      delayBetweenRetries, timeout) with ApplicationMasterActor)
 
     LOG.debug("Starting JobManager actor")
     val jobManager = JobManager.startActor(jobManagerProps, jobManagerSystem)
