http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
deleted file mode 100644
index 3b48228..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ /dev/null
@@ -1,559 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-
-import static akka.pattern.Patterns.ask;
-
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
-import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
-import org.apache.flink.runtime.net.ConnectionUtils;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.None$;
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Java representation of a running Flink cluster within YARN.
- */
-public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
-       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkYarnCluster.class);
-
-       private static final int POLLING_THREAD_INTERVAL_MS = 1000;
-
-       private YarnClient yarnClient;
-       private Thread actorRunner;
-       private Thread clientShutdownHook = new ClientShutdownHook();
-       private PollingThread pollingRunner;
-       private final Configuration hadoopConfig;
-       // (HDFS) location of the files required to run on YARN. Needed here to delete them on shutdown.
-       private final Path sessionFilesDir;
-       private final InetSocketAddress jobManagerAddress;
-
-       //---------- Class internal fields -------------------
-
-       private ActorSystem actorSystem;
-       private ActorRef applicationClient;
-       private ApplicationReport intialAppReport;
-       private final FiniteDuration akkaDuration;
-       private final Timeout akkaTimeout;
-       private final ApplicationId applicationId;
-       private final boolean detached;
-       private final org.apache.flink.configuration.Configuration flinkConfig;
-       private final ApplicationId appId;
-
-       private boolean isConnected = false;
-
-
-       /**
-        * Create a new Flink on YARN cluster.
-        *
-        * @param yarnClient Client to talk to YARN
-        * @param appId the YARN application ID
-        * @param hadoopConfig Hadoop configuration
-        * @param flinkConfig Flink configuration
-        * @param sessionFilesDir Location of files required for YARN session
-        * @param detached Set to true if no actor system or RPC communication 
with the cluster should be established
-        * @throws IOException
-        * @throws YarnException
-        */
-       public FlinkYarnCluster(
-                       final YarnClient yarnClient,
-                       final ApplicationId appId,
-                       Configuration hadoopConfig,
-                       org.apache.flink.configuration.Configuration 
flinkConfig,
-                       Path sessionFilesDir,
-                       boolean detached) throws IOException, YarnException {
-               this.akkaDuration = AkkaUtils.getTimeout(flinkConfig);
-               this.akkaTimeout = Timeout.durationToTimeout(akkaDuration);
-               this.yarnClient = yarnClient;
-               this.hadoopConfig = hadoopConfig;
-               this.sessionFilesDir = sessionFilesDir;
-               this.applicationId = appId;
-               this.detached = detached;
-               this.flinkConfig = flinkConfig;
-               this.appId = appId;
-
-               // get one application report manually
-               intialAppReport = yarnClient.getApplicationReport(appId);
-               String jobManagerHost = intialAppReport.getHost();
-               int jobManagerPort = intialAppReport.getRpcPort();
-               this.jobManagerAddress = new InetSocketAddress(jobManagerHost, 
jobManagerPort);
-       }
-
-       /**
-        * Connect the FlinkYarnCluster to the ApplicationMaster.
-        *
-        * Detached YARN sessions don't need to connect to the 
ApplicationMaster.
-        * Detached per job YARN sessions need to connect until the required 
number of TaskManagers have been started.
-        * 
-        * @throws IOException
-        */
-       public void connectToCluster() throws IOException {
-               if(isConnected) {
-                       throw new IllegalStateException("Can not connect to the 
cluster again");
-               }
-
-               // start actor system
-               LOG.info("Start actor system.");
-               // find name of own public interface, able to connect to the JM
-               // try to find address for 2 seconds. log after 400 ms.
-               InetAddress ownHostname = 
ConnectionUtils.findConnectingAddress(jobManagerAddress, 2000, 400);
-               actorSystem = AkkaUtils.createActorSystem(flinkConfig,
-                               new Some<Tuple2<String, Object>>(new 
Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));
-
-               // Create the leader election service
-               
flinkConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
this.jobManagerAddress.getHostName());
-               
flinkConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
this.jobManagerAddress.getPort());
-
-               LeaderRetrievalService leaderRetrievalService;
-
-               try {
-                       leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
-               } catch (Exception e) {
-                       throw new IOException("Could not create the leader 
retrieval service.", e);
-               }
-
-               // start application client
-               LOG.info("Start application client.");
-
-               applicationClient = actorSystem.actorOf(
-                       Props.create(
-                               ApplicationClient.class,
-                               flinkConfig,
-                               leaderRetrievalService),
-                       "applicationClient");
-
-               actorRunner = new Thread(new Runnable() {
-                       @Override
-                       public void run() {
-                               // blocks until ApplicationClient has been 
stopped
-                               actorSystem.awaitTermination();
-
-                               // get final application report
-                               try {
-                                       ApplicationReport appReport = 
yarnClient.getApplicationReport(appId);
-
-                                       LOG.info("Application " + appId + " 
finished with state " + appReport
-                                                       
.getYarnApplicationState() + " and final state " + appReport
-                                                       
.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
-
-                                       if (appReport.getYarnApplicationState() 
== YarnApplicationState.FAILED || appReport.getYarnApplicationState()
-                                                       == 
YarnApplicationState.KILLED) {
-                                               LOG.warn("Application failed. 
Diagnostics " + appReport.getDiagnostics());
-                                               LOG.warn("If log aggregation is 
activated in the Hadoop cluster, we recommend to retrieve "
-                                                               + "the full 
application log using this command:\n"
-                                                               + "\tyarn logs 
-applicationId " + appReport.getApplicationId() + "\n"
-                                                               + "(It 
sometimes takes a few seconds until the logs are aggregated)");
-                                       }
-                               } catch (Exception e) {
-                                       LOG.warn("Error while getting final 
application report", e);
-                               }
-                       }
-               });
-               actorRunner.setDaemon(true);
-               actorRunner.start();
-
-               pollingRunner = new PollingThread(yarnClient, appId);
-               pollingRunner.setDaemon(true);
-               pollingRunner.start();
-
-               Runtime.getRuntime().addShutdownHook(clientShutdownHook);
-
-               isConnected = true;
-       }
-
-       @Override
-       public void disconnect() {
-               if(!isConnected) {
-                       throw new IllegalStateException("Can not disconnect 
from an unconnected cluster.");
-               }
-               LOG.info("Disconnecting FlinkYarnCluster from 
ApplicationMaster");
-
-               
if(!Runtime.getRuntime().removeShutdownHook(clientShutdownHook)) {
-                       LOG.warn("Error while removing the shutdown hook. The 
YARN session might be killed unintentionally");
-               }
-               // tell the actor to shut down.
-               applicationClient.tell(PoisonPill.getInstance(), 
applicationClient);
-
-               try {
-                       actorRunner.join(1000); // wait for 1 second
-               } catch (InterruptedException e) {
-                       LOG.warn("Shutdown of the actor runner was 
interrupted", e);
-                       Thread.currentThread().interrupt();
-               }
-               try {
-                       pollingRunner.stopRunner();
-                       pollingRunner.join(1000);
-               } catch(InterruptedException e) {
-                       LOG.warn("Shutdown of the polling runner was 
interrupted", e);
-                       Thread.currentThread().interrupt();
-               }
-               isConnected = false;
-       }
-
-
-       // -------------------------- Interaction with the cluster 
------------------------
-
-       /*
-        * This call blocks until the message has been recevied.
-        */
-       @Override
-       public void stopAfterJob(JobID jobID) {
-               Preconditions.checkNotNull(jobID, "The job id must not be 
null");
-               Future<Object> messageReceived = ask(applicationClient, new 
YarnMessages.LocalStopAMAfterJob(jobID), akkaTimeout);
-               try {
-                       Await.result(messageReceived, akkaDuration);
-               } catch (Exception e) {
-                       throw new RuntimeException("Unable to tell application 
master to stop once the specified job has been finised", e);
-               }
-       }
-
-       @Override
-       public org.apache.flink.configuration.Configuration 
getFlinkConfiguration() {
-               return flinkConfig;
-       }
-
-       @Override
-       public InetSocketAddress getJobManagerAddress() {
-               return jobManagerAddress;
-       }
-
-       @Override
-       public String getWebInterfaceURL() {
-               String url = this.intialAppReport.getTrackingUrl();
-               // there seems to be a difference between HD 2.2.0 and 2.6.0
-               if(!url.startsWith("http://")) {
-                       url = "http://" + url;
-               }
-               return url;
-       }
-
-       @Override
-       public String getApplicationId() {
-               return applicationId.toString();
-       }
-
-       @Override
-       public boolean isDetached() {
-               return this.detached;
-       }
-
-       /**
-        * This method is only available if the cluster hasn't been started in 
detached mode.
-        */
-       @Override
-       public GetClusterStatusResponse getClusterStatus() {
-               if(!isConnected) {
-                       throw new IllegalStateException("The cluster is not 
connected to the ApplicationMaster.");
-               }
-               if(hasBeenStopped()) {
-                       throw new RuntimeException("The FlinkYarnCluster has 
already been stopped");
-               }
-               Future<Object> clusterStatusOption = ask(applicationClient, 
YarnMessages.getLocalGetyarnClusterStatus(), akkaTimeout);
-               Object clusterStatus;
-               try {
-                       clusterStatus = Await.result(clusterStatusOption, 
akkaDuration);
-               } catch (Exception e) {
-                       throw new RuntimeException("Unable to get Cluster 
status from Application Client", e);
-               }
-               if(clusterStatus instanceof None$) {
-                       return null;
-               } else if(clusterStatus instanceof Some) {
-                       return (GetClusterStatusResponse) (((Some) 
clusterStatus).get());
-               } else {
-                       throw new RuntimeException("Unexpected type: " + 
clusterStatus.getClass().getCanonicalName());
-               }
-       }
-
-       @Override
-       public boolean hasFailed() {
-               if(!isConnected) {
-                       throw new IllegalStateException("The cluster has been 
connected to the ApplicationMaster.");
-               }
-               if(pollingRunner == null) {
-                       LOG.warn("FlinkYarnCluster.hasFailed() has been called 
on an uninitialized cluster." +
-                                       "The system might be in an erroneous 
state");
-               }
-               ApplicationReport lastReport = pollingRunner.getLastReport();
-               if(lastReport == null) {
-                       LOG.warn("FlinkYarnCluster.hasFailed() has been called 
on a cluster that didn't receive a status so far." +
-                                       "The system might be in an erroneous 
state");
-                       return false;
-               } else {
-                       YarnApplicationState appState = 
lastReport.getYarnApplicationState();
-                       boolean status = (appState == 
YarnApplicationState.FAILED ||
-                                       appState == 
YarnApplicationState.KILLED);
-                       if(status) {
-                               LOG.warn("YARN reported application state {}", 
appState);
-                               LOG.warn("Diagnostics: {}", 
lastReport.getDiagnostics());
-                       }
-                       return status;
-               }
-       }
-
-
-       @Override
-       public String getDiagnostics() {
-               if(!isConnected) {
-                       throw new IllegalStateException("The cluster has been 
connected to the ApplicationMaster.");
-               }
-
-               if (!hasFailed()) {
-                       LOG.warn("getDiagnostics() called for cluster which is 
not in failed state");
-               }
-               ApplicationReport lastReport = pollingRunner.getLastReport();
-               if (lastReport == null) {
-                       LOG.warn("Last report is null");
-                       return null;
-               } else {
-                       return lastReport.getDiagnostics();
-               }
-       }
-
-       @Override
-       public List<String> getNewMessages() {
-               if(!isConnected) {
-                       throw new IllegalStateException("The cluster has been 
connected to the ApplicationMaster.");
-               }
-
-               if(hasBeenStopped()) {
-                       throw new RuntimeException("The FlinkYarnCluster has 
already been stopped");
-               }
-               List<String> ret = new ArrayList<String>();
-
-               // get messages from ApplicationClient (locally)
-               while(true) {
-                       Object result;
-                       try {
-                               Future<Object> response = 
Patterns.ask(applicationClient,
-                                               
YarnMessages.getLocalGetYarnMessage(), new Timeout(akkaDuration));
-
-                               result = Await.result(response, akkaDuration);
-                       } catch(Exception ioe) {
-                               LOG.warn("Error retrieving the YARN messages 
locally", ioe);
-                               break;
-                       }
-
-                       if(!(result instanceof Option)) {
-                               throw new RuntimeException("LocalGetYarnMessage 
requires a response of type " +
-                                               "Option. Instead the response 
is of type " + result.getClass() + ".");
-                       } else {
-                               Option messageOption = (Option) result;
-                               LOG.debug("Received message option {}", 
messageOption);
-                               if(messageOption.isEmpty()) {
-                                       break;
-                               } else {
-                                       Object obj = messageOption.get();
-
-                                       if(obj instanceof InfoMessage) {
-                                               InfoMessage msg = (InfoMessage) 
obj;
-                                               ret.add("[" + msg.date() + "] " 
+ msg.message());
-                                       } else {
-                                               LOG.warn("LocalGetYarnMessage 
returned unexpected type: " + messageOption);
-                                       }
-                               }
-                       }
-               }
-               return ret;
-       }
-
-       // -------------------------- Shutdown handling ------------------------
-
-       private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);
-
-       /**
-        * Shutdown the YARN cluster.
-        * @param failApplication whether we should fail the YARN application 
(in case of errors in Flink)
-        */
-       @Override
-       public void shutdown(boolean failApplication) {
-               if(!isConnected) {
-                       throw new IllegalStateException("The cluster has been 
connected to the ApplicationMaster.");
-               }
-
-               if(hasBeenShutDown.getAndSet(true)) {
-                       return;
-               }
-
-               try {
-                       
Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
-               } catch (IllegalStateException e) {
-                       // we are already in the shutdown hook
-               }
-
-               if(actorSystem != null){
-                       LOG.info("Sending shutdown request to the Application 
Master");
-                       if(applicationClient != ActorRef.noSender()) {
-                               try {
-                                       FinalApplicationStatus finalStatus;
-                                       if (failApplication) {
-                                               finalStatus = 
FinalApplicationStatus.FAILED;
-                                       } else {
-                                               finalStatus = 
FinalApplicationStatus.SUCCEEDED;
-                                       }
-                                       Future<Object> response = 
Patterns.ask(applicationClient,
-                                                       new 
YarnMessages.LocalStopYarnSession(finalStatus,
-                                                                       "Flink 
YARN Client requested shutdown"),
-                                                       new 
Timeout(akkaDuration));
-                                       Await.ready(response, akkaDuration);
-                               } catch(Exception e) {
-                                       LOG.warn("Error while stopping YARN 
Application Client", e);
-                               }
-                       }
-
-                       actorSystem.shutdown();
-                       actorSystem.awaitTermination();
-
-                       actorSystem = null;
-               }
-
-               LOG.info("Deleting files in " + sessionFilesDir );
-               try {
-                       FileSystem shutFS = FileSystem.get(hadoopConfig);
-                       shutFS.delete(sessionFilesDir, true); // delete conf 
and jar file.
-                       shutFS.close();
-               }catch(IOException e){
-                       LOG.error("Could not delete the Flink jar and 
configuration files in HDFS..", e);
-               }
-
-               try {
-                       actorRunner.join(1000); // wait for 1 second
-               } catch (InterruptedException e) {
-                       LOG.warn("Shutdown of the actor runner was 
interrupted", e);
-                       Thread.currentThread().interrupt();
-               }
-               try {
-                       pollingRunner.stopRunner();
-                       pollingRunner.join(1000);
-               } catch(InterruptedException e) {
-                       LOG.warn("Shutdown of the polling runner was 
interrupted", e);
-                       Thread.currentThread().interrupt();
-               }
-
-               LOG.info("YARN Client is shutting down");
-               yarnClient.stop(); // actorRunner is using the yarnClient.
-               yarnClient = null; // set null to clearly see if somebody wants 
to access it afterwards.
-       }
-
-       @Override
-       public boolean hasBeenStopped() {
-               return hasBeenShutDown.get();
-       }
-
-
-       public class ClientShutdownHook extends Thread {
-               @Override
-               public void run() {
-                       LOG.info("Shutting down FlinkYarnCluster from the 
client shutdown hook");
-                       shutdown(false);
-               }
-       }
-
-       // -------------------------- Polling ------------------------
-
-       public static class PollingThread extends Thread {
-
-               AtomicBoolean running = new AtomicBoolean(true);
-               private YarnClient yarnClient;
-               private ApplicationId appId;
-
-               // ------- status information stored in the polling thread
-               private final Object lock = new Object();
-               private ApplicationReport lastReport;
-
-
-               public PollingThread(YarnClient yarnClient, ApplicationId 
appId) {
-                       this.yarnClient = yarnClient;
-                       this.appId = appId;
-               }
-
-               public void stopRunner() {
-                       if(!running.get()) {
-                               LOG.warn("Polling thread was already stopped");
-                       }
-                       running.set(false);
-               }
-
-               public ApplicationReport getLastReport() {
-                       synchronized (lock) {
-                               return lastReport;
-                       }
-               }
-
-               @Override
-               public void run() {
-                       while (running.get() && 
yarnClient.isInState(Service.STATE.STARTED)) {
-                               try {
-                                       ApplicationReport report = 
yarnClient.getApplicationReport(appId);
-                                       synchronized (lock) {
-                                               lastReport = report;
-                                       }
-                               } catch (Exception e) {
-                                       LOG.warn("Error while getting 
application report", e);
-                               }
-                               try {
-                                       
Thread.sleep(FlinkYarnCluster.POLLING_THREAD_INTERVAL_MS);
-                               } catch (InterruptedException e) {
-                                       LOG.error("Polling thread got 
interrupted", e);
-                                       Thread.currentThread().interrupt(); // 
pass interrupt.
-                               }
-                       }
-                       if(running.get() && 
!yarnClient.isInState(Service.STATE.STARTED)) {
-                               // == if the polling thread is still running 
but the yarn client is stopped.
-                               LOG.warn("YARN client is unexpected in state " 
+ yarnClient.getServiceState());
-                       }
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 1d0afc4..24b5a35 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -22,7 +22,6 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
 
-import org.apache.flink.client.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -39,6 +38,7 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
@@ -192,7 +192,7 @@ public class YarnApplicationMasterRunner {
 
                        // Flink configuration
                        final Map<String, String> dynamicProperties =
-                               CliFrontend.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
+                               FlinkYarnSessionCli.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
                        LOG.debug("YARN dynamic properties: {}", 
dynamicProperties);
 
                        final Configuration config = 
createConfiguration(currDir, dynamicProperties);
@@ -292,8 +292,7 @@ public class YarnApplicationMasterRunner {
                        // 3: Flink's Yarn ResourceManager
                        LOG.debug("Starting YARN Flink Resource Manager");
 
-                       // we need the leader retrieval service here to be informed of new
-                       // leader session IDs, even though there can be only one leader ever
+                       // we need the leader retrieval service here to be informed of new leaders and session IDs
                        LeaderRetrievalService leaderRetriever = 
                                
LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
new file mode 100644
index 0000000..a5b8af7
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+
+import static akka.pattern.Patterns.ask;
+
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.None$;
+import scala.Option;
+import scala.Some;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Java representation of a running Flink cluster within YARN.
+ */
+public class YarnClusterClient extends ClusterClient {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(YarnClusterClient.class);
+
+       // Sleep time in milliseconds between two application report polls of the PollingThread.
+       private static final int POLLING_THREAD_INTERVAL_MS = 1000;
+
+       // Client used to query YARN; stopped and set to null at the end of finalizeCluster().
+       private YarnClient yarnClient;
+       // Daemon thread that waits for the actor system to terminate and logs the final report.
+       private Thread actorRunner;
+       // JVM shutdown hook registered in the constructor and removed on disconnect/finalize.
+       private Thread clientShutdownHook = new ClientShutdownHook();
+       // Daemon thread that periodically fetches the application report from YARN.
+       private PollingThread pollingRunner;
+       // Hadoop configuration obtained from the YarnClient; used to access the file system on shutdown.
+       private final Configuration hadoopConfig;
+       // (HDFS) location of the files required to run on YARN. Needed here to 
delete them on shutdown.
+       private final Path sessionFilesDir;
+
+       /** The leader retrieval service for connecting to the cluster and 
finding the active leader. */
+       private final LeaderRetrievalService leaderRetrievalService;
+
+       //---------- Class internal fields -------------------
+
+       private final AbstractYarnClusterDescriptor clusterDescriptor;
+       // Local ApplicationClient actor through which all requests to the cluster are sent.
+       private final ActorRef applicationClient;
+       // Timeout for blocking on actor replies, as duration and as Akka Timeout.
+       private final FiniteDuration akkaDuration;
+       private final Timeout akkaTimeout;
+       // NOTE: despite its name this holds the complete ApplicationReport passed to the constructor.
+       private final ApplicationReport applicationId;
+       // Application id and tracking URL taken from that report.
+       private final ApplicationId appId;
+       private final String trackingURL;
+
+       // Set to true at the end of the constructor's setup and back to false in disconnect().
+       private boolean isConnected = false;
+
+
+       /**
+        * Create a new Flink on YARN cluster.
+        *
+        * @param clusterDescriptor The descriptor used at cluster creation
+        * @param yarnClient Client to talk to YARN
+        * @param appReport the YARN application ID
+        * @param flinkConfig Flink configuration
+        * @param sessionFilesDir Location of files required for YARN session
+        * @throws IOException
+        * @throws YarnException
+        */
+       public YarnClusterClient(
+               final AbstractYarnClusterDescriptor clusterDescriptor,
+               final YarnClient yarnClient,
+               final ApplicationReport appReport,
+               org.apache.flink.configuration.Configuration flinkConfig,
+               Path sessionFilesDir) throws IOException, YarnException {
+
+               super(flinkConfig);
+
+               this.akkaDuration = AkkaUtils.getTimeout(flinkConfig);
+               this.akkaTimeout = Timeout.durationToTimeout(akkaDuration);
+               this.clusterDescriptor = clusterDescriptor;
+               this.yarnClient = yarnClient;
+               this.hadoopConfig = yarnClient.getConfig();
+               this.sessionFilesDir = sessionFilesDir;
+               this.applicationId = appReport;
+               this.appId = appReport.getApplicationId();
+               this.trackingURL = appReport.getTrackingUrl();
+
+               try {
+                       leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+               } catch (Exception e) {
+                       throw new IOException("Could not create the leader 
retrieval service.", e);
+               }
+
+
+               if (isConnected) {
+                       throw new IllegalStateException("Already connected to 
the cluster.");
+               }
+
+               // start application client
+               LOG.info("Start application client.");
+
+               applicationClient = actorSystem.actorOf(
+                       Props.create(
+                               ApplicationClient.class,
+                               flinkConfig,
+                               leaderRetrievalService),
+                       "applicationClient");
+
+               actorRunner = new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               // blocks until ApplicationClient has been 
stopped
+                               actorSystem.awaitTermination();
+
+                               // get final application report
+                               try {
+                                       ApplicationReport appReport = 
yarnClient.getApplicationReport(appId);
+
+                                       LOG.info("Application " + appId + " 
finished with state " + appReport
+                                               .getYarnApplicationState() + " 
and final state " + appReport
+                                               .getFinalApplicationStatus() + 
" at " + appReport.getFinishTime());
+
+                                       if (appReport.getYarnApplicationState() 
== YarnApplicationState.FAILED || appReport.getYarnApplicationState()
+                                               == YarnApplicationState.KILLED) 
{
+                                               LOG.warn("Application failed. 
Diagnostics " + appReport.getDiagnostics());
+                                               LOG.warn("If log aggregation is 
activated in the Hadoop cluster, we recommend to retrieve "
+                                                       + "the full application 
log using this command:\n"
+                                                       + "\tyarn logs 
-applicationId " + appReport.getApplicationId() + "\n"
+                                                       + "(It sometimes takes 
a few seconds until the logs are aggregated)");
+                                       }
+                               } catch (Exception e) {
+                                       LOG.warn("Error while getting final 
application report", e);
+                               }
+                       }
+               });
+               actorRunner.setDaemon(true);
+               actorRunner.start();
+
+               pollingRunner = new PollingThread(yarnClient, appId);
+               pollingRunner.setDaemon(true);
+               pollingRunner.start();
+
+               Runtime.getRuntime().addShutdownHook(clientShutdownHook);
+
+               isConnected = true;
+
+               logAndSysout("Waiting until all TaskManagers have connected");
+
+               while(true) {
+                       GetClusterStatusResponse status = getClusterStatus();
+                       if (status != null) {
+                               if (status.numRegisteredTaskManagers() < 
clusterDescriptor.getTaskManagerCount()) {
+                                       logAndSysout("TaskManager status (" + 
status.numRegisteredTaskManagers() + "/"
+                                               + 
clusterDescriptor.getTaskManagerCount() + ")");
+                               } else {
+                                       logAndSysout("All TaskManagers are 
connected");
+                                       break;
+                               }
+                       } else {
+                               logAndSysout("No status updates from the YARN 
cluster received so far. Waiting ...");
+                       }
+
+                       try {
+                               Thread.sleep(500);
+                       } catch (InterruptedException e) {
+                               LOG.error("Interrupted while waiting for 
TaskManagers");
+                               System.err.println("Thread is interrupted");
+                               throw new IOException("Interrupted while 
waiting for TaskManagers", e);
+                       }
+               }
+       }
+
+       public void disconnect() {
+               if(!isConnected) {
+                       throw new IllegalStateException("Can not disconnect 
from an unconnected cluster.");
+               }
+               LOG.info("Disconnecting YarnClusterClient from 
ApplicationMaster");
+
+               
if(!Runtime.getRuntime().removeShutdownHook(clientShutdownHook)) {
+                       LOG.warn("Error while removing the shutdown hook. The 
YARN session might be killed unintentionally");
+               }
+               // tell the actor to shut down.
+               applicationClient.tell(PoisonPill.getInstance(), 
applicationClient);
+
+               try {
+                       actorRunner.join(1000); // wait for 1 second
+               } catch (InterruptedException e) {
+                       LOG.warn("Shutdown of the actor runner was 
interrupted", e);
+                       Thread.currentThread().interrupt();
+               }
+               try {
+                       pollingRunner.stopRunner();
+                       pollingRunner.join(1000);
+               } catch(InterruptedException e) {
+                       LOG.warn("Shutdown of the polling runner was 
interrupted", e);
+                       Thread.currentThread().interrupt();
+               }
+               isConnected = false;
+       }
+
+
+       // -------------------------- Interaction with the cluster 
------------------------
+
+       /**
+        * Tells the cluster to monitor the status of the given job and to stop itself once the
+        * job has finished. Blocks until the application client has answered the request.
+        *
+        * @param jobID id of the job to monitor, must not be null
+        * @throws RuntimeException if no answer is received or the request fails
+        */
+       private void stopAfterJob(JobID jobID) {
+               Preconditions.checkNotNull(jobID, "The job id must not be null");
+               Future<Object> messageReceived = ask(applicationClient, new YarnMessages.LocalStopAMAfterJob(jobID), akkaTimeout);
+               try {
+                       Await.result(messageReceived, akkaDuration);
+               } catch (Exception e) {
+                       throw new RuntimeException("Unable to tell application master to stop once the specified job has been finished", e);
+               }
+       }
+
+       /**
+        * Returns the Flink configuration held by this client.
+        */
+       @Override
+       public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
+               return this.flinkConfig;
+       }
+
+       /**
+        * Returns the number of TaskManagers multiplied by the slots per TaskManager as reported by
+        * the cluster descriptor, or -1 if that product is not positive.
+        */
+       @Override
+       public int getMaxSlots() {
+               final int taskManagers = clusterDescriptor.getTaskManagerCount();
+               final int slotsPerTaskManager = clusterDescriptor.getTaskManagerSlots();
+               final int totalSlots = taskManagers * slotsPerTaskManager;
+               if (totalSlots > 0) {
+                       return totalSlots;
+               }
+               return -1;
+       }
+
+       @Override
+       protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+               if (isDetached()) {
+                       JobSubmissionResult result = 
super.runDetached(jobGraph, classLoader);
+                       stopAfterJob(jobGraph.getJobID());
+                       return result;
+               } else {
+                       return super.run(jobGraph, classLoader);
+               }
+       }
+
+       @Override
+       public String getWebInterfaceURL() {
+               // there seems to be a difference between HD 2.2.0 and 2.6.0
+               if(!trackingURL.startsWith("http://";)) {
+                       return "http://"; + trackingURL;
+               } else {
+                       return trackingURL;
+               }
+       }
+
+       /**
+        * Returns the string form of the YARN application id taken from the application report.
+        */
+       @Override
+       public String getClusterIdentifier() {
+               final ApplicationId id = applicationId.getApplicationId();
+               return id.toString();
+       }
+
+       /**
+        * Asks the local application client for the current cluster status and blocks until the
+        * answer arrives.
+        *
+        * @return the status, or {@code null} if the application client answered with an empty option
+        * @throws IllegalStateException if the client is not connected
+        * @throws RuntimeException if the client has been shut down, the request fails or the
+        *                          answer has an unexpected type
+        */
+       @Override
+       public GetClusterStatusResponse getClusterStatus() {
+               if (!isConnected) {
+                       throw new IllegalStateException("The cluster is not connected to the ApplicationMaster.");
+               }
+               if (hasBeenShutdown()) {
+                       throw new RuntimeException("The YarnClusterClient has already been stopped");
+               }
+
+               final Future<Object> pendingReply =
+                       ask(applicationClient, YarnMessages.getLocalGetyarnClusterStatus(), akkaTimeout);
+
+               final Object reply;
+               try {
+                       reply = Await.result(pendingReply, akkaDuration);
+               } catch (Exception e) {
+                       throw new RuntimeException("Unable to get ClusterClient status from Application Client", e);
+               }
+
+               if (reply instanceof Some) {
+                       return (GetClusterStatusResponse) (((Some) reply).get());
+               }
+               if (reply instanceof None$) {
+                       return null;
+               }
+               throw new RuntimeException("Unexpected type: " + reply.getClass().getCanonicalName());
+       }
+
+       /**
+        * Derives the application status from the last application report fetched by the
+        * polling thread.
+        *
+        * @return {@code UNKNOWN} if there is no polling thread or no report yet, {@code FAILED} if
+        *         YARN reported the state FAILED or KILLED, {@code SUCCEEDED} for every other state
+        * @throws IllegalStateException if the client is not connected
+        */
+       public ApplicationStatus getApplicationStatus() {
+               if(!isConnected) {
+                       throw new IllegalStateException("The cluster has not been connected to the ApplicationMaster.");
+               }
+               if(pollingRunner == null) {
+                       LOG.warn("YarnClusterClient.getApplicationStatus() has been called on an uninitialized cluster. " +
+                                       "The system might be in an erroneous state");
+                       // without a polling thread there is no report to inspect
+                       return ApplicationStatus.UNKNOWN;
+               }
+               ApplicationReport lastReport = pollingRunner.getLastReport();
+               if(lastReport == null) {
+                       LOG.warn("YarnClusterClient.getApplicationStatus() has been called on a cluster that didn't receive a status so far. " +
+                                       "The system might be in an erroneous state");
+                       return ApplicationStatus.UNKNOWN;
+               } else {
+                       YarnApplicationState appState = lastReport.getYarnApplicationState();
+                       ApplicationStatus status =
+                               (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED) ?
+                                       ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
+                       if(status != ApplicationStatus.SUCCEEDED) {
+                               LOG.warn("YARN reported application state {}", appState);
+                               LOG.warn("Diagnostics: {}", lastReport.getDiagnostics());
+                       }
+                       return status;
+               }
+       }
+
+
+       /**
+        * Returns the diagnostics string of the last application report fetched by the polling
+        * thread. Logs a warning if the application status is {@code SUCCEEDED}.
+        *
+        * @return the diagnostics, or {@code null} if no report has been received yet
+        * @throws IllegalStateException if the client is not connected
+        */
+       private String getDiagnostics() {
+               if(!isConnected) {
+                       throw new IllegalStateException("The cluster has not been connected to the ApplicationMaster.");
+               }
+
+               if (getApplicationStatus() == ApplicationStatus.SUCCEEDED) {
+                       LOG.warn("getDiagnostics() called for cluster which is not in failed state");
+               }
+               ApplicationReport lastReport = pollingRunner.getLastReport();
+               if (lastReport == null) {
+                       LOG.warn("Last report is null");
+                       return null;
+               } else {
+                       return lastReport.getDiagnostics();
+               }
+       }
+
+       @Override
+       public List<String> getNewMessages() {
+               if(!isConnected) {
+                       throw new IllegalStateException("The cluster has been 
connected to the ApplicationMaster.");
+               }
+
+               if(hasBeenShutdown()) {
+                       throw new RuntimeException("The YarnClusterClient has 
already been stopped");
+               }
+               List<String> ret = new ArrayList<String>();
+
+               // get messages from ApplicationClient (locally)
+               while(true) {
+                       Object result;
+                       try {
+                               Future<Object> response = 
Patterns.ask(applicationClient,
+                                               
YarnMessages.getLocalGetYarnMessage(), new Timeout(akkaDuration));
+
+                               result = Await.result(response, akkaDuration);
+                       } catch(Exception ioe) {
+                               LOG.warn("Error retrieving the YARN messages 
locally", ioe);
+                               break;
+                       }
+
+                       if(!(result instanceof Option)) {
+                               throw new RuntimeException("LocalGetYarnMessage 
requires a response of type " +
+                                               "Option. Instead the response 
is of type " + result.getClass() + ".");
+                       } else {
+                               Option messageOption = (Option) result;
+                               LOG.debug("Received message option {}", 
messageOption);
+                               if(messageOption.isEmpty()) {
+                                       break;
+                               } else {
+                                       Object obj = messageOption.get();
+
+                                       if(obj instanceof InfoMessage) {
+                                               InfoMessage msg = (InfoMessage) 
obj;
+                                               ret.add("[" + msg.date() + "] " 
+ msg.message());
+                                       } else {
+                                               LOG.warn("LocalGetYarnMessage 
returned unexpected type: " + messageOption);
+                                       }
+                               }
+                       }
+               }
+               return ret;
+       }
+
+       // -------------------------- Shutdown handling ------------------------
+
+       private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);
+
+       /**
+        * Shuts down or disconnects from the YARN cluster.
+        */
+       @Override
+       public void finalizeCluster() {
+
+               if (!isConnected) {
+                       throw new IllegalStateException("The cluster has been 
not been connected to the ApplicationMaster.");
+               }
+
+               if (isDetached()) {
+                       // only disconnect if we are running detached
+                       disconnect();
+                       return;
+               }
+
+               // show cluster status
+
+               List<String> msgs = getNewMessages();
+               if (msgs != null && msgs.size() > 1) {
+
+                       logAndSysout("The following messages were created by 
the YARN cluster while running the Job:");
+                       for (String msg : msgs) {
+                               logAndSysout(msg);
+                       }
+               }
+               if (getApplicationStatus() != ApplicationStatus.SUCCEEDED) {
+                       logAndSysout("YARN cluster is in non-successful state " 
+ getApplicationStatus());
+                       logAndSysout("YARN Diagnostics: " + getDiagnostics());
+               }
+
+
+               if(hasBeenShutDown.getAndSet(true)) {
+                       return;
+               }
+
+               try {
+                       
Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
+               } catch (IllegalStateException e) {
+                       // we are already in the shutdown hook
+               }
+
+               if(actorSystem != null){
+                       LOG.info("Sending shutdown request to the Application 
Master");
+                       if(applicationClient != ActorRef.noSender()) {
+                               try {
+                                       Future<Object> response = 
Patterns.ask(applicationClient,
+                                                       new 
YarnMessages.LocalStopYarnSession(getApplicationStatus(),
+                                                                       "Flink 
YARN Client requested shutdown"),
+                                                       new 
Timeout(akkaDuration));
+                                       Await.ready(response, akkaDuration);
+                               } catch(Exception e) {
+                                       LOG.warn("Error while stopping YARN 
Application Client", e);
+                               }
+                       }
+
+                       actorSystem.shutdown();
+                       actorSystem.awaitTermination();
+               }
+
+               LOG.info("Deleting files in " + sessionFilesDir);
+               try {
+                       FileSystem shutFS = FileSystem.get(hadoopConfig);
+                       shutFS.delete(sessionFilesDir, true); // delete conf 
and jar file.
+                       shutFS.close();
+               }catch(IOException e){
+                       LOG.error("Could not delete the Flink jar and 
configuration files in HDFS..", e);
+               }
+
+               try {
+                       actorRunner.join(1000); // wait for 1 second
+               } catch (InterruptedException e) {
+                       LOG.warn("Shutdown of the actor runner was 
interrupted", e);
+                       Thread.currentThread().interrupt();
+               }
+               try {
+                       pollingRunner.stopRunner();
+                       pollingRunner.join(1000);
+               } catch(InterruptedException e) {
+                       LOG.warn("Shutdown of the polling runner was 
interrupted", e);
+                       Thread.currentThread().interrupt();
+               }
+
+               LOG.info("YARN Client is shutting down");
+               yarnClient.stop(); // actorRunner is using the yarnClient.
+               yarnClient = null; // set null to clearly see if somebody wants 
to access it afterwards.
+       }
+
+       /**
+        * Reports whether the shutdown flag of this client has been set.
+        */
+       public boolean hasBeenShutdown() {
+               final boolean shutDown = hasBeenShutDown.get();
+               return shutDown;
+       }
+
+
+       /**
+        * Thread registered as JVM shutdown hook; logs a message and invokes {@code shutdown()}
+        * of the enclosing client.
+        */
+       private class ClientShutdownHook extends Thread {
+               @Override
+               public void run() {
+                       LOG.info("Shutting down YarnClusterClient from the client shutdown hook");
+                       YarnClusterClient.this.shutdown();
+               }
+       }
+
+       // -------------------------- Polling ------------------------
+
+       private static class PollingThread extends Thread {
+
+               AtomicBoolean running = new AtomicBoolean(true);
+               private YarnClient yarnClient;
+               private ApplicationId appId;
+
+               // ------- status information stored in the polling thread
+               private final Object lock = new Object();
+               private ApplicationReport lastReport;
+
+
+               public PollingThread(YarnClient yarnClient, ApplicationId 
appId) {
+                       this.yarnClient = yarnClient;
+                       this.appId = appId;
+               }
+
+               public void stopRunner() {
+                       if(!running.get()) {
+                               LOG.warn("Polling thread was already stopped");
+                       }
+                       running.set(false);
+               }
+
+               public ApplicationReport getLastReport() {
+                       synchronized (lock) {
+                               return lastReport;
+                       }
+               }
+
+               @Override
+               public void run() {
+                       while (running.get() && 
yarnClient.isInState(Service.STATE.STARTED)) {
+                               try {
+                                       ApplicationReport report = 
yarnClient.getApplicationReport(appId);
+                                       synchronized (lock) {
+                                               lastReport = report;
+                                       }
+                               } catch (Exception e) {
+                                       LOG.warn("Error while getting 
application report", e);
+                               }
+                               try {
+                                       
Thread.sleep(YarnClusterClient.POLLING_THREAD_INTERVAL_MS);
+                               } catch (InterruptedException e) {
+                                       LOG.error("Polling thread got 
interrupted", e);
+                                       Thread.currentThread().interrupt(); // 
pass interrupt.
+                                       stopRunner();
+                               }
+                       }
+                       if(running.get() && 
!yarnClient.isInState(Service.STATE.STARTED)) {
+                               // == if the polling thread is still running 
but the yarn client is stopped.
+                               LOG.warn("YARN client is unexpected in state " 
+ yarnClient.getServiceState());
+                       }
+               }
+       }
+
+       /**
+        * Reports detached mode: set either through the general '-d' flag or through the Yarn
+        * CLI flag 'yd'.
+        */
+       @Override
+       public boolean isDetached() {
+               if (super.isDetached()) {
+                       return true;
+               }
+               return clusterDescriptor.isDetachedMode();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
new file mode 100644
index 0000000..43e7c7b
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+/**
+ * Default {@link AbstractYarnClusterDescriptor} whose application master class is
+ * {@link YarnApplicationMasterRunner}.
+ */
+public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
+
+       /**
+        * Returns the class used as application master.
+        */
+       @Override
+       protected Class<?> getApplicationMasterClass() {
+               final Class<?> applicationMaster = YarnApplicationMasterRunner.class;
+               return applicationMaster;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
new file mode 100644
index 0000000..a2375c5
--- /dev/null
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.YarnClusterClient;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Class handling the command line interface to the YARN session.
+ */
+public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient> {
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkYarnSessionCli.class);
+
+       //------------------------------------ Constants   
-------------------------
+
+       public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
+       public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
+
+       // Polling interval of the interactive CLI loop, in seconds
+       // (it is multiplied by 1000 where it is compared against milliseconds).
+       private static final int CLIENT_POLLING_INTERVALL = 3;
+
+       /** The id for the CommandLine interface */
+       private static final String ID = "yarn-cluster";
+
+       // YARN-session related constants
+       // File name prefix of the YARN properties file; the current user name is
+       // appended to it in getYarnPropertiesLocation().
+       private static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
+       // Keys of the entries that run() stores in the YARN properties file and
+       // resumeFromYarnProperties() reads back.
+       private static final String YARN_PROPERTIES_JOBMANAGER_KEY = 
"jobManager";
+       private static final String YARN_PROPERTIES_PARALLELISM = "parallelism";
+       private static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = 
"dynamicPropertiesString";
+
+       // Separator between the individual dynamic properties in their encoded form.
+       private static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; 
// this has to be a regex for String.split()
+
+       //------------------------------------ Command Line argument options 
-------------------------
+       // the prefix transformation is used by the CliFrontend static 
constructor.
+       // All options are created in the constructor from the given short/long prefixes.
+       private final Option QUERY;
+       // --- or ---
+       private final Option QUEUE;
+       private final Option SHIP_PATH;
+       private final Option FLINK_JAR;
+       private final Option JM_MEMORY;
+       private final Option TM_MEMORY;
+       private final Option CONTAINER;
+       private final Option SLOTS;
+       private final Option DETACHED;
+       private final Option STREAMING;
+       private final Option NAME;
+
+       /**
+        * Dynamic properties allow the user to specify additional 
configuration values with -D, such as
+        *  -Dfs.overwrite-files=true  
-Dtaskmanager.network.numberOfBuffers=16368
+        */
+       private final Option DYNAMIC_PROPERTIES;
+
+       // Handed to runInteractiveCli() as its readConsoleInput flag by run().
+       private final boolean acceptInteractiveInput;
+       
+       //------------------------------------ Internal fields 
-------------------------
+       // Client of the cluster deployed by run(); shut down by stop() when non-null.
+       private YarnClusterClient yarnCluster = null;
+       // Set to true by createDescriptor() when a detached option is present; checked in run().
+       private boolean detachedMode = false;
+
+       /**
+        * Creates the YARN session CLI with console input enabled.
+        *
+        * @param shortPrefix prefix put in front of every short option name
+        * @param longPrefix prefix put in front of every long option name
+        */
+       public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
+               this(shortPrefix, longPrefix, true);
+       }
+
+       /**
+        * Creates the YARN session CLI and builds all of its command line options.
+        *
+        * @param shortPrefix prefix put in front of every short option name
+        * @param longPrefix prefix put in front of every long option name
+        * @param acceptInteractiveInput flag later passed to the interactive CLI loop as
+        *                               its "read console input" switch
+        */
+       public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean acceptInteractiveInput) {
+               // --- switches (no argument) ---
+               QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
+               DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
+               STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
+
+               // --- options that take an argument ---
+               CONTAINER = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)");
+               SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager");
+               JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
+               TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
+               QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
+               NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
+               FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
+               SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
+
+               // the dynamic properties option only has a short name
+               DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties");
+
+               this.acceptInteractiveInput = acceptInteractiveInput;
+       }
+
+       /**
+        * Resumes from a Flink Yarn properties file.
+        *
+        * <p>If the properties file exists, its entries are applied to the given configuration:
+        * the default parallelism, the JobManager address and all dynamic properties.
+        *
+        * @param flinkConfiguration The flink configuration, updated in place
+        * @return True if the properties were loaded, false if the properties file does not exist
+        * @throws RuntimeException if the file cannot be read, the parallelism entry is not an
+        *                          integer, or the JobManager address entry is invalid
+        */
+       private boolean resumeFromYarnProperties(Configuration flinkConfiguration) {
+               // load the YARN properties
+               File propertiesFile = new File(getYarnPropertiesLocation(flinkConfiguration));
+               if (!propertiesFile.exists()) {
+                       return false;
+               }
+
+               logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());
+
+               Properties yarnProperties = new Properties();
+               try (InputStream is = new FileInputStream(propertiesFile)) {
+                       yarnProperties.load(is);
+               }
+               catch (IOException e) {
+                       throw new RuntimeException("Cannot read the YARN properties file", e);
+               }
+
+               // configure the default parallelism from YARN
+               String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
+               if (propParallelism != null) { // maybe the property is not set
+                       try {
+                               int parallelism = Integer.parseInt(propParallelism);
+                               flinkConfiguration.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism);
+
+                               logAndSysout("YARN properties set default parallelism to " + parallelism);
+                       }
+                       catch (NumberFormatException e) {
+                               // keep the cause so that the offending value shows up in the stack trace
+                               throw new RuntimeException("Error while parsing the YARN properties: " +
+                                       "Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer.", e);
+                       }
+               }
+
+               // get the JobManager address from the YARN properties
+               String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
+               if (address != null) {
+                       InetSocketAddress jobManagerAddress;
+                       try {
+                               jobManagerAddress = ClientUtils.parseHostPortAddress(address);
+                               // store address in config from where it is retrieved by the retrieval service
+                               CliFrontend.writeJobManagerAddressToConfig(flinkConfiguration, jobManagerAddress);
+                       }
+                       catch (Exception e) {
+                               throw new RuntimeException("YARN properties contain an invalid entry for JobManager address.", e);
+                       }
+
+                       logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress);
+               }
+
+               // handle the YARN client's dynamic properties
+               String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
+               Map<String, String> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
+               for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) {
+                       flinkConfiguration.setString(dynamicProperty.getKey(), dynamicProperty.getValue());
+               }
+
+               return true;
+       }
+
+       /**
+        * Builds a {@link YarnClusterDescriptor} from the parsed command line.
+        *
+        * <p>As a side effect, this sets the detached-mode flag of this CLI when one of the
+        * detached options is present.
+        *
+        * @param defaultApplicationName application name used when the name option is absent;
+        *                               may be null, in which case no name is set
+        * @param cmd the parsed command line
+        * @return the configured cluster descriptor
+        * @throws IllegalArgumentException if the required container option is missing, or if a
+        *                                  parallelism is requested for a non-positive number of containers
+        * @throws NumberFormatException if a numeric option value is not an integer
+        */
+       public YarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) {
+
+               YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor();
+
+               if (!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option!
+                       LOG.error("Missing required argument {}", CONTAINER.getOpt());
+                       printUsage();
+                       throw new IllegalArgumentException("Missing required argument " + CONTAINER.getOpt());
+               }
+               yarnClusterDescriptor.setTaskManagerCount(Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt())));
+
+               // Jar Path
+               Path localJarPath;
+               if (cmd.hasOption(FLINK_JAR.getOpt())) {
+                       String userPath = cmd.getOptionValue(FLINK_JAR.getOpt());
+                       if (!userPath.startsWith("file://")) {
+                               userPath = "file://" + userPath;
+                       }
+                       localJarPath = new Path(userPath);
+               } else {
+                       LOG.info("No path for the flink jar passed. Using the location of "
+                               + yarnClusterDescriptor.getClass() + " to locate the jar");
+                       localJarPath = new Path("file://" +
+                               yarnClusterDescriptor.getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
+               }
+
+               yarnClusterDescriptor.setLocalJarPath(localJarPath);
+
+               List<File> shipFiles = new ArrayList<>();
+               // path to directory to ship
+               if (cmd.hasOption(SHIP_PATH.getOpt())) {
+                       String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
+                       File shipDir = new File(shipPath);
+                       if (shipDir.isDirectory()) {
+                               File[] filesToShip = shipDir.listFiles(new FilenameFilter() {
+                                       @Override
+                                       public boolean accept(File dir, String name) {
+                                               return !(name.equals(".") || name.equals(".."));
+                                       }
+                               });
+                               // listFiles() returns null when the directory cannot be read
+                               if (filesToShip != null) {
+                                       shipFiles.addAll(Arrays.asList(filesToShip));
+                               } else {
+                                       LOG.warn("Could not list the content of the ship directory {}. Ignoring it.", shipPath);
+                               }
+                       } else {
+                               LOG.warn("Ship directory is not a directory. Ignoring it.");
+                       }
+               }
+
+               yarnClusterDescriptor.setShipFiles(shipFiles);
+
+               // queue
+               if (cmd.hasOption(QUEUE.getOpt())) {
+                       yarnClusterDescriptor.setQueue(cmd.getOptionValue(QUEUE.getOpt()));
+               }
+
+               // JobManager Memory
+               if (cmd.hasOption(JM_MEMORY.getOpt())) {
+                       int jmMemory = Integer.parseInt(cmd.getOptionValue(JM_MEMORY.getOpt()));
+                       yarnClusterDescriptor.setJobManagerMemory(jmMemory);
+               }
+
+               // Task Managers memory
+               if (cmd.hasOption(TM_MEMORY.getOpt())) {
+                       int tmMemory = Integer.parseInt(cmd.getOptionValue(TM_MEMORY.getOpt()));
+                       yarnClusterDescriptor.setTaskManagerMemory(tmMemory);
+               }
+
+               if (cmd.hasOption(SLOTS.getOpt())) {
+                       int slots = Integer.parseInt(cmd.getOptionValue(SLOTS.getOpt()));
+                       yarnClusterDescriptor.setTaskManagerSlots(slots);
+               }
+
+               String[] dynamicProperties = null;
+               if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
+                       dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
+               }
+               String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+
+               yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
+
+               if (cmd.hasOption(DETACHED.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
+                       this.detachedMode = true;
+                       yarnClusterDescriptor.setDetachedMode(true);
+               }
+
+               if (cmd.hasOption(NAME.getOpt())) {
+                       yarnClusterDescriptor.setName(cmd.getOptionValue(NAME.getOpt()));
+               } else {
+                       // set the default application name, if none is specified
+                       if (defaultApplicationName != null) {
+                               yarnClusterDescriptor.setName(defaultApplicationName);
+                       }
+               }
+
+               // ----- Convenience -----
+
+               // the number of slots available from YARN:
+               int yarnTmSlots = yarnClusterDescriptor.getTaskManagerSlots();
+               if (yarnTmSlots == -1) {
+                       yarnTmSlots = 1;
+               }
+
+               int taskManagerCount = yarnClusterDescriptor.getTaskManagerCount();
+               int maxSlots = yarnTmSlots * taskManagerCount;
+               int userParallelism = Integer.parseInt(cmd.getOptionValue(CliFrontendParser.PARALLELISM_OPTION.getOpt(), "-1"));
+               if (userParallelism != -1) {
+                       if (taskManagerCount <= 0) {
+                               // would otherwise fail with an ArithmeticException in the division below
+                               throw new IllegalArgumentException("Cannot distribute a parallelism of " + userParallelism +
+                                       " over " + taskManagerCount + " YARN containers. The number of containers must be positive.");
+                       }
+                       // round up: truncating here would leave the cluster with fewer slots than the requested parallelism
+                       int slotsPerTM = (userParallelism + taskManagerCount - 1) / taskManagerCount;
+                       String message = "The YARN cluster has " + maxSlots + " slots available, " +
+                               "but the user requested a parallelism of " + userParallelism + " on YARN. " +
+                               "Each of the " + taskManagerCount + " TaskManagers " +
+                               "will get "+slotsPerTM+" slots.";
+                       logAndSysout(message);
+                       yarnClusterDescriptor.setTaskManagerSlots(slotsPerTM);
+               }
+
+               return yarnClusterDescriptor;
+       }
+
+       /**
+        * Creates a cluster descriptor from the command line and deploys it.
+        *
+        * @param applicationName default application name handed to the descriptor creation
+        * @param cmdLine the parsed command line
+        * @return the client of the deployed YARN cluster
+        * @throws RuntimeException wrapping any failure of the deployment itself
+        */
+       @Override
+       public YarnClusterClient createClient(String applicationName, CommandLine cmdLine) throws Exception {
+               // failures while building the descriptor propagate unwrapped
+               final YarnClusterDescriptor descriptor = createDescriptor(applicationName, cmdLine);
+
+               final YarnClusterClient deployedClient;
+               try {
+                       deployedClient = descriptor.deploy();
+               } catch (Exception deploymentFailure) {
+                       throw new RuntimeException("Error deploying the YARN cluster", deploymentFailure);
+               }
+               return deployedClient;
+       }
+
+       /**
+        * Prints the usage of the YARN session CLI to standard out, split into the
+        * required and the optional command line options.
+        */
+       private void printUsage() {
+               System.out.println("Usage:");
+               HelpFormatter formatter = new HelpFormatter();
+               formatter.setWidth(200);
+               formatter.setLeftPadding(5);
+               formatter.setSyntaxPrefix("   Required");
+               Options req = new Options();
+               req.addOption(CONTAINER);
+               formatter.printHelp(" ", req);
+
+               formatter.setSyntaxPrefix("   Optional");
+               Options opt = new Options();
+               opt.addOption(JM_MEMORY);
+               opt.addOption(TM_MEMORY);
+               opt.addOption(QUERY);
+               opt.addOption(QUEUE);
+               opt.addOption(SLOTS);
+               opt.addOption(DYNAMIC_PROPERTIES);
+               opt.addOption(DETACHED);
+               opt.addOption(STREAMING);
+               opt.addOption(NAME);
+               // the jar and ship options are accepted by the parser (see addOptions), so list them as well
+               opt.addOption(FLINK_JAR);
+               opt.addOption(SHIP_PATH);
+               formatter.printHelp(" ", opt);
+       }
+
+       /**
+        * Writes the given properties to the given file and makes the file readable for all users.
+        *
+        * @param properties the properties to store
+        * @param propertiesFile the file to write to
+        * @throws RuntimeException if the file cannot be written
+        */
+       private static void writeYarnProperties(Properties properties, File propertiesFile) {
+               // try-with-resources closes the stream even when store() fails
+               try (OutputStream out = new FileOutputStream(propertiesFile)) {
+                       properties.store(out, "Generated YARN properties file");
+               } catch (IOException e) {
+                       throw new RuntimeException("Error writing the properties file", e);
+               }
+               propertiesFile.setReadable(true, false); // readable for all.
+       }
+
+       /**
+        * Runs the interactive polling loop for a deployed YARN session.
+        *
+        * <p>Each iteration prints changes in the number of registered TaskManagers and any new
+        * cluster messages to standard error, then waits for the polling interval or for console
+        * input. The commands "quit" and "stop" leave the loop, "help" prints the command list.
+        * The loop also ends once the cluster client reports that it has been shut down.
+        * Leaving the loop through a user command does not shut the cluster down here.
+        *
+        * <p>Any exception ends the loop and is logged as a warning; it is not rethrown.
+        *
+        * @param yarnCluster client of the cluster to observe
+        * @param readConsoleInput whether commands are read from standard in
+        */
+       public static void runInteractiveCli(YarnClusterClient yarnCluster, 
boolean readConsoleInput) {
+               final String HELP = "Available commands:\n" +
+                               "help - show these commands\n" +
+                               "stop - stop the YARN session";
+               int numTaskmanagers = 0;
+               try {
+                       BufferedReader in = new BufferedReader(new 
InputStreamReader(System.in));
+                       label:
+                       while (true) {
+                               // ------------------ check if there are 
updates by the cluster -----------
+
+                               GetClusterStatusResponse status = 
yarnCluster.getClusterStatus();
+                               LOG.debug("Received status message: {}", 
status);
+
+                               if (status != null && numTaskmanagers != 
status.numRegisteredTaskManagers()) {
+                                       System.err.println("Number of connected 
TaskManagers changed to " +
+                                                       
status.numRegisteredTaskManagers() + ". " +
+                                               "Slots available: " + 
status.totalNumberOfSlots());
+                                       numTaskmanagers = 
status.numRegisteredTaskManagers();
+                               }
+
+                               List<String> messages = 
yarnCluster.getNewMessages();
+                               if (messages != null && messages.size() > 0) {
+                                       System.err.println("New messages from 
the YARN cluster: ");
+                                       for (String msg : messages) {
+                                               System.err.println(msg);
+                                       }
+                               }
+
+                               // NOTE(review): every status other than SUCCEEDED is treated as a failure here;
+                               // presumably SUCCEEDED is what a healthy, running cluster reports - confirm
+                               // against YarnClusterClient.getApplicationStatus().
+                               if (yarnCluster.getApplicationStatus() != 
ApplicationStatus.SUCCEEDED) {
+                                       System.err.println("The YARN cluster 
has failed");
+                                       yarnCluster.shutdown();
+                               }
+
+                               // wait until CLIENT_POLLING_INTERVAL is over 
or the user entered something.
+                               // (the interval constant is in seconds, hence the factor 1000)
+                               long startTime = System.currentTimeMillis();
+                               while ((System.currentTimeMillis() - startTime) 
< CLIENT_POLLING_INTERVALL * 1000
+                                               && (!readConsoleInput || 
!in.ready()))
+                               {
+                                       Thread.sleep(200);
+                               }
+                               //------------- handle interactive command by 
user. ----------------------
+                               
+                               if (readConsoleInput && in.ready()) {
+                                       String command = in.readLine();
+                                       switch (command) {
+                                               case "quit":
+                                               case "stop":
+                                                       // leaves the outer polling loop, not just the switch
+                                                       break label;
+
+                                               case "help":
+                                                       
System.err.println(HELP);
+                                                       break;
+                                               default:
+                                                       
System.err.println("Unknown command '" + command + "'. Showing help: \n" + 
HELP);
+                                                       break;
+                                       }
+                               }
+                               
+                               if (yarnCluster.hasBeenShutdown()) {
+                                       LOG.info("Stopping interactive command 
line interface, YARN cluster has been stopped.");
+                                       break;
+                               }
+                       }
+               } catch(Exception e) {
+                       LOG.warn("Exception while running the interactive 
command line interface", e);
+               }
+       }
+
+       /**
+        * Entry point of the stand-alone YARN session client.
+        * The exit code of the JVM is the return value of {@link #run(String[])}.
+        *
+        * @param args the command line arguments
+        */
+       public static void main(String[] args) {
+               // no prefix for the YARN session
+               final FlinkYarnSessionCli sessionCli = new FlinkYarnSessionCli("", "");
+               final int exitCode = sessionCli.run(args);
+               System.exit(exitCode);
+       }
+
+       /**
+        * Returns the id of this command line interface ("yarn-cluster").
+        */
+       @Override
+       public String getIdentifier() {
+               return ID;
+       }
+
+       /**
+        * Registers all options of the YARN session CLI at the given options object.
+        *
+        * @param options the options object the YARN options are added to
+        */
+       public void addOptions(Options options) {
+               // registration order is kept as it determines nothing but is cheap to preserve
+               final Option[] sessionOptions = {
+                       FLINK_JAR,
+                       JM_MEMORY,
+                       TM_MEMORY,
+                       CONTAINER,
+                       QUEUE,
+                       QUERY,
+                       SHIP_PATH,
+                       SLOTS,
+                       DYNAMIC_PROPERTIES,
+                       DETACHED,
+                       STREAMING,
+                       NAME
+               };
+               for (Option sessionOption : sessionOptions) {
+                       options.addOption(sessionOption);
+               }
+       }
+
+       /**
+        * Retrieves a client for an already running session described by the YARN properties file.
+        *
+        * @param config the configuration, updated from the YARN properties file if it exists
+        * @return a standalone cluster client for the resumed session, or null if no
+        *         YARN properties file was found
+        */
+       @Override
+       public ClusterClient retrieveCluster(Configuration config) throws Exception {
+               final boolean resumed = resumeFromYarnProperties(config);
+               if (!resumed) {
+                       return null;
+               }
+
+               return new StandaloneClusterClient(config);
+       }
+
+       /**
+        * Runs the YARN session client with the given command line arguments.
+        *
+        * <p>With the query option, only the cluster description is printed. Otherwise a YARN
+        * cluster is deployed, its connection details are written to the YARN properties file,
+        * and - unless started detached - the interactive CLI runs until the session ends, after
+        * which the cluster is shut down and the properties file is deleted.
+        *
+        * @param args the command line arguments
+        * @return 0 on success, 1 if parsing, querying, descriptor creation or deployment failed
+        */
+       public int run(String[] args) {
+               //
+               //      Command Line Options
+               //
+               Options options = new Options();
+               addOptions(options);
+
+               CommandLineParser parser = new PosixParser();
+               CommandLine cmd;
+               try {
+                       cmd = parser.parse(options, args);
+               } catch(Exception e) {
+                       System.out.println(e.getMessage());
+                       printUsage();
+                       return 1;
+               }
+
+               // Query cluster for metrics
+               if (cmd.hasOption(QUERY.getOpt())) {
+                       YarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor();
+                       String description;
+                       try {
+                               description = flinkYarnClient.getClusterDescription();
+                       } catch (Exception e) {
+                               System.err.println("Error while querying the YARN cluster for available resources: "+e.getMessage());
+                               e.printStackTrace(System.err);
+                               return 1;
+                       }
+                       System.out.println(description);
+                       return 0;
+               } else {
+
+                       YarnClusterDescriptor flinkYarnClient;
+                       try {
+                               flinkYarnClient = createDescriptor(null, cmd);
+                       } catch (Exception e) {
+                               // the user is pointed to the log below, so the failure has to end up there
+                               LOG.error("Error while starting the YARN Client", e);
+                               System.err.println("Error while starting the YARN Client. Please check log output!");
+                               return 1;
+                       }
+
+                       try {
+                               yarnCluster = flinkYarnClient.deploy();
+                       } catch (Exception e) {
+                               System.err.println("Error while deploying YARN cluster: "+e.getMessage());
+                               e.printStackTrace(System.err);
+                               return 1;
+                       }
+                       //------------------ ClusterClient deployed, handle connection details
+                       String jobManagerAddress = yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + yarnCluster.getJobManagerAddress().getPort();
+                       System.out.println("Flink JobManager is now running on " + jobManagerAddress);
+                       System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
+
+                       // file that we write into the conf/ dir containing the jobManager address and the dop.
+                       File yarnPropertiesFile = new File(getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration()));
+
+                       Properties yarnProps = new Properties();
+                       yarnProps.setProperty(YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);
+                       if (flinkYarnClient.getTaskManagerSlots() != -1) {
+                               String parallelism =
+                                               Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount());
+                               yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism);
+                       }
+                       // add dynamic properties
+                       if (flinkYarnClient.getDynamicPropertiesEncoded() != null) {
+                               yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
+                                               flinkYarnClient.getDynamicPropertiesEncoded());
+                       }
+                       writeYarnProperties(yarnProps, yarnPropertiesFile);
+
+                       //------------------ ClusterClient running, let user control it ------------
+
+                       if (detachedMode) {
+                               // print info and quit:
+                               LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
+                                               "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
+                                               "yarn application -kill " + yarnCluster.getClusterIdentifier() + "\n" +
+                                               "Please also note that the temporary files of the YARN session in {} will not be removed.",
+                                               flinkYarnClient.getSessionFilesDir());
+                               yarnCluster.disconnect();
+                       } else {
+                               runInteractiveCli(yarnCluster, acceptInteractiveInput);
+
+                               if (!yarnCluster.hasBeenShutdown()) {
+                                       LOG.info("Command Line Interface requested session shutdown");
+                                       yarnCluster.shutdown();
+                               }
+
+                               try {
+                                       // File.delete() reports failure through its return value, not an exception
+                                       if (!yarnPropertiesFile.delete() && yarnPropertiesFile.exists()) {
+                                               LOG.warn("Could not delete the JobManager address file {}", yarnPropertiesFile.getAbsolutePath());
+                                       }
+                               } catch (Exception e) {
+                                       LOG.warn("Exception while deleting the JobManager address file", e);
+                               }
+                       }
+               }
+               return 0;
+       }
+
+       /**
+        * Utility method for tests: shuts down the YARN cluster started by this CLI, if any.
+        */
+       public void stop() {
+               if (yarnCluster == null) {
+                       // nothing has been deployed (yet)
+                       return;
+               }
+
+               LOG.info("Command line interface is shutting down the yarnCluster");
+               yarnCluster.shutdown();
+       }
+
+       /**
+        * Logs the message at INFO level and also prints it to standard out.
+        *
+        * @param message the message to report
+        */
+       private void logAndSysout(String message) {
+               LOG.info(message);
+               System.out.println(message);
+       }
+
+       /**
+        * Decodes the encoded dynamic properties string into key/value pairs.
+        *
+        * <p>The individual properties are separated by the dynamic properties separator; each
+        * property is split at its <b>first</b> '=' only, so values may themselves contain '='
+        * (for example {@code env.java.opts=-Dfoo=bar}). Entries without '=', with an empty key
+        * or with an empty value are ignored.
+        *
+        * @param dynamicPropertiesEncoded the encoded properties, may be null or empty
+        * @return the decoded properties; an empty map if the input is null or empty
+        */
+       public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) {
+               if (dynamicPropertiesEncoded == null || dynamicPropertiesEncoded.isEmpty()) {
+                       return Collections.emptyMap();
+               }
+
+               Map<String, String> properties = new HashMap<>();
+
+               String[] propertyLines = dynamicPropertiesEncoded.split(YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+               for (String propLine : propertyLines) {
+                       int firstEquals = propLine.indexOf('=');
+                       if (firstEquals <= 0) {
+                               // no '=' at all, or an empty key
+                               continue;
+                       }
+
+                       // everything after the first '=' belongs to the value
+                       String value = propLine.substring(firstEquals + 1);
+                       if (!value.isEmpty()) {
+                               properties.put(propLine.substring(0, firstEquals), value);
+                       }
+               }
+               return properties;
+       }
+
+       /**
+        * Determines the path of the YARN properties file of the current user.
+        *
+        * <p>The directory is taken from the configuration and falls back to the JVM's
+        * temp directory; the file name is the properties file prefix followed by the user name.
+        *
+        * @param conf the configuration to read the properties file directory from
+        * @return the path of the YARN properties file
+        */
+       private static String getYarnPropertiesLocation(Configuration conf) {
+               final String tmpDir = System.getProperty("java.io.tmpdir");
+               final String directory = conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, tmpDir);
+               final String userName = System.getProperty("user.name");
+
+               final String fileName = YARN_PROPERTIES_FILE + userName;
+               return directory + File.separator + fileName;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index 2876309..aea1aac 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -23,12 +23,10 @@ import java.util.UUID
 import akka.actor._
 import grizzled.slf4j.Logger
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.runtime.clusterframework.ApplicationStatus
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, 
LeaderRetrievalService}
 import org.apache.flink.runtime.{LeaderSessionMessageFilter, FlinkActor, 
LogMessages}
 import org.apache.flink.yarn.YarnMessages._
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
 import scala.collection.mutable
 import scala.concurrent.duration._
 
@@ -36,7 +34,7 @@ import scala.language.postfixOps
 
 /** Actor which is responsible to repeatedly poll the Yarn cluster status from 
the ResourceManager.
   *
-  * This class represents the bridge between the [[FlinkYarnCluster]] and the
+  * This class represents the bridge between the [[YarnClusterClient]] and the
   * [[YarnApplicationMasterRunner]].
   *
   * @param flinkConfig Configuration object
@@ -135,9 +133,9 @@ class ApplicationClient(
       }
 
     case msg: RegisterInfoMessageListenerSuccessful =>
+      // The job manager acts as a proxy between the client and the resource 
manager
       val jm = sender()
-
-      log.info(s"Successfully registered at the ResourceManager $jm")
+      log.info(s"Successfully registered at the ResourceManager using 
JobManager $jm")
 
       yarnJobManager = Some(jm)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
index 8645581..da1917b 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnMessages.scala
@@ -18,12 +18,13 @@
 
 package org.apache.flink.yarn
 
-import java.util.{List => JavaList, UUID, Date}
+import java.util.{Date, UUID, List => JavaList}
 
 import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.clusterframework.ApplicationStatus
 import org.apache.flink.runtime.messages.RequiresLeaderSessionID
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.records.{ContainerStatus, Container, 
FinalApplicationStatus}
+import org.apache.hadoop.yarn.api.records.{Container, ContainerStatus, 
FinalApplicationStatus}
 
 import scala.concurrent.duration.{Deadline, FiniteDuration}
 
@@ -31,7 +32,7 @@ object YarnMessages {
 
   case class ApplicationMasterStatus(numTaskManagers: Int, numSlots: Int)
 
-  case class LocalStopYarnSession(status: FinalApplicationStatus, diagnostics: 
String)
+  case class LocalStopYarnSession(status: ApplicationStatus, diagnostics: 
String)
 
   /**
     * Entry point to start a new YarnSession.

Reply via email to