http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
index be2caaf..c291ada 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
@@ -54,7 +54,7 @@ public class ExecutionPlanCreationTest {
                        config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, mockJmAddress.getPort());

                        Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
-                       OptimizedPlan op = (OptimizedPlan) Client.getOptimizedPlan(optimizer, prg, -1);
+                       OptimizedPlan op = (OptimizedPlan) ClusterClient.getOptimizedPlan(optimizer, prg, -1);
                        assertNotNull(op);

                        PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 2541345..6ad250d 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -36,9 +36,10 @@ import com.google.common.collect.Lists;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -205,9 +206,9 @@ public class FlinkClient {
                configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
                configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);

-               final Client client;
+               final ClusterClient client;
                try {
-                       client = new Client(configuration);
+                       client = new StandaloneClusterClient(configuration);
                } catch (final IOException e) {
                        throw new RuntimeException("Could not establish a connection to the job manager", e);
                }
@@ -245,9 +246,9 @@ public class FlinkClient {
                configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, this.jobManagerHost);
                configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, this.jobManagerPort);

-               final Client client;
+               final ClusterClient client;
                try {
-                       client = new Client(configuration);
+                       client = new StandaloneClusterClient(configuration);
                } catch (final IOException e) {
                        throw new RuntimeException("Could not establish a connection to the job manager", e);
                }
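
For reference, a minimal sketch of how a client can be obtained against a standalone cluster after this change. The helper class, method name and host/port values are hypothetical; only the ClusterClient/StandaloneClusterClient types, the configuration keys, and the IOException handling shown in the hunk above are assumed.

    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.client.program.StandaloneClusterClient;
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;

    import java.io.IOException;

    public class StandaloneClientSketch {

        // hypothetical helper: builds a ClusterClient for a fixed JobManager address
        public static ClusterClient connect(String jobManagerHost, int jobManagerPort) {
            Configuration configuration = new Configuration();
            configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
            configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);

            try {
                // ClusterClient replaces the former Client; the standalone variant talks to a fixed JobManager
                return new StandaloneClusterClient(configuration);
            } catch (IOException e) {
                throw new RuntimeException("Could not establish a connection to the job manager", e);
            }
        }
    }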

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
index 7962fce..bc5ae09 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
@@ -117,11 +117,14 @@ public class JobExecutionResult extends JobSubmissionResult {
                return (Integer) result;
        }
 
+
        /**
         * Returns a dummy object for wrapping a JobSubmissionResult
         * @param result The SubmissionResult
         * @return a JobExecutionResult
+        * @deprecated Will be removed in future versions.
         */
+       @Deprecated
        public static JobExecutionResult fromJobSubmissionResult(JobSubmissionResult result) {
                return new JobExecutionResult(result.getJobID(), -1, null);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
index 91a838b..4928b25 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
@@ -25,7 +25,7 @@ import org.apache.flink.annotation.Public;
  */
 @Public
 public class JobSubmissionResult {
-       
+
        private JobID jobID;
 
        public JobSubmissionResult(JobID jobID) {
@@ -40,4 +40,26 @@ public class JobSubmissionResult {
        public JobID getJobID() {
                return jobID;
        }
+
+       /**
+        * Checks if this JobSubmissionResult is also a JobExecutionResult.
+        * See {@code getJobExecutionResult} to retrieve the JobExecutionResult.
+        * @return True if this is a JobExecutionResult, false otherwise
+        */
+       public boolean isJobExecutionResults() {
+               return this instanceof JobExecutionResult;
+       }
+
+       /**
+        * Returns the JobExecutionResult if available.
+        * @return The JobExecutionResult
+        * @throws ClassCastException if this is not a JobExecutionResult
+        */
+       public JobExecutionResult getJobExecutionResult() {
+               if (isJobExecutionResults()) {
+                       return (JobExecutionResult) this;
+               } else {
+                       throw new ClassCastException("This JobSubmissionResult is not a JobExecutionResult.");
+               }
+       }
 }
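
A minimal sketch of how a caller might use the accessors added above. The wrapper class and method are hypothetical; only isJobExecutionResults(), getJobExecutionResult(), getJobID() and JobExecutionResult.getNetRuntime() are assumed to exist.

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.JobSubmissionResult;

    class SubmissionResultSketch {

        // hypothetical helper illustrating the new accessors
        static void report(JobSubmissionResult result) {
            if (result.isJobExecutionResults()) {
                // blocking execution: the full execution result is available
                JobExecutionResult execution = result.getJobExecutionResult();
                System.out.println("Job " + execution.getJobID()
                        + " finished in " + execution.getNetRuntime() + " ms");
            } else {
                // detached submission: only the job id is known at this point
                System.out.println("Job " + result.getJobID() + " was submitted");
            }
        }
    }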

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/log4j-cli.properties b/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
index acb9d1a..2aba6af 100644
--- a/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
+++ b/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
@@ -29,7 +29,7 @@ log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-
 # Log output from org.apache.flink.yarn to the console. This is used by the
 # CliFrontend class when using a per-job YARN cluster.
 log4j.logger.org.apache.flink.yarn=INFO, console
-log4j.logger.org.apache.flink.client.FlinkYarnSessionCli=INFO, console
+log4j.logger.org.apache.flink.yarn.cli.FlinkYarnSessionCli=INFO, console
 log4j.logger.org.apache.hadoop=INFO, console
 
 log4j.appender.console=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
index aa87c89..16f8ab9 100755
--- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
+++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
@@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4

 export FLINK_CONF_DIR

-$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR  $log_setting org.apache.flink.client.FlinkYarnSessionCli -ship $bin/../lib/ -j $FLINK_LIB_DIR/flink-dist*.jar "$@"
+$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR  $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -ship $bin/../lib/ -j $FLINK_LIB_DIR/flink-dist*.jar "$@"
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index 955122f..d79768f 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -62,7 +62,7 @@ public class DegreesWithExceptionITCase {
                }
                catch (Throwable t) {
                        t.printStackTrace();
-                       fail("Cluster shutdown caused an exception: " + t.getMessage());
+                       fail("ClusterClient shutdown caused an exception: " + t.getMessage());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
index 61ef446..56a0a59 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
@@ -66,7 +66,7 @@ public class ReduceOnEdgesWithExceptionITCase {
                }
                catch (Throwable t) {
                        t.printStackTrace();
-                       fail("Cluster shutdown caused an exception: " + t.getMessage());
+                       fail("ClusterClient shutdown caused an exception: " + t.getMessage());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
index 6cc0b6a..7458e08 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
@@ -67,7 +67,7 @@ public class ReduceOnNeighborsWithExceptionITCase {
                }
                catch (Throwable t) {
                        t.printStackTrace();
-                       fail("Cluster shutdown caused an exception: " + t.getMessage());
+                       fail("ClusterClient shutdown caused an exception: " + t.getMessage());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
index 03bae4e..9da54c1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
@@ -92,7 +92,7 @@ public abstract class JarActionHandler implements RequestHandler {
                ClassLoader classLoader = program.getUserCodeClassLoader();

                Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration());
-               FlinkPlan plan = Client.getOptimizedPlan(optimizer, program, parallelism);
+               FlinkPlan plan = ClusterClient.getOptimizedPlan(optimizer, program, parallelism);
 
                if (plan instanceof StreamingPlan) {
                        graph = ((StreamingPlan) plan).getJobGraph();

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index cb95040..46a432e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -205,7 +205,16 @@ public class JobClient {
                checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null.");
                checkNotNull(jobGraph, "The jobGraph must not be null.");
                checkNotNull(timeout, "The timeout must not be null.");
-               
+
+               LOG.info("Checking and uploading JAR files");
+               try {
+                       JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
+               }
+               catch (IOException e) {
+                       throw new JobSubmissionException(jobGraph.getJobID(),
+                               "Could not upload the program's JAR files to the JobManager.", e);
+               }
+
                Object result;
                try {
                        Future<Object> future = jobManagerGateway.ask(
@@ -214,7 +223,7 @@ public class JobClient {
                                        ListeningBehaviour.DETACHED // only receive the Acknowledge for the job submission message
                                ),
                                timeout);
-                       
+
                        result = Await.result(future, timeout);
                }
                catch (TimeoutException e) {
@@ -225,10 +234,10 @@ public class JobClient {
                        throw new JobExecutionException(jobGraph.getJobID(),
                                        "Failed to send job to JobManager: " + t.getMessage(), t.getCause());
                }
-               
+
                if (result instanceof JobManagerMessages.JobSubmitSuccess) {
                        JobID respondedID = ((JobManagerMessages.JobSubmitSuccess) result).jobId();
-                       
+
                        // validate response
                        if (!respondedID.equals(jobGraph.getJobID())) {
                                throw new JobExecutionException(jobGraph.getJobID(),

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
index 9746cef..b7bb84e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java
@@ -51,4 +51,5 @@ public enum ApplicationStatus {
        public int processExitCode() {
                return processExitCode;
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index 1bcb195..0aaf098 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -538,7 +538,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
        }

        // ------------------------------------------------------------------------
-       //  Cluster Shutdown
+       //  ClusterClient Shutdown
        // ------------------------------------------------------------------------

        private void shutdownCluster(ApplicationStatus status, String diagnostics) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/GetClusterStatusResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/GetClusterStatusResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/GetClusterStatusResponse.java
index 5146d7b..9ad5ba5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/GetClusterStatusResponse.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/GetClusterStatusResponse.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.clusterframework.messages;
 import java.io.Serializable;
 
 /**
- * The reply to a {@code GetClusterStatus} message sent by the resource manager. Sends over the
+ * The reply to a {@code GetClusterStatus} message sent by the job manager. Sends over the
  * current number of task managers and the available task slots.
  */
 public class GetClusterStatusResponse implements Serializable {

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
deleted file mode 100644
index c1498c5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.yarn;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.hadoop.fs.Path;
-import java.io.File;
-import java.util.List;
-
-/**
- * Abstract interface for an implementation of a Flink on YARN client to deploy.
- *
- * The Client describes the properties of the YARN application to create.
- */
-public abstract class AbstractFlinkYarnClient {
-
-       // ---- Setter for YARN Cluster properties ----- //
-
-       /**
-        * @param memoryMB The amount of memory for the JobManager (in MB)
-        */
-       public abstract void setJobManagerMemory(int memoryMB);
-
-       /**
-        * @param memoryMB The memory per TaskManager (in MB)
-        */
-       public abstract void setTaskManagerMemory(int memoryMB);
-
-       /**
-        * Flink configuration
-        */
-       public abstract void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf);
-
-       public abstract Configuration getFlinkConfiguration();
-
-       /**
-        *
-        * @param slots The number of TaskManager slots per TaskManager.
-        */
-       public abstract void setTaskManagerSlots(int slots);
-
-       /**
-        * @return the number of TaskManager processing slots per TaskManager.
-        */
-       public abstract int getTaskManagerSlots();
-
-       /**
-        * @param queue Name of the YARN queue
-        */
-       public abstract void setQueue(String queue);
-
-       /**
-        *
-        * @param localJarPath Local Path to the Flink uberjar
-        */
-       public abstract void setLocalJarPath(Path localJarPath);
-
-       /**
-        *
-        * @param confPath local path to the Flink configuration file
-        */
-       public abstract void setConfigurationFilePath(Path confPath);
-
-       /**
-        *
-        * @param logConfPath local path to the flink logging configuration
-        */
-       public abstract void setFlinkLoggingConfigurationPath(Path logConfPath);
-       public abstract Path getFlinkLoggingConfigurationPath();
-
-       /**
-        *
-        * @param tmCount number of TaskManagers to start
-        */
-       public abstract void setTaskManagerCount(int tmCount);
-       public abstract int getTaskManagerCount();
-
-       /**
-        * @param confDirPath Path to config directory.
-        */
-       public abstract void setConfigurationDirectory(String confDirPath);
-
-       /**
-        * List of files to transfer to the YARN containers.
-        */
-       public abstract void setShipFiles(List<File> shipFiles);
-
-       /**
-        *
-        * @param dynamicPropertiesEncoded Encoded String of the dynamic properties (-D configuration values of the Flink configuration)
-        */
-       public abstract void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded);
-       public abstract String getDynamicPropertiesEncoded();
-
-       // --------------------------------------- Operations on the YARN cluster ----- //
-
-       /**
-        * Returns a String containing details about the cluster (NodeManagers, available memory, ...)
-        *
-        */
-       public abstract String getClusterDescription() throws Exception;
-
-       /**
-        * Trigger the deployment to YARN.
-        *
-        */
-       public abstract AbstractFlinkYarnCluster deploy() throws Exception;
-
-       /**
-        * @param detachedMode If true, the Flink YARN client is non-blocking. That means it returns
-        *                        once Flink has been started successfully on YARN.
-        */
-       public abstract void setDetachedMode(boolean detachedMode);
-
-       public abstract boolean isDetached();
-
-       /**
-        * @return The string representation of the Path to the YARN session files. This is a temporary
-        * directory in HDFS that contains the jar files and configuration which is shipped to all the containers.
-        */
-       public abstract String getSessionFilesDir();
-
-       /**
-        * Set a name for the YARN application
-        * @param name
-        */
-       public abstract void setName(String name);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
deleted file mode 100644
index af015c7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.yarn;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-/**
- * Abstract class for interacting with a running Flink cluster within YARN.
- */
-public abstract class AbstractFlinkYarnCluster {
-
-       /**
-        * Get hostname and port of the JobManager.
-        */
-       public abstract InetSocketAddress getJobManagerAddress();
-
-       /**
-        * Returns an URL (as a string) to the JobManager web interface, running next to the
-        * ApplicationMaster and JobManager in a YARN container
-        */
-       public abstract String getWebInterfaceURL();
-
-       /**
-        * Request the YARN cluster to shut down.
-        *
-        * @param failApplication If true, the application will be marked as failed in YARN
-        */
-       public abstract void shutdown(boolean failApplication);
-
-       /**
-        * Boolean indicating whether the cluster has been stopped already
-        */
-       public abstract boolean hasBeenStopped();
-
-       /**
-        * Returns the latest cluster status, with number of Taskmanagers and slots
-        */
-       public abstract GetClusterStatusResponse getClusterStatus();
-
-       /**
-        * Boolean indicating whether the Flink YARN cluster is in an erronous state.
-        */
-       public abstract boolean hasFailed();
-
-       /**
-        * @return Diagnostics if the Cluster is in "failed" state.
-        */
-       public abstract String getDiagnostics();
-
-       /**
-        * May return new messages from the cluster.
-        * Messages can be for example about failed containers or container launch requests.
-        */
-       public abstract List<String> getNewMessages();
-
-       /**
-        * Returns a string representation of the ApplicationID assigned by YARN.
-        */
-       public abstract String getApplicationId();
-
-       /**
-        * Flink's YARN cluster abstraction has two modes for connecting to the YARN AM.
-        * In the detached mode, the AM is launched and the Flink YARN client is disconnecting
-        * afterwards.
-        * In the non-detached mode, it maintains a connection with the AM to control the cluster.
-        * @return boolean indicating whether the cluster is a detached cluster
-        */
-       public abstract boolean isDetached();
-
-       /**
-        * Connect the FlinkYarnCluster to the ApplicationMaster.
-        *
-        * Detached YARN sessions don't need to connect to the ApplicationMaster.
-        * Detached per job YARN sessions need to connect until the required number of TaskManagers have been started.
-        *
-        * @throws IOException
-        */
-       public abstract void connectToCluster() throws IOException;
-
-       /**
-        * Disconnect from the ApplicationMaster without stopping the session
-        * (therefore, use the {@link AbstractFlinkYarnCluster#shutdown(boolean)} method.
-        *
-        * @see AbstractFlinkYarnCluster#shutdown(boolean)
-        */
-       public abstract void disconnect();
-
-       /**
-        * Tells the ApplicationMaster to monitor the status of JobId and stop itself once the specified
-        * job has finished.
-        *
-        * @param jobID Id of the job
-        */
-       public abstract void stopAfterJob(JobID jobID);
-
-       /**
-        * Return the Flink configuration object
-        * @return The Flink configuration object
-        */
-       public abstract Configuration getFlinkConfiguration();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 7b80206..1a8870b 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -19,14 +19,14 @@
 package org.apache.flink.api.scala
 
 import java.io._
-import java.util.Properties
 
-import org.apache.flink.client.{CliFrontend, ClientUtils, FlinkYarnSessionCli}
-import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
+import org.apache.flink.client.cli.CliFrontendParser
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.client.CliFrontend
+import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration}
 import org.apache.flink.runtime.minicluster.{FlinkMiniCluster, LocalFlinkMiniCluster}
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster
-import org.apache.hadoop.fs.Path
 
+import scala.collection.mutable.ArrayBuffer
 import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter._
 
@@ -138,7 +138,7 @@ object FlinkShell {
 
   def fetchConnectionInfo(
     config: Config
-  ): (String, Int, Option[Either[FlinkMiniCluster, AbstractFlinkYarnCluster]]) = {
+  ): (String, Int, Option[Either[FlinkMiniCluster, ClusterClient]]) = {
     config.executionMode match {
       case ExecutionMode.LOCAL => // Local mode
         val config = GlobalConfiguration.getConfiguration()
@@ -217,7 +217,7 @@ object FlinkShell {
       repl.closeInterpreter()
       cluster match {
         case Some(Left(miniCluster)) => miniCluster.stop()
-        case Some(Right(yarnCluster)) => yarnCluster.shutdown(false)
+        case Some(Right(yarnCluster)) => yarnCluster.shutdown()
         case _ =>
       }
     }
@@ -226,71 +226,49 @@ object FlinkShell {
   }
 
   def deployNewYarnCluster(yarnConfig: YarnConfig) = {
-    val yarnClient = FlinkYarnSessionCli.getFlinkYarnClient
-
-    // use flink-dist.jar for scala shell
-    val jarPath = new Path("file://" +
-      s"${yarnClient.getClass.getProtectionDomain.getCodeSource.getLocation.getPath}")
-    yarnClient.setLocalJarPath(jarPath)
-    
-    val confDirPath = CliFrontend.getConfigurationDirectoryFromEnv
-    val flinkConfiguration = GlobalConfiguration.getConfiguration
-    val confFile = new File(confDirPath + File.separator + "flink-conf.yaml")
-    val confPath = new Path(confFile.getAbsolutePath)
-    yarnClient.setFlinkConfiguration(flinkConfiguration)
-    yarnClient.setConfigurationDirectory(confDirPath)
-    yarnClient.setConfigurationFilePath(confPath)
+
+    val args = ArrayBuffer[String](
+      "-m", "yarn-cluster"
+    )
 
     // number of task managers is required.
     yarnConfig.containers match {
-      case Some(containers) => yarnClient.setTaskManagerCount(containers)
+      case Some(containers) => args ++= Seq("-yn", containers.toString)
       case None =>
         throw new IllegalArgumentException("Number of taskmanagers must be 
specified.")
     }
 
     // set configuration from user input
-    yarnConfig.jobManagerMemory.foreach(yarnClient.setJobManagerMemory)
-    yarnConfig.name.foreach(yarnClient.setName)
-    yarnConfig.queue.foreach(yarnClient.setQueue)
-    yarnConfig.slots.foreach(yarnClient.setTaskManagerSlots)
-    yarnConfig.taskManagerMemory.foreach(yarnClient.setTaskManagerMemory)
-
-    // deploy
-    val cluster = yarnClient.deploy()
+    yarnConfig.jobManagerMemory.foreach((jmMem) => args ++= Seq("-yjm", jmMem.toString))
+    yarnConfig.slots.foreach((tmMem) => args ++= Seq("-ytm", tmMem.toString))
+    yarnConfig.name.foreach((name) => args ++= Seq("-ynm", name.toString))
+    yarnConfig.queue.foreach((queue) => args ++= Seq("-yqu", queue.toString))
+    yarnConfig.slots.foreach((slots) => args ++= Seq("-ys", slots.toString))
+
+    val customCLI = CliFrontendParser.getAllCustomCommandLine.get("yarn-cluster")
+
+    val options = CliFrontendParser.parseRunCommand(args.toArray)
+
+    val cluster = customCLI.createClient("Flink Scala Shell", options.getCommandLine)
+
     val address = cluster.getJobManagerAddress.getAddress.getHostAddress
     val port = cluster.getJobManagerAddress.getPort
-    cluster.connectToCluster()
 
     (address, port, Some(Right(cluster)))
   }
 
   def fetchDeployedYarnClusterInfo() = {
+
     // load configuration
     val globalConfig = GlobalConfiguration.getConfiguration
-    val propertiesLocation = CliFrontend.getYarnPropertiesLocation(globalConfig)
-    val propertiesFile = new File(propertiesLocation)
-
-    // read properties
-    val properties = if (propertiesFile.exists()) {
-      println("Found YARN properties file " + propertiesFile.getAbsolutePath)
-      val properties = new Properties()
-      val inputStream = new FileInputStream(propertiesFile)
-
-      try {
-        properties.load(inputStream)
-      } finally {
-        inputStream.close()
-      }
 
-      properties
-    } else {
-      throw new IllegalArgumentException("Scala Shell cannot fetch YARN properties.")
-    }
+    val customCLI = CliFrontendParser.getAllCustomCommandLine.get("yarn-cluster")
+
+    val cluster = customCLI.retrieveCluster(globalConfig)
 
-    val addressInStr = properties.getProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY)
-    val address = ClientUtils.parseHostPortAddress(addressInStr)
+    val jobManager = cluster.getJobManagerAddress
 
-    (address.getHostString, address.getPort, None)
+    (jobManager.getHostString, jobManager.getPort, None)
   }
 
   def ensureYarnConfig(config: Config) = config.yarnConfig match {

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 4475bc8..f03cb84 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -775,7 +775,7 @@ object ExecutionEnvironment {
   * configuration parameters for the Client only; Program parallelism can be set via
   * [[ExecutionEnvironment.setParallelism]].
   *
-   * Cluster configuration has to be done in the remotely running Flink instance.
+   * ClusterClient configuration has to be done in the remotely running Flink instance.
   *
   * @param host The host name or address of the master (JobManager), where the program should be
    *             executed.

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
index f37969d..bc9bedc 100644
--- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
+++ b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java
@@ -230,4 +230,4 @@ public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase {
                        indexer.add(createIndexRequest(element));
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 03945a0..333f9c0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -30,9 +30,10 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
@@ -195,9 +196,9 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
                configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
                configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
 
-               Client client;
+               ClusterClient client;
                try {
-                       client = new Client(configuration);
+                       client = new StandaloneClusterClient(configuration);
                        client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
                }
                catch (Exception e) {
@@ -205,7 +206,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
                }
 
                try {
-                       return client.runBlocking(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader);
+                       return client.run(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader).getJobExecutionResult();
                }
                catch (ProgramInvocationException e) {
                        throw e;
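
The public entry point for remote streaming execution is unchanged by this refactoring. For context, a minimal caller-side sketch; the host, port and jar path are placeholders and not taken from the patch.

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RemoteStreamJobSketch {

        public static void main(String[] args) throws Exception {
            // placeholder JobManager host/port and user jar; adjust to the actual deployment
            StreamExecutionEnvironment env = StreamExecutionEnvironment
                    .createRemoteEnvironment("jobmanager-host", 6123, "/path/to/user-job.jar");

            env.fromElements(1, 2, 3)
                    .map(new MapFunction<Integer, Integer>() {
                        @Override
                        public Integer map(Integer value) {
                            return value * 2;
                        }
                    })
                    .print();

            // execute() goes through ClusterClient#run(...).getJobExecutionResult() internally, as shown above
            JobExecutionResult result = env.execute("remote example");
            System.out.println("Net runtime: " + result.getNetRuntime() + " ms");
        }
    }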

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 5999143..0332684 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -63,7 +63,10 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
                        ((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);
                        return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE;
                } else {
-                       return ctx.getClient().runBlocking(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointPath());
+                       return ctx
+                               .getClient()
+                               .run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointPath())
+                               .getJobExecutionResult();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
index b1768f0..34a7eed 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -78,7 +78,7 @@ public class RemoteEnvironmentITCase {
                }
                catch (Throwable t) {
                        t.printStackTrace();
-                       fail("Cluster shutdown caused an exception: " + t.getMessage());
+                       fail("ClusterClient shutdown caused an exception: " + t.getMessage());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 7dccb7d..09b5e7e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -69,7 +69,7 @@ public class AutoParallelismITCase {
                catch (Throwable t) {
                        System.err.println("Error stopping cluster on shutdown");
                        t.printStackTrace();
-                       fail("Cluster shutdown caused an exception: " + t.getMessage());
+                       fail("ClusterClient shutdown caused an exception: " + t.getMessage());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
index 4437db1..0a0f451 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -69,7 +69,7 @@ public class SimpleRecoveryITCase {
                catch (Throwable t) {
                        System.err.println("Error stopping cluster on shutdown");
                        t.printStackTrace();
-                       fail("Cluster shutdown caused an exception: " + t.getMessage());
+                       fail("ClusterClient shutdown caused an exception: " + t.getMessage());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 239a85a..e849211 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -92,11 +92,16 @@ under the License.
                        <plugin>
                                <groupId>org.apache.maven.plugins</groupId>
                                <artifactId>maven-surefire-plugin</artifactId>
-                               <configuration>
-                                       <!-- Enforce single threaded execution due to port conflicts with the mini yarn cluster -->
-                                       <forkCount>1</forkCount>
-                                       <workingDirectory>../</workingDirectory>
-                               </configuration>
+                               <executions>
+                                       <execution>
+                                               <id>integration-tests</id>
+                                               <configuration>
+                                                       <!-- Enforce single threaded execution due to port conflicts with the mini yarn cluster -->
+                                                       <forkCount>1</forkCount>
+                                                       <workingDirectory>../</workingDirectory>
+                                               </configuration>
+                                       </execution>
+                               </executions>
                        </plugin>
 
                        <plugin>

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
new file mode 100644
index 0000000..c6a1ade
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.junit.*;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that verify that the CLI client picks up the correct address for the JobManager
+ * from configuration and configs.
+ */
+public class CliFrontendYarnAddressConfigurationTest {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       private final static PrintStream OUT = System.out;
+       private final static PrintStream ERR = System.err;
+
+       @BeforeClass
+       public static void disableStdOutErr() {
+               class NullPrint extends OutputStream {
+                       @Override
+                       public void write(int b) {}
+               }
+
+               PrintStream nullPrinter = new PrintStream(new NullPrint());
+               System.setOut(nullPrinter);
+               System.setErr(nullPrinter);
+       }
+
+       @AfterClass
+       public static void restoreAfterwards() {
+               System.setOut(OUT);
+               System.setErr(ERR);
+       }
+
+       @Before
+       public void clearConfig() throws NoSuchFieldException, IllegalAccessException {
+               // reset GlobalConfiguration between tests
+               Field instance = GlobalConfiguration.class.getDeclaredField("SINGLETON");
+               instance.setAccessible(true);
+               instance.set(null, null);
+       }
+
+       private static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55";
+       private static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
+
+       private static final String propertiesFile =
+               "jobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":" + TEST_YARN_JOB_MANAGER_PORT;
+
+
+       private static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33";
+       private static final int TEST_JOB_MANAGER_PORT = 55443;
+
+       private static final String flinkConf =
+               "jobmanager.rpc.address: " + TEST_JOB_MANAGER_ADDRESS + "\n" +
+               "jobmanager.rpc.port: " + TEST_JOB_MANAGER_PORT;
+
+
+       private static final String invalidPropertiesFile =
+               "jasfobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":asf" + TEST_YARN_JOB_MANAGER_PORT;
+
+
+       /**
+        * Test that the CliFrontend is able to pick up the .yarn-properties file from a specified location.
+        */
+       @Test
+       public void testYarnConfig() {
+               try {
+                       File tmpFolder = temporaryFolder.newFolder();
+                       String currentUser = System.getProperty("user.name");
+
+                       // copy .yarn-properties-<username>
+                       File testPropertiesFile = new File(tmpFolder, ".yarn-properties-"+currentUser);
+                       Files.write(testPropertiesFile.toPath(), propertiesFile.getBytes(), StandardOpenOption.CREATE);
+
+                       // copy reference flink-conf.yaml to temporary test directory and append custom configuration path.
+                       String confFile = flinkConf + "\nyarn.properties-file.location: " + tmpFolder;
+                       File testConfFile = new File(tmpFolder.getAbsolutePath(), "flink-conf.yaml");
+                       Files.write(testConfFile.toPath(), confFile.getBytes(), StandardOpenOption.CREATE);
+
+                       // start CLI Frontend
+                       TestCLI frontend = new TestCLI(tmpFolder.getAbsolutePath());
+
+                       CommandLineOptions options = mock(CommandLineOptions.class);
+
+                       frontend.getClient(options, "Program name");
+
+                       frontend.updateConfig(options);
+                       Configuration config = frontend.getConfiguration();
+
+                       checkJobManagerAddress(
+                                       config,
+                                       TEST_YARN_JOB_MANAGER_ADDRESS,
+                                       TEST_YARN_JOB_MANAGER_PORT);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       public static class TestCLI extends CliFrontend {
+               TestCLI(String configDir) throws Exception {
+                       super(configDir);
+               }
+
+               @Override
+               public ClusterClient getClient(CommandLineOptions options, String programName) throws Exception {
+                       return super.getClient(options, programName);
+               }
+
+               @Override
+               public void updateConfig(CommandLineOptions options) {
+                       super.updateConfig(options);
+               }
+       }
+
+       @Test
+       public void testInvalidYarnConfig() {
+               try {
+                       File tmpFolder = temporaryFolder.newFolder();
+
+                       // copy invalid .yarn-properties-<username>
+                       File testPropertiesFile = new File(tmpFolder, ".yarn-properties");
+                       Files.write(testPropertiesFile.toPath(), invalidPropertiesFile.getBytes(), StandardOpenOption.CREATE);
+
+                       // copy reference flink-conf.yaml to temporary test directory and append custom configuration path.
+                       String confFile = flinkConf + "\nyarn.properties-file.location: " + tmpFolder;
+                       File testConfFile = new File(tmpFolder.getAbsolutePath(), "flink-conf.yaml");
+                       Files.write(testConfFile.toPath(), confFile.getBytes(), StandardOpenOption.CREATE);
+
+                       TestCLI cli = new TestCLI(tmpFolder.getAbsolutePath());
+
+                       CommandLineOptions options = mock(CommandLineOptions.class);
+
+                       cli.updateConfig(options);
+
+                       Configuration config = cli.getConfiguration();
+
+                       checkJobManagerAddress(
+                               config,
+                               TEST_JOB_MANAGER_ADDRESS,
+                               TEST_JOB_MANAGER_PORT);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+
+       @Test
+       public void testManualOptionsOverridesYarn() {
+               try {
+                       File emptyFolder = temporaryFolder.newFolder();
+                       TestCLI frontend = new TestCLI(emptyFolder.getAbsolutePath());
+
+                       CommandLineOptions options = mock(CommandLineOptions.class);
+                       when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
+
+                       frontend.updateConfig(options);
+
+                       Configuration config = frontend.getConfiguration();
+
+                       InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
+
+                       checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+
+       private static void checkJobManagerAddress(Configuration config, String expectedAddress, int expectedPort) {
+               String jobManagerAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+               int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+
+               assertEquals(expectedAddress, jobManagerAddress);
+               assertEquals(expectedPort, jobManagerPort);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 7197b64..c842bdc 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -24,8 +24,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 
 import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.test.util.TestBaseUtils;
 
 import org.junit.Assert;
@@ -38,6 +37,8 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties;
+
 public class FlinkYarnSessionCliTest {
 
        @Rule
@@ -54,7 +55,7 @@ public class FlinkYarnSessionCliTest {
                TestBaseUtils.setEnv(map);
                Options options = new Options();
                FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", false);
-               cli.getYARNSessionCLIOptions(options);
+               cli.addOptions(options);
 
                CommandLineParser parser = new PosixParser();
                CommandLine cmd = null;
@@ -65,11 +66,12 @@ public class FlinkYarnSessionCliTest {
                        Assert.fail("Parsing failed with " + e.getMessage());
                }
 
-               AbstractFlinkYarnClient flinkYarnClient = cli.createFlinkYarnClient(cmd);
+               YarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(null, cmd);
 
-               Assert.assertNotNull(flinkYarnClient);
+               Assert.assertNotNull(flinkYarnDescriptor);
 
-               Map<String, String> dynProperties = CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded());
+               Map<String, String> dynProperties =
+                       FlinkYarnSessionCli.getDynamicProperties(flinkYarnDescriptor.getDynamicPropertiesEncoded());
                Assert.assertEquals(1, dynProperties.size());
                Assert.assertEquals("5 min", 
dynProperties.get("akka.ask.timeout"));
        }
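
For context, the dynamic-properties round trip exercised in the test above can be sketched as follows. This is a minimal illustration, not part of the commit: it assumes the encoded form is "key=value" pairs joined by "@@" (the separator used when YARNHighAvailabilityITCase assembles its dynamic properties further down) and relies only on FlinkYarnSessionCli.getDynamicProperties(String), which this diff imports. The second property key is a made-up placeholder.

    import java.util.Map;

    import org.apache.flink.yarn.cli.FlinkYarnSessionCli;

    public class DynamicPropertiesSketch {
        public static void main(String[] args) {
            // Encoded form: key=value pairs joined by "@@"; values may contain spaces, e.g. "5 min".
            String encoded = "akka.ask.timeout=5 min@@example.placeholder.key=some value";

            // Decode back into a map, as FlinkYarnSessionCliTest does with the
            // descriptor's getDynamicPropertiesEncoded() result.
            Map<String, String> props = FlinkYarnSessionCli.getDynamicProperties(encoded);

            System.out.println(props.get("akka.ask.timeout")); // expected: 5 min
        }
    }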

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
deleted file mode 100644
index dbfbfe2..0000000
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.util.Preconditions;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Yarn client which starts a {@link TestingApplicationMaster}. Additionally the client adds the
- * flink-yarn-tests-XXX-tests.jar and the flink-runtime-XXX-tests.jar to the set of files which
- * are shipped to the yarn cluster. This is necessary to load the testing classes.
- */
-public class TestingFlinkYarnClient extends FlinkYarnClientBase {
-
-       public TestingFlinkYarnClient() {
-               List<File> filesToShip = new ArrayList<>();
-
-               File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests"));
-               Preconditions.checkNotNull(testingJar, "Could not find the flink-yarn-tests tests jar. " +
-                       "Make sure to package the flink-yarn-tests module.");
-
-               File testingRuntimeJar = YarnTestBase.findFile("..", new TestJarFinder("flink-runtime"));
-               Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " +
-                       "jar. Make sure to package the flink-runtime module.");
-
-               filesToShip.add(testingJar);
-               filesToShip.add(testingRuntimeJar);
-
-               setShipFiles(filesToShip);
-       }
-
-       @Override
-       protected Class<?> getApplicationMasterClass() {
-               return TestingApplicationMaster.class;
-       }
-
-       public static class TestJarFinder implements FilenameFilter {
-
-               private final String jarName;
-
-               public TestJarFinder(final String jarName) {
-                       this.jarName = jarName;
-               }
-
-               @Override
-               public boolean accept(File dir, String name) {
-                       return name.startsWith(jarName) && name.endsWith("-tests.jar") &&
-                               dir.getAbsolutePath().contains(dir.separator + jarName + dir.separator);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
new file mode 100644
index 0000000..386f48f
--- /dev/null
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Yarn client which starts a {@link TestingApplicationMaster}. Additionally the client adds the
+ * flink-yarn-tests-XXX-tests.jar and the flink-runtime-XXX-tests.jar to the set of files which
+ * are shipped to the yarn cluster. This is necessary to load the testing classes.
+ */
+public class TestingYarnClusterDescriptor extends AbstractYarnClusterDescriptor {
+
+       public TestingYarnClusterDescriptor() {
+               List<File> filesToShip = new ArrayList<>();
+
+               File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests"));
+               Preconditions.checkNotNull(testingJar, "Could not find the flink-yarn-tests tests jar. " +
+                       "Make sure to package the flink-yarn-tests module.");
+
+               File testingRuntimeJar = YarnTestBase.findFile("..", new TestJarFinder("flink-runtime"));
+               Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " +
+                       "jar. Make sure to package the flink-runtime module.");
+
+               filesToShip.add(testingJar);
+               filesToShip.add(testingRuntimeJar);
+
+               setShipFiles(filesToShip);
+       }
+
+       @Override
+       protected Class<?> getApplicationMasterClass() {
+               return TestingApplicationMaster.class;
+       }
+
+       public static class TestJarFinder implements FilenameFilter {
+
+               private final String jarName;
+
+               public TestJarFinder(final String jarName) {
+                       this.jarName = jarName;
+               }
+
+               @Override
+               public boolean accept(File dir, String name) {
+                       return name.startsWith(jarName) && name.endsWith("-tests.jar") &&
+                               dir.getAbsolutePath().contains(dir.separator + jarName + dir.separator);
+               }
+       }
+}
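
As a usage note, the new descriptor is driven the same way in the YARN ITCases below; the sketch here is a minimal illustration under those assumptions, using only the setters and calls visible in this commit (setTaskManagerCount, setJobManagerMemory, setConfigurationFilePath, deploy, shutdown). The configuration path is a placeholder.

    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.yarn.TestingYarnClusterDescriptor;
    import org.apache.hadoop.fs.Path;

    public class TestingSessionSketch {
        // Deploys a testing YARN session and tears it down again. This needs the packaged
        // test jars and a running (Mini)YARN cluster, so in practice it only runs inside the ITCases.
        static void deployAndStop(String confDirPath) throws Exception {
            TestingYarnClusterDescriptor descriptor = new TestingYarnClusterDescriptor();
            descriptor.setTaskManagerCount(1);
            descriptor.setJobManagerMemory(768);
            descriptor.setConfigurationFilePath(new Path(confDirPath + "/flink-conf.yaml"));

            ClusterClient yarnCluster = null;
            try {
                // deploy() now hands back a ClusterClient directly; the separate
                // connectToCluster() step of the old AbstractFlinkYarnCluster is gone.
                yarnCluster = descriptor.deploy();
                // ... submit jobs or inspect yarnCluster.getFlinkConfiguration() here ...
            } finally {
                if (yarnCluster != null) {
                    yarnCluster.shutdown(); // replaces the old shutdown(false)
                }
            }
        }
    }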

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index a93abf0..4d45f16 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -22,6 +22,7 @@ import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.testkit.JavaTestKit;
 import org.apache.curator.test.TestingServer;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -33,7 +34,6 @@ import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.AfterClass;
@@ -97,7 +97,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
        public void testMultipleAMKill() throws Exception {
                final int numberKillingAttempts = numberApplicationAttempts - 1;
 
-               TestingFlinkYarnClient flinkYarnClient = new TestingFlinkYarnClient();
+               TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor();
 
                Assert.assertNotNull("unable to get yarn client", 
flinkYarnClient);
                flinkYarnClient.setTaskManagerCount(1);
@@ -119,13 +119,12 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
                        "@@" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery");
                flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
 
-               AbstractFlinkYarnCluster yarnCluster = null;
+               ClusterClient yarnCluster = null;
 
                final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
 
                try {
                        yarnCluster = flinkYarnClient.deploy();
-                       yarnCluster.connectToCluster();
                        final Configuration config = yarnCluster.getFlinkConfiguration();
 
                        new JavaTestKit(actorSystem) {{
@@ -169,7 +168,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
                        }};
                } finally {
                        if (yarnCluster != null) {
-                               yarnCluster.shutdown(false);
+                               yarnCluster.shutdown();
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 38e17a5..826a086 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -328,7 +328,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
        @Test
        public void testNonexistingQueue() {
                LOG.info("Starting testNonexistingQueue()");
-               addTestAppender(FlinkYarnClient.class, Level.WARN);
+               addTestAppender(YarnClusterDescriptor.class, Level.WARN);
                runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
                                "-t", flinkLibFolder.getAbsolutePath(),
                                "-n", "1",
@@ -432,7 +432,9 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
                                "-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
                                "-ytm", "1024",
                                "-ys", "2", // test requesting slots from YARN.
-                               "--yarndetached", job, "--input", tmpInFile.getAbsoluteFile().toString(), "--output", tmpOutFolder.getAbsoluteFile().toString()},
+                               "--yarndetached", job,
+                               "--input", tmpInFile.getAbsoluteFile().toString(),
+                               "--output", tmpOutFolder.getAbsoluteFile().toString()},
                        "Job has been submitted with JobID",
                        RunTypes.CLI_FRONTEND);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index cb402a3..fe5400a 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -95,8 +94,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 
                checkForLogString("The Flink YARN client has been started in 
detached mode");
 
-               Assert.assertFalse("The runner should detach.", 
runner.isAlive());
-
                LOG.info("Waiting until two containers are running");
                // wait until two containers are running
                while(getRunningContainers() < 2) {
@@ -171,7 +168,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
        @Ignore("The test is too resource consuming (8.5 GB of memory)")
        @Test
        public void testResourceComputation() {
-               addTestAppender(FlinkYarnClient.class, Level.WARN);
+               addTestAppender(YarnClusterDescriptor.class, Level.WARN);
                LOG.info("Starting testResourceComputation()");
                runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
                                "-n", "5",
@@ -199,7 +196,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
        @Ignore("The test is too resource consuming (8 GB of memory)")
        @Test
        public void testfullAlloc() {
-               addTestAppender(FlinkYarnClient.class, Level.WARN);
+               addTestAppender(YarnClusterDescriptor.class, Level.WARN);
                LOG.info("Starting testfullAlloc()");
                runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
                                "-n", "2",
@@ -218,7 +215,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                final int WAIT_TIME = 15;
                LOG.info("Starting testJavaAPI()");
 
-               AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient();
+               AbstractYarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor();
                Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
                flinkYarnClient.setTaskManagerCount(1);
                flinkYarnClient.setJobManagerMemory(768);
@@ -231,10 +228,9 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
 
                // deploy
-               AbstractFlinkYarnCluster yarnCluster = null;
+               ClusterClient yarnCluster = null;
                try {
                        yarnCluster = flinkYarnClient.deploy();
-                       yarnCluster.connectToCluster();
                } catch (Exception e) {
                        LOG.warn("Failing test", e);
                        Assert.fail("Error while deploying YARN cluster: 
"+e.getMessage());
@@ -248,7 +244,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                        }
                        GetClusterStatusResponse status = yarnCluster.getClusterStatus();
                        if(status != null && status.equals(expectedStatus)) {
-                               LOG.info("Cluster reached status " + status);
+                               LOG.info("ClusterClient reached status " + status);
                                break; // all good, cluster started
                        }
                        if(second > WAIT_TIME) {
@@ -263,7 +259,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 
                LOG.info("Shutting down cluster. All tests passed");
                // shutdown cluster
-               yarnCluster.shutdown(false);
+               yarnCluster.shutdown();
                LOG.info("Finished testJavaAPI()");
        }
 }
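
The status-polling loop in testJavaAPI() above is a small pattern worth isolating; a minimal sketch of it, assuming only the calls visible in this diff (ClusterClient.getClusterStatus() returning a GetClusterStatusResponse that is compared via equals), with the timeout supplied by the caller:

    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;

    public class ClusterStatusPollSketch {
        // Polls the cluster status once per second until it matches the expected
        // response or waitSeconds elapses, mirroring the loop in testJavaAPI().
        static boolean waitForStatus(ClusterClient yarnCluster, GetClusterStatusResponse expected, int waitSeconds)
                throws InterruptedException {
            for (int second = 0; second <= waitSeconds; second++) {
                GetClusterStatusResponse status = yarnCluster.getClusterStatus();
                if (status != null && status.equals(expected)) {
                    return true; // all good, cluster started
                }
                Thread.sleep(1000);
            }
            return false;
        }
    }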

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 03ab647..4de964a 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.hadoop.conf.Configuration;
@@ -368,7 +368,7 @@ public abstract class YarnTestBase extends TestLogger {
 
                        File yarnConfFile = writeYarnSiteConfigXML(conf);
                        map.put("YARN_CONF_DIR", 
yarnConfFile.getParentFile().getAbsolutePath());
-                       map.put("IN_TESTS", "yes we are in tests"); // see 
FlinkYarnClient() for more infos
+                       map.put("IN_TESTS", "yes we are in tests"); // see 
YarnClusterDescriptor() for more infos
                        TestBaseUtils.setEnv(map);
 
                        Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);
