http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index a5b8af7..9130fdd 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -55,6 +56,7 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -77,9 +79,6 @@ public class YarnClusterClient extends ClusterClient {
        // (HDFS) location of the files required to run on YARN. Needed here to 
delete them on shutdown.
        private final Path sessionFilesDir;
 
-       /** The leader retrieval service for connecting to the cluster and 
finding the active leader. */
-       private final LeaderRetrievalService leaderRetrievalService;
-
        //---------- Class internal fields -------------------
 
        private final AbstractYarnClusterDescriptor clusterDescriptor;
@@ -92,6 +91,7 @@ public class YarnClusterClient extends ClusterClient {
 
        private boolean isConnected = false;
 
+       private final boolean perJobCluster;
 
        /**
         * Create a new Flink on YARN cluster.
@@ -101,6 +101,7 @@ public class YarnClusterClient extends ClusterClient {
         * @param appReport the YARN application ID
         * @param flinkConfig Flink configuration
         * @param sessionFilesDir Location of files required for YARN session
+        * @param perJobCluster Indicator whether this cluster is only created 
for a single job and then shutdown
         * @throws IOException
         * @throws YarnException
         */
@@ -109,7 +110,8 @@ public class YarnClusterClient extends ClusterClient {
                final YarnClient yarnClient,
                final ApplicationReport appReport,
                org.apache.flink.configuration.Configuration flinkConfig,
-               Path sessionFilesDir) throws IOException, YarnException {
+               Path sessionFilesDir,
+               boolean perJobCluster) throws IOException, YarnException {
 
                super(flinkConfig);
 
@@ -122,18 +124,16 @@ public class YarnClusterClient extends ClusterClient {
                this.applicationId = appReport;
                this.appId = appReport.getApplicationId();
                this.trackingURL = appReport.getTrackingUrl();
+               this.perJobCluster = perJobCluster;
 
+               /* The leader retrieval service for connecting to the cluster 
and finding the active leader. */
+               LeaderRetrievalService leaderRetrievalService;
                try {
                        leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
                } catch (Exception e) {
                        throw new IOException("Could not create the leader 
retrieval service.", e);
                }
 
-
-               if (isConnected) {
-                       throw new IllegalStateException("Already connected to 
the cluster.");
-               }
-
                // start application client
                LOG.info("Start application client.");
 
@@ -182,28 +182,31 @@ public class YarnClusterClient extends ClusterClient {
 
                isConnected = true;
 
-               logAndSysout("Waiting until all TaskManagers have connected");
+               if (perJobCluster) {
 
-               while(true) {
-                       GetClusterStatusResponse status = getClusterStatus();
-                       if (status != null) {
-                               if (status.numRegisteredTaskManagers() < 
clusterDescriptor.getTaskManagerCount()) {
-                                       logAndSysout("TaskManager status (" + 
status.numRegisteredTaskManagers() + "/"
-                                               + 
clusterDescriptor.getTaskManagerCount() + ")");
+                       logAndSysout("Waiting until all TaskManagers have 
connected");
+
+                       while (true) {
+                               GetClusterStatusResponse status = 
getClusterStatus();
+                               if (status != null) {
+                                       if (status.numRegisteredTaskManagers() 
< clusterDescriptor.getTaskManagerCount()) {
+                                               logAndSysout("TaskManager 
status (" + status.numRegisteredTaskManagers() + "/"
+                                                       + 
clusterDescriptor.getTaskManagerCount() + ")");
+                                       } else {
+                                               logAndSysout("All TaskManagers 
are connected");
+                                               break;
+                                       }
                                } else {
-                                       logAndSysout("All TaskManagers are 
connected");
-                                       break;
+                                       logAndSysout("No status updates from 
the YARN cluster received so far. Waiting ...");
                                }
-                       } else {
-                               logAndSysout("No status updates from the YARN 
cluster received so far. Waiting ...");
-                       }
 
-                       try {
-                               Thread.sleep(500);
-                       } catch (InterruptedException e) {
-                               LOG.error("Interrupted while waiting for 
TaskManagers");
-                               System.err.println("Thread is interrupted");
-                               throw new IOException("Interrupted while 
waiting for TaskManagers", e);
+                               try {
+                                       Thread.sleep(500);
+                               } catch (InterruptedException e) {
+                                       LOG.error("Interrupted while waiting 
for TaskManagers");
+                                       System.err.println("Thread is 
interrupted");
+                                       throw new IOException("Interrupted 
while waiting for TaskManagers", e);
+                               }
                        }
                }
        }
@@ -214,9 +217,12 @@ public class YarnClusterClient extends ClusterClient {
                }
                LOG.info("Disconnecting YarnClusterClient from 
ApplicationMaster");
 
-               
if(!Runtime.getRuntime().removeShutdownHook(clientShutdownHook)) {
-                       LOG.warn("Error while removing the shutdown hook. The 
YARN session might be killed unintentionally");
+               try {
+                       
Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
+               } catch (IllegalStateException e) {
+                       // we are already in the shutdown hook
                }
+
                // tell the actor to shut down.
                applicationClient.tell(PoisonPill.getInstance(), 
applicationClient);
 
@@ -265,12 +271,30 @@ public class YarnClusterClient extends ClusterClient {
 
        @Override
        protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
-               if (isDetached()) {
-                       JobSubmissionResult result = 
super.runDetached(jobGraph, classLoader);
+               if (perJobCluster) {
                        stopAfterJob(jobGraph.getJobID());
-                       return result;
+               }
+
+               if (isDetached()) {
+                       return super.runDetached(jobGraph, classLoader);
                } else {
-                       return super.run(jobGraph, classLoader);
+                       try {
+                               return super.run(jobGraph, classLoader);
+                       } finally {
+                               // show cluster status
+                               List<String> msgs = getNewMessages();
+                               if (msgs != null && msgs.size() > 1) {
+
+                                       logAndSysout("The following messages 
were created by the YARN cluster while running the Job:");
+                                       for (String msg : msgs) {
+                                               logAndSysout(msg);
+                                       }
+                               }
+                               if (getApplicationStatus() != 
ApplicationStatus.SUCCEEDED) {
+                                       logAndSysout("YARN cluster is in 
non-successful state " + getApplicationStatus());
+                                       logAndSysout("YARN Diagnostics: " + 
getDiagnostics());
+                               }
+                       }
                }
        }
 
@@ -298,8 +322,9 @@ public class YarnClusterClient extends ClusterClient {
                        throw new IllegalStateException("The cluster is not 
connected to the ApplicationMaster.");
                }
                if(hasBeenShutdown()) {
-                       throw new RuntimeException("The YarnClusterClient has 
already been stopped");
+                       return null;
                }
+
                Future<Object> clusterStatusOption = ask(applicationClient, 
YarnMessages.getLocalGetyarnClusterStatus(), akkaTimeout);
                Object clusterStatus;
                try {
@@ -417,32 +442,20 @@ public class YarnClusterClient extends ClusterClient {
        @Override
        public void finalizeCluster() {
 
-               if (!isConnected) {
-                       throw new IllegalStateException("The cluster has been 
not been connected to the ApplicationMaster.");
-               }
-
-               if (isDetached()) {
-                       // only disconnect if we are running detached
+               if (isDetached() || !perJobCluster) {
+                       // only disconnect if we are not running a per job 
cluster
                        disconnect();
-                       return;
+               } else {
+                       shutdownCluster();
                }
+       }
 
-               // show cluster status
-
-               List<String> msgs = getNewMessages();
-               if (msgs != null && msgs.size() > 1) {
+       public void shutdownCluster() {
 
-                       logAndSysout("The following messages were created by 
the YARN cluster while running the Job:");
-                       for (String msg : msgs) {
-                               logAndSysout(msg);
-                       }
-               }
-               if (getApplicationStatus() != ApplicationStatus.SUCCEEDED) {
-                       logAndSysout("YARN cluster is in non-successful state " 
+ getApplicationStatus());
-                       logAndSysout("YARN Diagnostics: " + getDiagnostics());
+               if (!isConnected) {
+                       throw new IllegalStateException("The cluster has been 
not been connected to the ApplicationMaster.");
                }
 
-
                if(hasBeenShutDown.getAndSet(true)) {
                        return;
                }
@@ -471,13 +484,30 @@ public class YarnClusterClient extends ClusterClient {
                        actorSystem.awaitTermination();
                }
 
-               LOG.info("Deleting files in " + sessionFilesDir);
                try {
-                       FileSystem shutFS = FileSystem.get(hadoopConfig);
-                       shutFS.delete(sessionFilesDir, true); // delete conf 
and jar file.
-                       shutFS.close();
-               }catch(IOException e){
-                       LOG.error("Could not delete the Flink jar and 
configuration files in HDFS..", e);
+                       File propertiesFile = 
FlinkYarnSessionCli.getYarnPropertiesLocation(flinkConfig);
+                       if (propertiesFile.isFile()) {
+                               if (propertiesFile.delete()) {
+                                       LOG.info("Deleted Yarn properties file 
at {}", propertiesFile.getAbsoluteFile().toString());
+                               } else {
+                                       LOG.warn("Couldn't delete Yarn 
properties file at {}", propertiesFile.getAbsoluteFile().toString());
+                               }
+                       }
+               } catch (Exception e) {
+                       LOG.warn("Exception while deleting the JobManager 
address file", e);
+               }
+
+               if (sessionFilesDir != null) {
+                       LOG.info("Deleting files in " + sessionFilesDir);
+                       try {
+                               FileSystem shutFS = 
FileSystem.get(hadoopConfig);
+                               shutFS.delete(sessionFilesDir, true); // delete 
conf and jar file.
+                               shutFS.close();
+                       } catch (IOException e) {
+                               LOG.error("Could not delete the Flink jar and 
configuration files in HDFS..", e);
+                       }
+               } else {
+                       LOG.warn("Session file directory not set. Not deleting 
session files");
                }
 
                try {
@@ -571,7 +601,6 @@ public class YarnClusterClient extends ClusterClient {
 
        @Override
        public boolean isDetached() {
-               // either we have set detached mode using the general '-d' flag 
or using the Yarn CLI flag 'yd'
                return super.isDetached() || clusterDescriptor.isDetachedMode();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 43e7c7b..5f745b2 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -17,10 +17,12 @@
  */
 package org.apache.flink.yarn;
 
+
 /**
  * Default implementation of {@link AbstractYarnClusterDescriptor} which 
starts an {@link YarnApplicationMasterRunner}.
  */
 public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
+
        @Override
        protected Class<?> getApplicationMasterClass() {
                return YarnApplicationMasterRunner.class;

http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index fdcc858..5eca4f1 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -28,11 +28,9 @@ import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -59,6 +57,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
+
 /**
  * Class handling the command line interface to the YARN session.
  */
@@ -97,8 +97,11 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
        private final Option CONTAINER;
        private final Option SLOTS;
        private final Option DETACHED;
+       @Deprecated
        private final Option STREAMING;
        private final Option NAME;
+       
+       private final Options ALL_OPTIONS;
 
        /**
         * Dynamic properties allow the user to specify additional 
configuration values with -D, such as
@@ -118,7 +121,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
 
        public FlinkYarnSessionCli(String shortPrefix, String longPrefix, 
boolean acceptInteractiveInput) {
                this.acceptInteractiveInput = acceptInteractiveInput;
-               
+
                QUERY = new Option(shortPrefix + "q", longPrefix + "query", 
false, "Display available YARN resources (memory, cores)");
                APPLICATION_ID = new Option(shortPrefix + "id", longPrefix + 
"applicationId", true, "Attach to running YARN session");
                QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", 
true, "Specify YARN queue.");
@@ -132,37 +135,24 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                DETACHED = new Option(shortPrefix + "d", longPrefix + 
"detached", false, "Start detached");
                STREAMING = new Option(shortPrefix + "st", longPrefix + 
"streaming", false, "Start Flink in streaming mode");
                NAME = new Option(shortPrefix + "nm", longPrefix + "name", 
true, "Set a custom name for the application on YARN");
+               
+               ALL_OPTIONS = new Options();
+               ALL_OPTIONS.addOption(FLINK_JAR);
+               ALL_OPTIONS.addOption(JM_MEMORY);
+               ALL_OPTIONS.addOption(TM_MEMORY);
+               ALL_OPTIONS.addOption(CONTAINER);
+               ALL_OPTIONS.addOption(QUEUE);
+               ALL_OPTIONS.addOption(QUERY);
+               ALL_OPTIONS.addOption(SHIP_PATH);
+               ALL_OPTIONS.addOption(SLOTS);
+               ALL_OPTIONS.addOption(DYNAMIC_PROPERTIES);
+               ALL_OPTIONS.addOption(DETACHED);
+               ALL_OPTIONS.addOption(STREAMING);
+               ALL_OPTIONS.addOption(NAME);
+               ALL_OPTIONS.addOption(APPLICATION_ID);
        }
 
-       /**
-        * Attaches a new Yarn Client to running YARN application.
-        *
-        */
-       public AbstractFlinkYarnCluster attachFlinkYarnClient(CommandLine cmd) {
-               AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
-               if (flinkYarnClient == null) {
-                       return null;
-               }
 
-               if (!cmd.hasOption(APPLICATION_ID.getOpt())) {
-                       LOG.error("Missing required argument " + 
APPLICATION_ID.getOpt());
-                       printUsage();
-                       return null;
-               }
-
-               String confDirPath = 
CliFrontend.getConfigurationDirectoryFromEnv();
-               GlobalConfiguration.loadConfiguration(confDirPath);
-               Configuration flinkConfiguration = 
GlobalConfiguration.getConfiguration();
-               flinkYarnClient.setFlinkConfiguration(flinkConfiguration);
-               flinkYarnClient.setConfigurationDirectory(confDirPath);
-
-               try {
-                       return 
flinkYarnClient.attach(cmd.getOptionValue(APPLICATION_ID.getOpt()));
-               } catch (Exception e) {
-                       LOG.error("Could not attach to YARN session", e);
-                       return null;
-               }
-       }
        /**
         * Resumes from a Flink Yarn properties file
         * @param flinkConfiguration The flink configuration
@@ -170,7 +160,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
         */
        private boolean resumeFromYarnProperties(Configuration 
flinkConfiguration) {
                // load the YARN properties
-               File propertiesFile = new 
File(getYarnPropertiesLocation(flinkConfiguration));
+               File propertiesFile = 
getYarnPropertiesLocation(flinkConfiguration);
                if (!propertiesFile.exists()) {
                        return false;
                }
@@ -209,7 +199,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                        try {
                                jobManagerAddress = 
ClientUtils.parseHostPortAddress(address);
                                // store address in config from where it is 
retrieved by the retrieval service
-                               
CliFrontend.writeJobManagerAddressToConfig(flinkConfiguration, 
jobManagerAddress);
+                               
CliFrontend.setJobManagerAddressInConfig(flinkConfiguration, jobManagerAddress);
                        }
                        catch (Exception e) {
                                throw new RuntimeException("YARN properties 
contain an invalid entry for JobManager address.", e);
@@ -228,10 +218,9 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                return true;
        }
 
-       public YarnClusterDescriptor createDescriptor(String 
defaultApplicationName, CommandLine cmd) {
-
+       public AbstractYarnClusterDescriptor createDescriptor(String 
defaultApplicationName, CommandLine cmd) {
 
-               YarnClusterDescriptor yarnClusterDescriptor = new 
YarnClusterDescriptor();
+               AbstractYarnClusterDescriptor yarnClusterDescriptor = 
getClusterDescriptor();
 
                if (!cmd.hasOption(CONTAINER.getOpt())) { // number of 
containers is required option!
                        LOG.error("Missing required argument {}", 
CONTAINER.getOpt());
@@ -343,19 +332,6 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                return yarnClusterDescriptor;
        }
 
-       @Override
-       public YarnClusterClient createClient(String applicationName, 
CommandLine cmdLine) throws Exception {
-
-               YarnClusterDescriptor yarnClusterDescriptor = 
createDescriptor(applicationName, cmdLine);
-
-               try {
-                       return yarnClusterDescriptor.deploy();
-               } catch (Exception e) {
-                       throw new RuntimeException("Error deploying the YARN 
cluster", e);
-               }
-
-       }
-
        private void printUsage() {
                System.out.println("Usage:");
                HelpFormatter formatter = new HelpFormatter();
@@ -367,17 +343,10 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                formatter.printHelp(" ", req);
 
                formatter.setSyntaxPrefix("   Optional");
-               Options opt = new Options();
-               opt.addOption(JM_MEMORY);
-               opt.addOption(TM_MEMORY);
-               opt.addOption(QUERY);
-               opt.addOption(QUEUE);
-               opt.addOption(SLOTS);
-               opt.addOption(DYNAMIC_PROPERTIES);
-               opt.addOption(DETACHED);
-               opt.addOption(STREAMING);
-               opt.addOption(NAME);
-               formatter.printHelp(" ", opt);
+               Options options = new Options();
+               addGeneralOptions(options);
+               addRunOptions(options);
+               formatter.printHelp(" ", options);
        }
 
        private static void writeYarnProperties(Properties properties, File 
propertiesFile) {
@@ -439,6 +408,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                                        switch (command) {
                                                case "quit":
                                                case "stop":
+                                                       
yarnCluster.shutdownCluster();
                                                        break label;
 
                                                case "help":
@@ -466,38 +436,62 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
        }
 
        @Override
-       public String getIdentifier() {
+       public boolean isActive(CommandLine commandLine, Configuration 
configuration) {
+               String jobManagerOption = 
commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
+               boolean yarnJobManager = ID.equals(jobManagerOption);
+               return yarnJobManager || 
resumeFromYarnProperties(configuration);
+       }
+
+       @Override
+       public String getId() {
                return ID;
        }
 
-       public void addOptions(Options options) {
-               options.addOption(FLINK_JAR);
-               options.addOption(JM_MEMORY);
-               options.addOption(TM_MEMORY);
-               options.addOption(CONTAINER);
-               options.addOption(QUEUE);
-               options.addOption(QUERY);
-               options.addOption(SHIP_PATH);
-               options.addOption(SLOTS);
-               options.addOption(DYNAMIC_PROPERTIES);
-               options.addOption(DETACHED);
-               options.addOption(STREAMING);
-               options.addOption(NAME);
+       @Override
+       public void addRunOptions(Options baseOptions) {
+               for (Object option : ALL_OPTIONS.getOptions()) {
+                       baseOptions.addOption((Option) option);
+               }
        }
 
+       @Override
+       public void addGeneralOptions(Options baseOptions) {
+               baseOptions.addOption(APPLICATION_ID);
+       }
 
-       public void getYARNAttachCLIOptions(Options options) {
-               options.addOption(APPLICATION_ID);
+       @Override
+       public YarnClusterClient retrieveCluster(
+                       CommandLine cmdLine,
+                       Configuration config) throws 
UnsupportedOperationException {
+
+               // first check for an application id
+               if (cmdLine.hasOption(APPLICATION_ID.getOpt())) {
+                       String applicationID = 
cmdLine.getOptionValue(APPLICATION_ID.getOpt());
+                       AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor();
+                       yarnDescriptor.setFlinkConfiguration(config);
+                       return yarnDescriptor.retrieve(applicationID);
+               // then try to load from yarn properties
+               } else if (resumeFromYarnProperties(config)) {
+                       AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor();
+                       yarnDescriptor.setFlinkConfiguration(config);
+                       return yarnDescriptor.retrieveFromConfig(config);
+               }
+
+               throw new UnsupportedOperationException("Could not resume a 
Yarn cluster.");
        }
 
        @Override
-       public ClusterClient retrieveCluster(Configuration config) throws 
Exception {
+       public YarnClusterClient createCluster(String applicationName, 
CommandLine cmdLine, Configuration config) {
+
+               AbstractYarnClusterDescriptor yarnClusterDescriptor = 
createDescriptor(applicationName, cmdLine);
+               yarnClusterDescriptor.setFlinkConfiguration(config);
 
-               if(resumeFromYarnProperties(config)) {
-                       return new StandaloneClusterClient(config);
+               try {
+                       return yarnClusterDescriptor.deploy();
+               } catch (Exception e) {
+                       throw new RuntimeException("Error deploying the YARN 
cluster", e);
                }
 
-               return null;
        }
 
        public int run(String[] args) {
@@ -505,7 +499,8 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                //      Command Line Options
                //
                Options options = new Options();
-               addOptions(options);
+               addGeneralOptions(options);
+               addRunOptions(options);
 
                CommandLineParser parser = new PosixParser();
                CommandLine cmd;
@@ -519,10 +514,10 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
 
                // Query cluster for metrics
                if (cmd.hasOption(QUERY.getOpt())) {
-                       YarnClusterDescriptor flinkYarnClient = new 
YarnClusterDescriptor();
+                       AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor();
                        String description;
                        try {
-                               description = 
flinkYarnClient.getClusterDescription();
+                               description = 
yarnDescriptor.getClusterDescription();
                        } catch (Exception e) {
                                System.err.println("Error while querying the 
YARN cluster for available resources: "+e.getMessage());
                                e.printStackTrace(System.err);
@@ -531,56 +526,61 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                        System.out.println(description);
                        return 0;
                } else if (cmd.hasOption(APPLICATION_ID.getOpt())) {
-                       yarnCluster = attachFlinkYarnClient(cmd);
+
+                       AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor();
+                       try {
+                               yarnCluster = 
yarnDescriptor.retrieve(cmd.getOptionValue(APPLICATION_ID.getOpt()));
+                       } catch (Exception e) {
+                               throw new RuntimeException("Could not retrieve 
existing Yarn application", e);
+                       }
 
                        if (detachedMode) {
                                LOG.info("The Flink YARN client has been 
started in detached mode. In order to stop " +
                                        "Flink on YARN, use the following 
command or a YARN web interface to stop it:\n" +
-                                       "yarn application -kill 
"+yarnCluster.getApplicationId());
+                                       "yarn application -kill 
"+yarnCluster.getClusterIdentifier());
+                               yarnCluster.disconnect();
                        } else {
-                               runInteractiveCli(yarnCluster);
-
-                               if (!yarnCluster.hasBeenStopped()) {
-                                       LOG.info("Command Line Interface 
requested session shutdown");
-                                       yarnCluster.shutdown(false);
-                               }
+                               runInteractiveCli(yarnCluster, true);
                        }
                } else {
 
-                       YarnClusterDescriptor flinkYarnClient;
+                       AbstractYarnClusterDescriptor yarnDescriptor;
                        try {
-                               flinkYarnClient = createDescriptor(null, cmd);
+                               yarnDescriptor = createDescriptor(null, cmd);
                        } catch (Exception e) {
                                System.err.println("Error while starting the 
YARN Client. Please check log output!");
                                return 1;
                        }
 
                        try {
-                               yarnCluster = flinkYarnClient.deploy();
+                               yarnCluster = yarnDescriptor.deploy();
                        } catch (Exception e) {
                                System.err.println("Error while deploying YARN 
cluster: "+e.getMessage());
                                e.printStackTrace(System.err);
                                return 1;
                        }
                        //------------------ ClusterClient deployed, handle 
connection details
-                       String jobManagerAddress = 
yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + 
yarnCluster.getJobManagerAddress().getPort();
+                       String jobManagerAddress =
+                               
yarnCluster.getJobManagerAddress().getAddress().getHostAddress() +
+                                       ":" + 
yarnCluster.getJobManagerAddress().getPort();
+
                        System.out.println("Flink JobManager is now running on 
" + jobManagerAddress);
                        System.out.println("JobManager Web Interface: " + 
yarnCluster.getWebInterfaceURL());
 
                        // file that we write into the conf/ dir containing the 
jobManager address and the dop.
-                       File yarnPropertiesFile = new 
File(getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration()));
+                       File yarnPropertiesFile = 
getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration());
 
                        Properties yarnProps = new Properties();
                        yarnProps.setProperty(YARN_PROPERTIES_JOBMANAGER_KEY, 
jobManagerAddress);
-                       if (flinkYarnClient.getTaskManagerSlots() != -1) {
+                       if (yarnDescriptor.getTaskManagerSlots() != -1) {
                                String parallelism =
-                                               
Integer.toString(flinkYarnClient.getTaskManagerSlots() * 
flinkYarnClient.getTaskManagerCount());
+                                               
Integer.toString(yarnDescriptor.getTaskManagerSlots() * 
yarnDescriptor.getTaskManagerCount());
                                
yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism);
                        }
                        // add dynamic properties
-                       if (flinkYarnClient.getDynamicPropertiesEncoded() != 
null) {
+                       if (yarnDescriptor.getDynamicPropertiesEncoded() != 
null) {
                                
yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
-                                               
flinkYarnClient.getDynamicPropertiesEncoded());
+                                               
yarnDescriptor.getDynamicPropertiesEncoded());
                        }
                        writeYarnProperties(yarnProps, yarnPropertiesFile);
 
@@ -592,21 +592,10 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                                                "Flink on YARN, use the 
following command or a YARN web interface to stop it:\n" +
                                                "yarn application -kill " + 
yarnCluster.getClusterIdentifier() + "\n" +
                                                "Please also note that the 
temporary files of the YARN session in {} will not be removed.",
-                                               
flinkYarnClient.getSessionFilesDir());
+                                               
yarnDescriptor.getSessionFilesDir());
                                yarnCluster.disconnect();
                        } else {
                                runInteractiveCli(yarnCluster, 
acceptInteractiveInput);
-
-                               if (!yarnCluster.hasBeenShutdown()) {
-                                       LOG.info("Command Line Interface 
requested session shutdown");
-                                       yarnCluster.shutdown();
-                               }
-
-                               try {
-                                       yarnPropertiesFile.delete();
-                               } catch (Exception e) {
-                                       LOG.warn("Exception while deleting the 
JobManager address file", e);
-                               }
                        }
                }
                return 0;
@@ -649,11 +638,16 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                }
        }
 
-       private static String getYarnPropertiesLocation(Configuration conf) {
+       public static File getYarnPropertiesLocation(Configuration conf) {
                String defaultPropertiesFileLocation = 
System.getProperty("java.io.tmpdir");
                String currentUser = System.getProperty("user.name");
-               String propertiesFileLocation = 
conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, 
defaultPropertiesFileLocation);
+               String propertiesFileLocation =
+                       
conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, 
defaultPropertiesFileLocation);
+
+               return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + 
currentUser);
+       }
 
-               return propertiesFileLocation + File.separator + 
YARN_PROPERTIES_FILE + currentUser;
+       protected AbstractYarnClusterDescriptor getClusterDescriptor() {
+               return new YarnClusterDescriptor();
        }
 }

Reply via email to