[FLINK-3667] refactor client communication classes

- ClusterDescriptor: base interface for cluster deployment descriptors
- ClusterDescriptor: implemented for YARN by YarnClusterDescriptor

- ClusterClient: base class for ClusterClients, handles lifecycle of cluster
- ClusterClient: shares configuration with the implementations
- ClusterClient: implemented by StandaloneClusterClient and YarnClusterClient
- ClusterClient: remove the separate blocking/detached run methods and enable detached mode via a flag (see the submission sketch below)

- CliFrontend: remove all Yarn specific logic
- CliFrontend: remove all cluster setup logic

- CustomCommandLine: interface for other cluster implementations
- CustomCommandLine: enables creation of a new cluster or resuming an existing one (see the interface sketch below)

- Yarn: move Yarn classes and functionality to the yarn module (yarn
  properties, yarn interfaces)
- Yarn: improve reliability of cluster startup
- Yarn Tests: disable parallel execution for ITCases only
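
The CustomCommandLine contract, as exercised by the refactored CliFrontend further down, roughly looks as follows. This is a sketch inferred from the call sites in the diff (getIdentifier, createClient, retrieveCluster); the generic parameter name, the javadoc wording, and the "throws Exception" clauses are assumptions, not the committed signatures.

    import org.apache.commons.cli.CommandLine;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.configuration.Configuration;

    // Hypothetical shape of the CustomCommandLine interface, reconstructed from
    // how CliFrontend#getClient uses it in this commit.
    public interface CustomCommandLine<ClientType extends ClusterClient> {

        /** Identifier reported to the user, e.g. when switching Log4j output to console. */
        String getIdentifier();

        /** Deploys a new cluster for the given application name and parsed command line. */
        ClientType createClient(String applicationName, CommandLine commandLine) throws Exception;

        /** Tries to resume an already running cluster from the configuration; null if none is found. */
        ClientType retrieveCluster(Configuration config) throws Exception;
    }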

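A minimal submission sketch against the refactored client API, assuming a loaded Configuration and an already constructed PackagedProgram. All calls used here (StandaloneClusterClient, setPrintStatusDuringExecution, setDetached, run, getJobID, shutdown) appear in the diff below; the surrounding class and method are illustrative only.

    import org.apache.flink.api.common.JobSubmissionResult;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.client.program.PackagedProgram;
    import org.apache.flink.client.program.StandaloneClusterClient;
    import org.apache.flink.configuration.Configuration;

    public class DetachedSubmissionSketch {

        public static void submit(Configuration config, PackagedProgram program, int parallelism) throws Exception {
            // The client now owns the lifecycle of the cluster connection.
            ClusterClient client = new StandaloneClusterClient(config);
            try {
                client.setPrintStatusDuringExecution(false);
                // Detached mode is a flag on the client instead of a separate runDetached method.
                client.setDetached(true);
                JobSubmissionResult result = client.run(program, parallelism);
                System.out.println("Job has been submitted with JobID " + result.getJobID());
            } finally {
                client.shutdown();
            }
        }
    }
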
This closes #1978


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9b52a31
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9b52a31
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9b52a31

Branch: refs/heads/master
Commit: f9b52a3114a2114e6846091acf3abb294a49615b
Parents: efc344a
Author: Maximilian Michels <[email protected]>
Authored: Fri Apr 22 19:52:54 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Fri Jun 17 10:37:58 2016 +0200

----------------------------------------------------------------------
 .../api/avro/AvroExternalJarProgramITCase.java  |  15 +-
 .../org/apache/flink/client/CliFrontend.java    | 359 ++-----
 .../flink/client/FlinkYarnSessionCli.java       | 505 ----------
 .../org/apache/flink/client/RemoteExecutor.java |   9 +-
 .../flink/client/cli/CliFrontendParser.java     | 114 ++-
 .../flink/client/cli/CustomCommandLine.java     |  57 ++
 .../client/deployment/ClusterDescriptor.java    |  41 +
 .../org/apache/flink/client/program/Client.java | 624 ------------
 .../flink/client/program/ClusterClient.java     | 695 ++++++++++++++
 .../client/program/ContextEnvironment.java      |  12 +-
 .../program/ContextEnvironmentFactory.java      |  18 +-
 .../client/program/DetachedEnvironment.java     |   6 +-
 .../client/program/StandaloneClusterClient.java |  98 ++
 .../CliFrontendAddressConfigurationTest.java    | 125 +--
 .../client/CliFrontendPackageProgramTest.java   |   5 +-
 .../apache/flink/client/CliFrontendRunTest.java |  26 +-
 .../flink/client/CliFrontendTestUtils.java      |  32 +-
 .../TestingClusterClientWithoutActorSystem.java |  55 ++
 .../client/program/ClientConnectionTest.java    |   2 +-
 .../apache/flink/client/program/ClientTest.java |  33 +-
 .../program/ExecutionPlanCreationTest.java      |   2 +-
 .../org/apache/flink/storm/api/FlinkClient.java |  11 +-
 .../flink/api/common/JobExecutionResult.java    |   3 +
 .../flink/api/common/JobSubmissionResult.java   |  24 +-
 .../main/flink-bin/conf/log4j-cli.properties    |   2 +-
 .../src/main/flink-bin/yarn-bin/yarn-session.sh |   2 +-
 .../operations/DegreesWithExceptionITCase.java  |   2 +-
 .../ReduceOnEdgesWithExceptionITCase.java       |   2 +-
 .../ReduceOnNeighborsWithExceptionITCase.java   |   2 +-
 .../webmonitor/handlers/JarActionHandler.java   |   4 +-
 .../apache/flink/runtime/client/JobClient.java  |  17 +-
 .../clusterframework/ApplicationStatus.java     |   1 +
 .../clusterframework/FlinkResourceManager.java  |   2 +-
 .../messages/GetClusterStatusResponse.java      |   2 +-
 .../runtime/yarn/AbstractFlinkYarnClient.java   | 143 ---
 .../runtime/yarn/AbstractFlinkYarnCluster.java  | 123 ---
 .../org/apache/flink/api/scala/FlinkShell.scala |  82 +-
 .../flink/api/scala/ExecutionEnvironment.scala  |   2 +-
 .../elasticsearch2/ElasticsearchSinkITCase.java |   2 +-
 .../environment/RemoteStreamEnvironment.java    |   9 +-
 .../environment/StreamContextEnvironment.java   |   5 +-
 .../RemoteEnvironmentITCase.java                |   2 +-
 .../flink/test/misc/AutoParallelismITCase.java  |   2 +-
 .../test/recovery/SimpleRecoveryITCase.java     |   2 +-
 flink-yarn-tests/pom.xml                        |  15 +-
 ...CliFrontendYarnAddressConfigurationTest.java | 220 +++++
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  14 +-
 .../flink/yarn/TestingFlinkYarnClient.java      |  71 --
 .../yarn/TestingYarnClusterDescriptor.java      |  71 ++
 .../flink/yarn/YARNHighAvailabilityITCase.java  |   9 +-
 .../YARNSessionCapacitySchedulerITCase.java     |   6 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  20 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |   4 +-
 .../yarn/AbstractYarnClusterDescriptor.java     | 943 +++++++++++++++++++
 .../org/apache/flink/yarn/FlinkYarnClient.java  |  28 -
 .../apache/flink/yarn/FlinkYarnClientBase.java  | 907 ------------------
 .../org/apache/flink/yarn/FlinkYarnCluster.java | 559 -----------
 .../flink/yarn/YarnApplicationMasterRunner.java |   7 +-
 .../apache/flink/yarn/YarnClusterClient.java    | 577 ++++++++++++
 .../flink/yarn/YarnClusterDescriptor.java       |  28 +
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 606 ++++++++++++
 .../apache/flink/yarn/ApplicationClient.scala   |   8 +-
 .../org/apache/flink/yarn/YarnMessages.scala    |   7 +-
 63 files changed, 3799 insertions(+), 3580 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index ac10074..29a7e58 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -19,19 +19,12 @@
 package org.apache.flink.api.avro;
 
 import java.io.File;
-import java.net.InetAddress;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.RemoteExecutor;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.FlinkPlan;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
 import org.junit.Assert;
@@ -64,10 +57,10 @@ public class AvroExternalJarProgramITCase {
                        config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
                        config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());
 
-                       Client client = new Client(config);
+                       ClusterClient client = new StandaloneClusterClient(config);
 
                        client.setPrintStatusDuringExecution(false);
-                       client.runBlocking(program, 4);
+                       client.run(program, 4);
 
                }
                catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 6d972bc..cf7a8c2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -20,8 +20,6 @@ package org.apache.flink.client;
 
 import akka.actor.ActorSystem;
 
-import org.apache.commons.cli.CommandLine;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
@@ -31,18 +29,21 @@ import org.apache.flink.client.cli.CancelOptions;
 import org.apache.flink.client.cli.CliArgsException;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.cli.InfoOptions;
 import org.apache.flink.client.cli.ListOptions;
 import org.apache.flink.client.cli.ProgramOptions;
 import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.cli.SavepointOptions;
 import org.apache.flink.client.cli.StopOptions;
-import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -53,7 +54,6 @@ import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -68,8 +68,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSucc
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
@@ -81,10 +79,8 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.text.SimpleDateFormat;
@@ -93,10 +89,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
@@ -121,20 +115,6 @@ public class CliFrontend {
        private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
        private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
 
-       // YARN-session related constants
-       public static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
-       public static final String YARN_PROPERTIES_JOBMANAGER_KEY = 
"jobManager";
-       public static final String YARN_PROPERTIES_PARALLELISM = "parallelism";
-       public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = 
"dynamicPropertiesString";
-
-       public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // 
this has to be a regex for String.split()
-
-       /**
-        * A special host name used to run a job by deploying Flink into a YARN 
cluster,
-        * if this string is specified as the JobManager address
-        */
-       public static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster";
-
        // 
--------------------------------------------------------------------------------------------
        // 
--------------------------------------------------------------------------------------------
 
@@ -149,12 +129,9 @@ public class CliFrontend {
 
        private ActorSystem actorSystem;
 
-       private AbstractFlinkYarnCluster yarnCluster;
-
        /**
         *
-        * @throws Exception Thrown if the configuration directory was not 
found, the configuration could not
-        *                   be loaded, or the YARN properties could not be 
parsed.
+        * @throws Exception Thrown if the configuration directory was not 
found, the configuration could not be loaded
         */
        public CliFrontend() throws Exception {
                this(getConfigurationDirectoryFromEnv());
@@ -171,61 +148,6 @@ public class CliFrontend {
                
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
                this.config = GlobalConfiguration.getConfiguration();
 
-               // load the YARN properties
-               File propertiesFile = new 
File(getYarnPropertiesLocation(config));
-               if (propertiesFile.exists()) {
-
-                       logAndSysout("Found YARN properties file " + 
propertiesFile.getAbsolutePath());
-
-                       Properties yarnProperties = new Properties();
-                       try {
-                               try (InputStream is = new 
FileInputStream(propertiesFile)) {
-                                       yarnProperties.load(is);
-                               }
-                       }
-                       catch (IOException e) {
-                               throw new Exception("Cannot read the YARN 
properties file", e);
-                       }
-
-                       // configure the default parallelism from YARN
-                       String propParallelism = 
yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
-                       if (propParallelism != null) { // maybe the property is 
not set
-                               try {
-                                       int parallelism = 
Integer.parseInt(propParallelism);
-                                       
this.config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism);
-
-                                       logAndSysout("YARN properties set 
default parallelism to " + parallelism);
-                               }
-                               catch (NumberFormatException e) {
-                                       throw new Exception("Error while 
parsing the YARN properties: " +
-                                                       "Property " + 
YARN_PROPERTIES_PARALLELISM + " is not an integer.");
-                               }
-                       }
-
-                       // get the JobManager address from the YARN properties
-                       String address = 
yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
-                       InetSocketAddress jobManagerAddress;
-                       if (address != null) {
-                               try {
-                                       jobManagerAddress = 
ClientUtils.parseHostPortAddress(address);
-                                       // store address in config from where 
it is retrieved by the retrieval service
-                                       
writeJobManagerAddressToConfig(jobManagerAddress);
-                               }
-                               catch (Exception e) {
-                                       throw new Exception("YARN properties 
contain an invalid entry for JobManager address.", e);
-                               }
-
-                               logAndSysout("Using JobManager address from 
YARN properties " + jobManagerAddress);
-                       }
-
-                       // handle the YARN client's dynamic properties
-                       String dynamicPropertiesEncoded = 
yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
-                       Map<String, String> dynamicProperties = 
getDynamicProperties(dynamicPropertiesEncoded);
-                       for (Map.Entry<String, String> dynamicProperty : 
dynamicProperties.entrySet()) {
-                               this.config.setString(dynamicProperty.getKey(), 
dynamicProperty.getValue());
-                       }
-               }
-
                try {
                        FileSystem.setDefaultScheme(config);
                } catch (IOException e) {
@@ -301,61 +223,33 @@ public class CliFrontend {
                        return handleError(t);
                }
 
-               int exitCode = 1;
+               ClusterClient client = null;
                try {
-                       int userParallelism = options.getParallelism();
-                       LOG.debug("User parallelism is set to {}", 
userParallelism);
 
-                       Client client = getClient(options, 
program.getMainClassName(), userParallelism, options.getDetachedMode());
+                       client = getClient(options, program.getMainClassName());
                        
client.setPrintStatusDuringExecution(options.getStdoutLogging());
+                       client.setDetached(options.getDetachedMode());
                        LOG.debug("Client slots is set to {}", 
client.getMaxSlots());
 
                        LOG.debug("Savepoint path is set to {}", 
options.getSavepointPath());
 
-                       try {
-                               if (client.getMaxSlots() != -1 && 
userParallelism == -1) {
-                                       logAndSysout("Using the parallelism 
provided by the remote cluster ("+client.getMaxSlots()+"). " +
-                                                       "To use another 
parallelism, set it at the ./bin/flink client.");
-                                       userParallelism = client.getMaxSlots();
-                               }
-
-                               // detached mode
-                               if (options.getDetachedMode() || (yarnCluster 
!= null && yarnCluster.isDetached())) {
-                                       exitCode = 
executeProgramDetached(program, client, userParallelism);
-                               }
-                               else {
-                                       exitCode = 
executeProgramBlocking(program, client, userParallelism);
-                               }
-
-                               // show YARN cluster status if its not a 
detached YARN cluster.
-                               if (yarnCluster != null && 
!yarnCluster.isDetached()) {
-                                       List<String> msgs = 
yarnCluster.getNewMessages();
-                                       if (msgs != null && msgs.size() > 1) {
-
-                                               logAndSysout("The following 
messages were created by the YARN cluster while running the Job:");
-                                               for (String msg : msgs) {
-                                                       logAndSysout(msg);
-                                               }
-                                       }
-                                       if (yarnCluster.hasFailed()) {
-                                               logAndSysout("YARN cluster is 
in failed state!");
-                                               logAndSysout("YARN Diagnostics: 
" + yarnCluster.getDiagnostics());
-                                       }
-                               }
-
-                               return exitCode;
-                       }
-                       finally {
-                               client.shutdown();
+                       int userParallelism = options.getParallelism();
+                       LOG.debug("User parallelism is set to {}", 
userParallelism);
+                       if (client.getMaxSlots() != -1 && userParallelism == 
-1) {
+                               logAndSysout("Using the parallelism provided by 
the remote cluster ("
+                                       + client.getMaxSlots()+"). "
+                                       + "To use another parallelism, set it 
at the ./bin/flink client.");
+                               userParallelism = client.getMaxSlots();
                        }
+
+                       return executeProgram(program, client, userParallelism);
                }
                catch (Throwable t) {
                        return handleError(t);
                }
                finally {
-                       if (yarnCluster != null && !yarnCluster.isDetached()) {
-                               logAndSysout("Shutting down YARN cluster");
-                               yarnCluster.shutdown(exitCode != 0);
+                       if (client != null) {
+                               client.shutdown();
                        }
                        if (program != null) {
                                program.deleteExtractedLibraries();
@@ -410,7 +304,7 @@ public class CliFrontend {
                        LOG.info("Creating program plan dump");
 
                        Optimizer compiler = new Optimizer(new 
DataStatistics(), new DefaultCostEstimator(), config);
-                       FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, 
program, parallelism);
+                       FlinkPlan flinkPlan = 
ClusterClient.getOptimizedPlan(compiler, program, parallelism);
                        
                        String jsonPlan = null;
                        if (flinkPlan instanceof OptimizedPlan) {
@@ -830,53 +724,30 @@ public class CliFrontend {
        //  Interaction with programs and JobManager
        // 
--------------------------------------------------------------------------------------------
 
-       protected int executeProgramDetached(PackagedProgram program, Client 
client, int parallelism) {
-               LOG.info("Starting execution of program");
+       protected int executeProgram(PackagedProgram program, ClusterClient 
client, int parallelism) {
+               logAndSysout("Starting execution of program");
 
                JobSubmissionResult result;
                try {
-                       result = client.runDetached(program, parallelism);
+                       result = client.run(program, parallelism);
                } catch (ProgramInvocationException e) {
                        return handleError(e);
                } finally {
                        program.deleteExtractedLibraries();
                }
 
-               if (yarnCluster != null) {
-                       yarnCluster.stopAfterJob(result.getJobID());
-                       yarnCluster.disconnect();
-               }
-               
-               System.out.println("Job has been submitted with JobID " + 
result.getJobID());
-
-               return 0;
-       }
-
-       protected int executeProgramBlocking(PackagedProgram program, Client 
client, int parallelism) {
-               LOG.info("Starting execution of program");
-
-               JobSubmissionResult result;
-               try {
-                       result = client.runBlocking(program, parallelism);
-               }
-               catch (ProgramInvocationException e) {
-                       return handleError(e);
-               }
-               finally {
-                       program.deleteExtractedLibraries();
-               }
-
-               LOG.info("Program execution finished");
-
-               if (result instanceof JobExecutionResult) {
-                       JobExecutionResult execResult = (JobExecutionResult) 
result;
+               if(result.isJobExecutionResults()) {
+                       logAndSysout("Program execution finished");
+                       JobExecutionResult execResult = 
result.getJobExecutionResult();
                        System.out.println("Job with JobID " + 
execResult.getJobID() + " has finished.");
                        System.out.println("Job Runtime: " + 
execResult.getNetRuntime() + " ms");
                        Map<String, Object> accumulatorsResult = 
execResult.getAllAccumulatorResults();
                        if (accumulatorsResult.size() > 0) {
-                                       System.out.println("Accumulator 
Results: ");
-                                       
System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+                               System.out.println("Accumulator Results: ");
+                               
System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
                        }
+               } else {
+                       logAndSysout("Job has been submitted with JobID " + 
result.getJobID());
                }
 
                return 0;
@@ -923,16 +794,6 @@ public class CliFrontend {
        }
 
        /**
-        * Writes the given job manager address to the associated configuration 
object
-        *
-        * @param address Address to write to the configuration
-        */
-       protected void writeJobManagerAddressToConfig(InetSocketAddress 
address) {
-               config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
address.getHostName());
-               config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
address.getPort());
-       }
-
-       /**
         * Updates the associated configuration with the given command line 
options
         *
         * @param options Command line options
@@ -940,7 +801,7 @@ public class CliFrontend {
        protected void updateConfig(CommandLineOptions options) {
                if(options.getJobManagerAddress() != null){
                        InetSocketAddress jobManagerAddress = 
ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
-                       writeJobManagerAddressToConfig(jobManagerAddress);
+                       writeJobManagerAddressToConfig(config, 
jobManagerAddress);
                }
        }
 
@@ -980,110 +841,65 @@ public class CliFrontend {
        }
 
        /**
-        * Retrieves a {@link Client} object from the given command line 
options and other parameters.
+        * Retrieves a {@link ClusterClient} object from the given command line 
options and other parameters.
         *
         * @param options Command line options which contain JobManager address
         * @param programName Program name
-        * @param userParallelism Given user parallelism
         * @throws Exception
         */
-       protected Client getClient(
+       protected ClusterClient getClient(
                        CommandLineOptions options,
-                       String programName,
-                       int userParallelism,
-                       boolean detachedMode)
+                       String programName)
                throws Exception {
                InetSocketAddress jobManagerAddress;
-               int maxSlots = -1;
 
-               if 
(YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
-                       logAndSysout("YARN cluster mode detected. Switching 
Log4j output to console");
+               // try to get the JobManager address via command-line args
+               if (options.getJobManagerAddress() != null) {
 
-                       // Default yarn application name to use, if nothing is 
specified on the command line
-                       String applicationName = "Flink Application: " + 
programName;
+                       // Get the custom command-lines (e.g. Yarn/Mesos)
+                       CustomCommandLine<?> activeCommandLine =
+                               
CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
 
-                       // user wants to run Flink in YARN cluster.
-                       CommandLine commandLine = options.getCommandLine();
-                       AbstractFlinkYarnClient flinkYarnClient = 
CliFrontendParser
-                                                                               
                                .getFlinkYarnSessionCli()
-                                                                               
                                .withDefaultApplicationName(applicationName)
-                                                                               
                                .createFlinkYarnClient(commandLine);
+                       if (activeCommandLine != null) {
+                               logAndSysout(activeCommandLine.getIdentifier() 
+ " mode detected. Switching Log4j output to console");
 
-                       if (flinkYarnClient == null) {
-                               throw new RuntimeException("Unable to create 
Flink YARN Client. Check previous log messages");
-                       }
+                               // Default yarn application name to use, if 
nothing is specified on the command line
+                               String applicationName = "Flink Application: " 
+ programName;
 
-                       // in case the main detached mode wasn't set, we don't 
wanna overwrite the one loaded
-                       // from yarn options.
-                       if (detachedMode) {
-                               flinkYarnClient.setDetachedMode(true);
-                       }
+                               ClusterClient client = 
activeCommandLine.createClient(applicationName, options.getCommandLine());
 
-                       // the number of slots available from YARN:
-                       int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
-                       if (yarnTmSlots == -1) {
-                               yarnTmSlots = 1;
-                       }
-                       maxSlots = yarnTmSlots * 
flinkYarnClient.getTaskManagerCount();
-                       if (userParallelism != -1) {
-                               int slotsPerTM = userParallelism / 
flinkYarnClient.getTaskManagerCount();
-                               logAndSysout("The YARN cluster has " + maxSlots 
+ " slots available, " +
-                                               "but the user requested a 
parallelism of " + userParallelism + " on YARN. " +
-                                               "Each of the " + 
flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
-                                               "will get "+slotsPerTM+" 
slots.");
-                               flinkYarnClient.setTaskManagerSlots(slotsPerTM);
-                       }
+                               logAndSysout("Cluster started");
+                               logAndSysout("JobManager web interface address 
" + client.getWebInterfaceURL());
 
-                       try {
-                               yarnCluster = flinkYarnClient.deploy();
-                               yarnCluster.connectToCluster();
-                       }
-                       catch (Exception e) {
-                               throw new RuntimeException("Error deploying the 
YARN cluster", e);
+                               return client;
+                       } else {
+                               // job manager address supplied on the 
command-line
+                               LOG.info("Using address {} to connect to 
JobManager.", options.getJobManagerAddress());
+                               jobManagerAddress = 
ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
+                               writeJobManagerAddressToConfig(config, 
jobManagerAddress);
+                               return new StandaloneClusterClient(config);
                        }
 
-                       jobManagerAddress = yarnCluster.getJobManagerAddress();
-                       writeJobManagerAddressToConfig(jobManagerAddress);
-                       
-                       // overwrite the yarn client config (because the client 
parses the dynamic properties)
-                       
this.config.addAll(flinkYarnClient.getFlinkConfiguration());
-
-                       logAndSysout("YARN cluster started");
-                       logAndSysout("JobManager web interface address " + 
yarnCluster.getWebInterfaceURL());
-                       logAndSysout("Waiting until all TaskManagers have 
connected");
-
-                       while(true) {
-                               GetClusterStatusResponse status = 
yarnCluster.getClusterStatus();
-                               if (status != null) {
-                                       if (status.numRegisteredTaskManagers() 
< flinkYarnClient.getTaskManagerCount()) {
-                                               logAndSysout("TaskManager 
status (" + status.numRegisteredTaskManagers() + "/"
-                                                       + 
flinkYarnClient.getTaskManagerCount() + ")");
-                                       } else {
-                                               logAndSysout("All TaskManagers 
are connected");
-                                               break;
-                                       }
-                               } else {
-                                       logAndSysout("No status updates from 
the YARN cluster received so far. Waiting ...");
-                               }
-
-                               try {
-                                       Thread.sleep(500);
-                               }
-                               catch (InterruptedException e) {
-                                       LOG.error("Interrupted while waiting 
for TaskManagers");
-                                       System.err.println("Thread is 
interrupted");
-                                       Thread.currentThread().interrupt();
+               // try to get the JobManager address via resuming of a cluster
+               } else {
+                       for (CustomCommandLine cli : 
CliFrontendParser.getAllCustomCommandLine().values()) {
+                               ClusterClient client = 
cli.retrieveCluster(config);
+                               if (client != null) {
+                                       LOG.info("Using address {} to connect 
to JobManager.", client.getJobManagerAddressFromConfig());
+                                       return client;
                                }
                        }
                }
-               else {
-                       if(options.getJobManagerAddress() != null) {
-                               jobManagerAddress = 
ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
-                               
writeJobManagerAddressToConfig(jobManagerAddress);
-                       }
-               }
 
-               return new Client(config, maxSlots);
+               // read JobManager address from the config
+               if 
(config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) != null) {
+                       return new StandaloneClusterClient(config);
+               // We tried hard but couldn't find a JobManager address
+               } else {
+                       throw new IllegalConfigurationException(
+                               "The JobManager address is neither provided at 
the command-line, " +
+                                       "nor configured in flink-conf.yaml.");
+               }
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -1275,33 +1091,16 @@ public class CliFrontend {
                return location;
        }
 
-       public static Map<String, String> getDynamicProperties(String 
dynamicPropertiesEncoded) {
-               if (dynamicPropertiesEncoded != null && 
dynamicPropertiesEncoded.length() > 0) {
-                       Map<String, String> properties = new HashMap<>();
-                       
-                       String[] propertyLines = 
dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
-                       for (String propLine : propertyLines) {
-                               if (propLine == null) {
-                                       continue;
-                               }
-                               
-                               String[] kv = propLine.split("=");
-                               if (kv.length >= 2 && kv[0] != null && kv[1] != 
null && kv[0].length() > 0) {
-                                       properties.put(kv[0], kv[1]);
-                               }
-                       }
-                       return properties;
-               }
-               else {
-                       return Collections.emptyMap();
-               }
-       }
-
-       public static String getYarnPropertiesLocation(Configuration conf) {
-               String defaultPropertiesFileLocation = 
System.getProperty("java.io.tmpdir");
-               String currentUser = System.getProperty("user.name");
-               String propertiesFileLocation = 
conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, 
defaultPropertiesFileLocation);
 
-               return propertiesFileLocation + File.separator + 
CliFrontend.YARN_PROPERTIES_FILE + currentUser;
+       /**
+        * Writes the given job manager address to the associated configuration 
object
+        *
+        * @param address Address to write to the configuration
+        * @param config The config to write to
+        */
+       public static void writeJobManagerAddressToConfig(Configuration config, 
InetSocketAddress address) {
+               config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, 
address.getHostName());
+               config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
address.getPort());
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
deleted file mode 100644
index bb61ffb..0000000
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.client;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Class handling the command line interface to the YARN session.
- */
-public class FlinkYarnSessionCli {
-       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkYarnSessionCli.class);
-
-       //------------------------------------ Constants   
-------------------------
-
-       private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
-       public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
-       public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
-
-       private static final int CLIENT_POLLING_INTERVALL = 3;
-
-
-       //------------------------------------ Command Line argument options 
-------------------------
-       // the prefix transformation is used by the CliFrontend static 
constructor.
-       private final Option QUERY;
-       // --- or ---
-       private final Option QUEUE;
-       private final Option SHIP_PATH;
-       private final Option FLINK_JAR;
-       private final Option JM_MEMORY;
-       private final Option TM_MEMORY;
-       private final Option CONTAINER;
-       private final Option SLOTS;
-       private final Option DETACHED;
-       private final Option STREAMING;
-       private final Option NAME;
-
-       /**
-        * Dynamic properties allow the user to specify additional 
configuration values with -D, such as
-        *  -Dfs.overwrite-files=true  
-Dtaskmanager.network.numberOfBuffers=16368
-        */
-       private final Option DYNAMIC_PROPERTIES;
-
-       private final boolean acceptInteractiveInput;
-       
-       //------------------------------------ Internal fields 
-------------------------
-       private AbstractFlinkYarnCluster yarnCluster = null;
-       private boolean detachedMode = false;
-
-       /** Default yarn application name. */
-       private String defaultApplicationName = null;
-
-       public FlinkYarnSessionCli(String shortPrefix, String longPrefix, 
boolean acceptInteractiveInput) {
-               this.acceptInteractiveInput = acceptInteractiveInput;
-               
-               QUERY = new Option(shortPrefix + "q", longPrefix + "query", 
false, "Display available YARN resources (memory, cores)");
-               QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", 
true, "Specify YARN queue.");
-               SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", 
true, "Ship files in the specified directory (t for transfer)");
-               FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", 
true, "Path to Flink jar file");
-               JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + 
"jobManagerMemory", true, "Memory for JobManager Container [in MB]");
-               TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + 
"taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
-               CONTAINER = new Option(shortPrefix + "n", longPrefix + 
"container", true, "Number of YARN container to allocate (=Number of Task 
Managers)");
-               SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", 
true, "Number of slots per TaskManager");
-               DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, 
"Dynamic properties");
-               DETACHED = new Option(shortPrefix + "d", longPrefix + 
"detached", false, "Start detached");
-               STREAMING = new Option(shortPrefix + "st", longPrefix + 
"streaming", false, "Start Flink in streaming mode");
-               NAME = new Option(shortPrefix + "nm", longPrefix + "name", 
true, "Set a custom name for the application on YARN");
-       }
-
-       /**
-        * Creates a new Yarn Client.
-        * @param cmd the command line to parse options from
-        * @return an instance of the client or null if there was an error
-        */
-       public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
-
-               AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
-               if (flinkYarnClient == null) {
-                       return null;
-               }
-
-               if (!cmd.hasOption(CONTAINER.getOpt())) { // number of 
containers is required option!
-                       LOG.error("Missing required argument " + 
CONTAINER.getOpt());
-                       printUsage();
-                       return null;
-               }
-               
flinkYarnClient.setTaskManagerCount(Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt())));
-
-               // Jar Path
-               Path localJarPath;
-               if (cmd.hasOption(FLINK_JAR.getOpt())) {
-                       String userPath = 
cmd.getOptionValue(FLINK_JAR.getOpt());
-                       if(!userPath.startsWith("file://")) {
-                               userPath = "file://" + userPath;
-                       }
-                       localJarPath = new Path(userPath);
-               } else {
-                       LOG.info("No path for the flink jar passed. Using the 
location of "+flinkYarnClient.getClass()+" to locate the jar");
-                       localJarPath = new 
Path("file://"+flinkYarnClient.getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
-               }
-
-               flinkYarnClient.setLocalJarPath(localJarPath);
-
-               // Conf Path
-               String confDirPath = 
CliFrontend.getConfigurationDirectoryFromEnv();
-               GlobalConfiguration.loadConfiguration(confDirPath);
-               Configuration flinkConfiguration = 
GlobalConfiguration.getConfiguration();
-               flinkYarnClient.setFlinkConfiguration(flinkConfiguration);
-               flinkYarnClient.setConfigurationDirectory(confDirPath);
-               File confFile = new File(confDirPath + File.separator + 
CONFIG_FILE_NAME);
-               if (!confFile.exists()) {
-                       LOG.error("Unable to locate configuration file in 
"+confFile);
-                       return null;
-               }
-               Path confPath = new Path(confFile.getAbsolutePath());
-
-               flinkYarnClient.setConfigurationFilePath(confPath);
-
-               List<File> shipFiles = new ArrayList<>();
-               // path to directory to ship
-               if (cmd.hasOption(SHIP_PATH.getOpt())) {
-                       String shipPath = 
cmd.getOptionValue(SHIP_PATH.getOpt());
-                       File shipDir = new File(shipPath);
-                       if (shipDir.isDirectory()) {
-                               shipFiles = new 
ArrayList<>(Arrays.asList(shipDir.listFiles(new FilenameFilter() {
-                                       @Override
-                                       public boolean accept(File dir, String 
name) {
-                                               return !(name.equals(".") || 
name.equals(".."));
-                                       }
-                               })));
-                       } else {
-                               LOG.warn("Ship directory is not a directory. 
Ignoring it.");
-                       }
-               }
-
-               //check if there is a logback or log4j file
-               if (confDirPath.length() > 0) {
-                       File logback = new File(confDirPath + 
File.pathSeparator + CONFIG_FILE_LOGBACK_NAME);
-                       if (logback.exists()) {
-                               shipFiles.add(logback);
-                               
flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(logback.toURI()));
-                       }
-                       File log4j = new File(confDirPath + File.pathSeparator 
+ CONFIG_FILE_LOG4J_NAME);
-                       if (log4j.exists()) {
-                               shipFiles.add(log4j);
-                               if 
(flinkYarnClient.getFlinkLoggingConfigurationPath() != null) {
-                                       // this means there is already a 
logback configuration file --> fail
-                                       LOG.warn("The configuration directory 
('" + confDirPath + "') contains both LOG4J and " +
-                                                       "Logback configuration 
files. Please delete or rename one of them.");
-                               } // else
-                               
flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(log4j.toURI()));
-                       }
-               }
-
-               flinkYarnClient.setShipFiles(shipFiles);
-
-               // queue
-               if (cmd.hasOption(QUEUE.getOpt())) {
-                       
flinkYarnClient.setQueue(cmd.getOptionValue(QUEUE.getOpt()));
-               }
-
-               // JobManager Memory
-               if (cmd.hasOption(JM_MEMORY.getOpt())) {
-                       int jmMemory = 
Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
-                       flinkYarnClient.setJobManagerMemory(jmMemory);
-               }
-
-               // Task Managers memory
-               if (cmd.hasOption(TM_MEMORY.getOpt())) {
-                       int tmMemory = 
Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
-                       flinkYarnClient.setTaskManagerMemory(tmMemory);
-               }
-
-               if (cmd.hasOption(SLOTS.getOpt())) {
-                       int slots = 
Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
-                       flinkYarnClient.setTaskManagerSlots(slots);
-               }
-
-               String[] dynamicProperties = null;
-               if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
-                       dynamicProperties = 
cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
-               }
-               String dynamicPropertiesEncoded = 
StringUtils.join(dynamicProperties,
-                               CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
-
-               
flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
-
-               if (cmd.hasOption(DETACHED.getOpt())) {
-                       this.detachedMode = true;
-                       flinkYarnClient.setDetachedMode(detachedMode);
-               }
-
-               if(cmd.hasOption(NAME.getOpt())) {
-                       
flinkYarnClient.setName(cmd.getOptionValue(NAME.getOpt()));
-               } else {
-                       // set the default application name, if none is 
specified
-                       if(defaultApplicationName != null) {
-                               flinkYarnClient.setName(defaultApplicationName);
-                       }
-               }
-
-               return flinkYarnClient;
-       }
-
-
-       private void printUsage() {
-               System.out.println("Usage:");
-               HelpFormatter formatter = new HelpFormatter();
-               formatter.setWidth(200);
-               formatter.setLeftPadding(5);
-               formatter.setSyntaxPrefix("   Required");
-               Options req = new Options();
-               req.addOption(CONTAINER);
-               formatter.printHelp(" ", req);
-
-               formatter.setSyntaxPrefix("   Optional");
-               Options opt = new Options();
-               opt.addOption(JM_MEMORY);
-               opt.addOption(TM_MEMORY);
-               opt.addOption(QUERY);
-               opt.addOption(QUEUE);
-               opt.addOption(SLOTS);
-               opt.addOption(DYNAMIC_PROPERTIES);
-               opt.addOption(DETACHED);
-               opt.addOption(STREAMING);
-               opt.addOption(NAME);
-               formatter.printHelp(" ", opt);
-       }
-
-       public static AbstractFlinkYarnClient getFlinkYarnClient() {
-               AbstractFlinkYarnClient yarnClient;
-               try {
-                       Class<? extends AbstractFlinkYarnClient> 
yarnClientClass =
-                                       
Class.forName("org.apache.flink.yarn.FlinkYarnClient").asSubclass(AbstractFlinkYarnClient.class);
-                       yarnClient = 
InstantiationUtil.instantiate(yarnClientClass, AbstractFlinkYarnClient.class);
-               }
-               catch (ClassNotFoundException e) {
-                       System.err.println("Unable to locate the Flink YARN 
Client. " +
-                                       "Please ensure that you are using a 
Flink build with Hadoop2/YARN support. Message: " +
-                                       e.getMessage());
-                       e.printStackTrace(System.err);
-                       return null; // make it obvious
-               }
-               return yarnClient;
-       }
-
-       private static void writeYarnProperties(Properties properties, File 
propertiesFile) {
-               try {
-                       OutputStream out = new FileOutputStream(propertiesFile);
-                       properties.store(out, "Generated YARN properties file");
-                       out.close();
-               } catch (IOException e) {
-                       throw new RuntimeException("Error writing the 
properties file", e);
-               }
-               propertiesFile.setReadable(true, false); // readable for all.
-       }
-
-       public static void runInteractiveCli(AbstractFlinkYarnCluster 
yarnCluster, boolean readConsoleInput) {
-               final String HELP = "Available commands:\n" +
-                               "help - show these commands\n" +
-                               "stop - stop the YARN session";
-               int numTaskmanagers = 0;
-               try {
-                       BufferedReader in = new BufferedReader(new 
InputStreamReader(System.in));
-                       label:
-                       while (true) {
-                               // ------------------ check if there are 
updates by the cluster -----------
-
-                               GetClusterStatusResponse status = 
yarnCluster.getClusterStatus();
-                               LOG.debug("Received status message: {}", 
status);
-
-                               if (status != null && numTaskmanagers != 
status.numRegisteredTaskManagers()) {
-                                       System.err.println("Number of connected 
TaskManagers changed to " +
-                                                       
status.numRegisteredTaskManagers() + ". " +
-                                               "Slots available: " + 
status.totalNumberOfSlots());
-                                       numTaskmanagers = 
status.numRegisteredTaskManagers();
-                               }
-
-                               List<String> messages = 
yarnCluster.getNewMessages();
-                               if (messages != null && messages.size() > 0) {
-                                       System.err.println("New messages from 
the YARN cluster: ");
-                                       for (String msg : messages) {
-                                               System.err.println(msg);
-                                       }
-                               }
-
-                               if (yarnCluster.hasFailed()) {
-                                       System.err.println("The YARN cluster 
has failed");
-                                       yarnCluster.shutdown(true);
-                               }
-
-                               // wait until CLIENT_POLLING_INTERVAL is over 
or the user entered something.
-                               long startTime = System.currentTimeMillis();
-                               while ((System.currentTimeMillis() - startTime) 
< CLIENT_POLLING_INTERVALL * 1000
-                                               && (!readConsoleInput || 
!in.ready()))
-                               {
-                                       Thread.sleep(200);
-                               }
-                               //------------- handle interactive command by 
user. ----------------------
-                               
-                               if (readConsoleInput && in.ready()) {
-                                       String command = in.readLine();
-                                       switch (command) {
-                                               case "quit":
-                                               case "stop":
-                                                       break label;
-
-                                               case "help":
-                                                       System.err.println(HELP);
-                                                       break;
-                                               default:
-                                                       System.err.println("Unknown command '" + command + "'. Showing help: \n" + HELP);
-                                                       break;
-                                       }
-                               }
-                               
-                               if (yarnCluster.hasBeenStopped()) {
-                                       LOG.info("Stopping interactive command line interface, YARN cluster has been stopped.");
-                                       break;
-                               }
-                       }
-               } catch(Exception e) {
-                       LOG.warn("Exception while running the interactive command line interface", e);
-               }
-       }
-
-       public static void main(String[] args) {
-               FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", true); // no prefix for the YARN session
-               System.exit(cli.run(args));
-       }
-
-       public void getYARNSessionCLIOptions(Options options) {
-               options.addOption(FLINK_JAR);
-               options.addOption(JM_MEMORY);
-               options.addOption(TM_MEMORY);
-               options.addOption(CONTAINER);
-               options.addOption(QUEUE);
-               options.addOption(QUERY);
-               options.addOption(SHIP_PATH);
-               options.addOption(SLOTS);
-               options.addOption(DYNAMIC_PROPERTIES);
-               options.addOption(DETACHED);
-               options.addOption(STREAMING);
-               options.addOption(NAME);
-       }
-
-       public int run(String[] args) {
-               //
-               //      Command Line Options
-               //
-               Options options = new Options();
-               getYARNSessionCLIOptions(options);
-
-               CommandLineParser parser = new PosixParser();
-               CommandLine cmd;
-               try {
-                       cmd = parser.parse(options, args);
-               } catch(Exception e) {
-                       System.out.println(e.getMessage());
-                       printUsage();
-                       return 1;
-               }
-               
-               // Query cluster for metrics
-               if (cmd.hasOption(QUERY.getOpt())) {
-                       AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
-                       String description;
-                       try {
-                               description = flinkYarnClient.getClusterDescription();
-                       } catch (Exception e) {
-                               System.err.println("Error while querying the YARN cluster for available resources: "+e.getMessage());
-                               e.printStackTrace(System.err);
-                               return 1;
-                       }
-                       System.out.println(description);
-                       return 0;
-               } else {
-                       AbstractFlinkYarnClient flinkYarnClient = createFlinkYarnClient(cmd);
-
-                       if (flinkYarnClient == null) {
-                               System.err.println("Error while starting the YARN Client. Please check log output!");
-                               return 1;
-                       }
-
-                       try {
-                               yarnCluster = flinkYarnClient.deploy();
-                               // only connect to cluster if its not a detached session.
-                               if(!flinkYarnClient.isDetached()) {
-                                       yarnCluster.connectToCluster();
-                               }
-                       } catch (Exception e) {
-                               System.err.println("Error while deploying YARN cluster: "+e.getMessage());
-                               e.printStackTrace(System.err);
-                               return 1;
-                       }
-                       //------------------ Cluster deployed, handle connection details
-                       String jobManagerAddress = yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + yarnCluster.getJobManagerAddress().getPort();
-                       System.out.println("Flink JobManager is now running on " + jobManagerAddress);
-                       System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
-
-                       // file that we write into the conf/ dir containing the jobManager address and the dop.
-                       File yarnPropertiesFile = new File(CliFrontend.getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration()));
-
-                       Properties yarnProps = new Properties();
-                       yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);
-                       if (flinkYarnClient.getTaskManagerSlots() != -1) {
-                               String parallelism =
-                                               Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount());
-                               yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_PARALLELISM, parallelism);
-                       }
-                       // add dynamic properties
-                       if (flinkYarnClient.getDynamicPropertiesEncoded() != null) {
-                               yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
-                                               flinkYarnClient.getDynamicPropertiesEncoded());
-                       }
-                       writeYarnProperties(yarnProps, yarnPropertiesFile);
-
-                       //------------------ Cluster running, let user control it ------------
-
-                       if (detachedMode) {
-                               // print info and quit:
-                               LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
-                                               "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-                                               "yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
-                                               "Please also note that the temporary files of the YARN session in {} will not be removed.",
-                                               flinkYarnClient.getSessionFilesDir());
-                       } else {
-                               runInteractiveCli(yarnCluster, acceptInteractiveInput);
-
-                               if (!yarnCluster.hasBeenStopped()) {
-                                       LOG.info("Command Line Interface requested session shutdown");
-                                       yarnCluster.shutdown(false);
-                               }
-
-                               try {
-                                       yarnPropertiesFile.delete();
-                               } catch (Exception e) {
-                                       LOG.warn("Exception while deleting the JobManager address file", e);
-                               }
-                       }
-               }
-               return 0;
-       }
-
-       /**
-        * Sets the default Yarn Application Name.
-        * @param defaultApplicationName the name of the yarn application to use
-        * @return FlinkYarnSessionCli instance, for chaining
-     */
-       public FlinkYarnSessionCli withDefaultApplicationName(String defaultApplicationName) {
-               this.defaultApplicationName = defaultApplicationName;
-               return this;
-       }
-
-       /**
-        * Utility method for tests.
-        */
-       public void stop() {
-               if (yarnCluster != null) {
-                       LOG.info("Command line interface is shutting down the yarnCluster");
-                       yarnCluster.shutdown(false);
-               }
-       }
-}
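
For illustration, the removed session CLI hands the deployed cluster over to the CliFrontend through a YARN properties file containing the JobManager address and the derived parallelism. A minimal sketch of that hand-off pattern, using placeholder property keys instead of the real CliFrontend constants (which move to the yarn module with this change):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.Properties;

    // Hypothetical re-creation of the properties hand-off between the YARN
    // session CLI and the CliFrontend; the key names are illustrative only.
    public class YarnPropertiesSketch {
        public static void writeSessionProperties(File propertiesFile, String jobManagerAddress, int parallelism) {
            Properties props = new Properties();
            props.setProperty("jobManager", jobManagerAddress);               // placeholder key
            props.setProperty("parallelism", Integer.toString(parallelism));  // placeholder key
            try (OutputStream out = new FileOutputStream(propertiesFile)) {
                props.store(out, "Generated YARN properties file");
            } catch (IOException e) {
                throw new RuntimeException("Error writing the properties file", e);
            }
            propertiesFile.setReadable(true, false); // readable for all, like the original
        }
    }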

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index ab70453..86b36b3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -27,8 +27,9 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
-import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.ConfigConstants;
@@ -57,7 +58,7 @@ public class RemoteExecutor extends PlanExecutor {
 
        private final Configuration clientConfiguration;
 
-       private Client client;
+       private ClusterClient client;
 
        private int defaultParallelism = 1;
 
@@ -149,7 +150,7 @@ public class RemoteExecutor extends PlanExecutor {
        public void start() throws Exception {
                synchronized (lock) {
                        if (client == null) {
-                               client = new Client(clientConfiguration);
+                               client = new StandaloneClusterClient(clientConfiguration);
                                client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
                        }
                        else {
@@ -207,7 +208,7 @@ public class RemoteExecutor extends PlanExecutor {
                        }
 
                        try {
-                               return client.runBlocking(program, defaultParallelism);
+                               return client.run(program, defaultParallelism).getJobExecutionResult();
                        }
                        finally {
                                if (shutDownAtEnd) {
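
For illustration, the hunk above swaps the old Client.runBlocking(...) call for ClusterClient.run(...), whose JobSubmissionResult carries the JobExecutionResult when the submission is blocking. A minimal sketch of the new call pattern (the helper class and the shutdown handling are assumptions, not part of the commit):

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.client.program.JobWithJars;
    import org.apache.flink.client.program.StandaloneClusterClient;
    import org.apache.flink.configuration.Configuration;

    // Hypothetical helper showing the blocking submission path after the rename.
    public class SubmitSketch {
        public static JobExecutionResult submit(Configuration config, JobWithJars program, int parallelism) throws Exception {
            ClusterClient client = new StandaloneClusterClient(config);
            try {
                // run(...) returns a JobSubmissionResult; for a blocking submission
                // it also wraps the JobExecutionResult of the finished job.
                return client.run(program, parallelism).getJobExecutionResult();
            } finally {
                client.shutdown(); // assumed lifecycle call on ClusterClient
            }
        }
    }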

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index b75952e..f28d1b6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -24,8 +24,16 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 
 /**
  * A simple command line parser (based on Apache Commons CLI) that extracts command
@@ -33,9 +41,17 @@ import org.apache.flink.client.FlinkYarnSessionCli;
  */
 public class CliFrontendParser {
 
+       private static final Logger LOG = LoggerFactory.getLogger(CliFrontendParser.class);
+
+
        /** command line interface of the YARN session, with a special initialization here
         *  to prefix all options with y/yarn. */
-       private static final FlinkYarnSessionCli yarnSessionCLi = new FlinkYarnSessionCli("y", "yarn", true);
+       private static final Map<String, CustomCommandLine> customCommandLine = new HashMap<>(1);
+
+       static {
+               // we could easily add more here in the future
+               loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
+       }
 
 
        static final Option HELP_OPTION = new Option("h", "help", false,
@@ -43,7 +59,7 @@ public class CliFrontendParser {
 
        static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");
 
-       public static final Option CLASS_OPTION = new Option("c", "class", true,
+       static final Option CLASS_OPTION = new Option("c", "class", true,
                        "Class with the program entry point (\"main\" method or 
\"getPlan()\" method. Only needed if the " +
                        "JAR file does not specify the class in its manifest.");
 
@@ -53,23 +69,23 @@ public class CliFrontendParser {
                                        "times for specifying more than one 
URL. The protocol must be supported by the " +
                                        "{@link java.net.URLClassLoader}.");
 
-       static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
+       public static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
                        "The parallelism with which to run the program. Optional flag to override the default value " +
                        "specified in the configuration.");
 
        static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", false, "If present, " +
                        "supress logging output to standard out.");

-       static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
+       public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
                        "the job in detached mode");
 
        static final Option ARGS_OPTION = new Option("a", "arguments", true,
                        "Program arguments. Arguments can also be added without 
-a, simply as trailing parameters.");
 
        static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
-                       "Address of the JobManager (master) to which to 
connect. Specify '" + CliFrontend.YARN_DEPLOY_JOBMANAGER +
-                       "' as the JobManager to deploy a YARN cluster for the 
job. Use this flag to connect to a " +
-                       "different JobManager than the one specified in the 
configuration.");
+                       "Address of the JobManager (master) to which to 
connect. " +
+                       "Specify " + getCliIdentifierString() +" as the 
JobManager to deploy a cluster for the job. " +
+                       "Use this flag to connect to a different JobManager 
than the one specified in the configuration.");
 
        static final Option SAVEPOINT_PATH_OPTION = new Option("s", 
"fromSavepoint", true,
                        "Path to a savepoint to reset the job back to (for 
example file:///flink/savepoint-1537).");
@@ -143,8 +159,10 @@ public class CliFrontendParser {
                options.addOption(DETACHED_OPTION);
                options.addOption(SAVEPOINT_PATH_OPTION);
 
-               // also add the YARN options so that the parser can parse them
-               yarnSessionCLi.getYARNSessionCLIOptions(options);
+               for (CustomCommandLine customCLI : customCommandLine.values()) {
+                       customCLI.addOptions(options);
+               }
+
                return options;
        }
 
@@ -240,10 +258,16 @@ public class CliFrontendParser {
                System.out.println("\n  Syntax: run [OPTIONS] <jar-file> <arguments>");
                formatter.setSyntaxPrefix("  \"run\" action options:");
                formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options()));
-               formatter.setSyntaxPrefix("  Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:");
-               Options yarnOpts = new Options();
-               yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
-               formatter.printHelp(" ", yarnOpts);
+
+               // prints options from all available command-line classes
+               for (Map.Entry<String, CustomCommandLine> entry: customCommandLine.entrySet()) {
+                       formatter.setSyntaxPrefix("  Additional arguments if -m " + entry.getKey() + " is set:");
+                       Options customOpts = new Options();
+                       entry.getValue().addOptions(customOpts);
+                       formatter.printHelp(" ", customOpts);
+                       System.out.println();
+               }
+
                System.out.println();
        }
 
@@ -376,7 +400,63 @@ public class CliFrontendParser {
                }
        }
 
-       public static FlinkYarnSessionCli getFlinkYarnSessionCli() {
-               return yarnSessionCLi;
+       public static Map<String, CustomCommandLine> getAllCustomCommandLine() {
+               if (customCommandLine.isEmpty()) {
+                       LOG.warn("No custom command-line classes were loaded.");
+               }
+               return Collections.unmodifiableMap(customCommandLine);
+       }
+
+       private static String getCliIdentifierString() {
+               StringBuilder builder = new StringBuilder();
+               boolean first = true;
+               for (String identifier : customCommandLine.keySet()) {
+                       if (!first) {
+                               builder.append(", ");
+                       }
+                       first = false;
+                       builder.append("'").append(identifier).append("'");
+               }
+               return builder.toString();
+       }
+
+       /**
+        * Gets the custom command-line for this identifier.
+        * @param identifier The unique identifier for this command-line implementation.
+        * @return CustomCommandLine or null if none was found
+        */
+       public static CustomCommandLine getActiveCustomCommandLine(String identifier) {
+               return CliFrontendParser.getAllCustomCommandLine().get(identifier);
        }
+
+       private static void loadCustomCommandLine(String className, Object... params) {
+
+               try {
+                       Class<? extends CustomCommandLine> customCliClass =
+                               Class.forName(className).asSubclass(CustomCommandLine.class);
+
+                       // construct class types from the parameters
+                       Class<?>[] types = new Class<?>[params.length];
+                       for (int i = 0; i < params.length; i++) {
+                               Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
+                               types[i] = params[i].getClass();
+                       }
+
+                       Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);
+                       final CustomCommandLine cli = constructor.newInstance(params);
+
+                       String cliIdentifier = Preconditions.checkNotNull(cli.getIdentifier());
+                       CustomCommandLine existing = customCommandLine.put(cliIdentifier, cli);
+
+                       if (existing != null) {
+                               throw new IllegalStateException("Attempted to register " + cliIdentifier +
+                                       " but there is already a command-line with this identifier.");
+                       }
+               } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException
+                       | InvocationTargetException e) {
+                       LOG.warn("Unable to locate custom CLI class {}. " +
+                               "Flink is not compiled with support for this class.", className, e);
+               }
+       }
+
 }
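
For illustration, the new registry keys each CustomCommandLine by its identifier, which is matched against the value passed with -m. A rough sketch of how a frontend could resolve and use a registered command-line (the wrapper class and the error handling are assumptions, not part of the commit):

    import org.apache.commons.cli.CommandLine;
    import org.apache.flink.client.cli.CliFrontendParser;
    import org.apache.flink.client.cli.CustomCommandLine;
    import org.apache.flink.client.program.ClusterClient;

    // Hypothetical resolution logic: look up the command-line registered under
    // the -m value and let it create a ClusterClient for the job submission.
    public class CommandLineResolverSketch {
        public static ClusterClient createClient(String jobManagerOption, CommandLine commandLine) throws Exception {
            CustomCommandLine customCli = CliFrontendParser.getActiveCustomCommandLine(jobManagerOption);
            if (customCli == null) {
                throw new IllegalArgumentException("No command-line registered for " + jobManagerOption);
            }
            return customCli.createClient("Flink session", commandLine);
        }
    }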

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
new file mode 100644
index 0000000..cd5e0e6
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+
+
+/**
+ * Custom command-line interface to load hooks for the command-line interface.
+ */
+public interface CustomCommandLine<ClusterType extends ClusterClient> {
+
+       /**
+        * Returns a unique identifier for this custom command-line.
+        * @return An unique identifier string
+        */
+       String getIdentifier();
+
+       /**
+        * Adds custom options to the existing options.
+        * @param baseOptions The existing options.
+        */
+       void addOptions(Options baseOptions);
+
+       /**
+        * Retrieves a client for a running cluster
+        * @param config The Flink config
+        * @return Client if a cluster could be retrieved, null otherwise
+        */
+       ClusterClient retrieveCluster(Configuration config) throws Exception;
+
+       /**
+        * Creates the client for the cluster
+        * @param applicationName The application name to use
+        * @param commandLine The command-line options parsed by the CliFrontend
+        * @return The client to communicate with the cluster which the CustomCommandLine brought up.
+        */
+       ClusterType createClient(String applicationName, CommandLine commandLine) throws Exception;
+}
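
For illustration, a minimal sketch of what an implementation of this interface could look like for a pre-existing standalone cluster; the class name, identifier value, and extra option are assumptions, not part of the commit:

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.Options;
    import org.apache.flink.client.cli.CustomCommandLine;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.client.program.StandaloneClusterClient;
    import org.apache.flink.configuration.Configuration;

    // Hypothetical example: hooks an already running standalone cluster into the CLI.
    public class ExampleStandaloneCustomCommandLine implements CustomCommandLine<StandaloneClusterClient> {

        private final Configuration config;

        public ExampleStandaloneCustomCommandLine(Configuration config) {
            this.config = config;
        }

        @Override
        public String getIdentifier() {
            return "example-standalone"; // value the user would pass with -m
        }

        @Override
        public void addOptions(Options baseOptions) {
            // options specific to this deployment; prefix them to avoid clashes
            baseOptions.addOption(new Option("ex", "exampleOption", true, "Illustrative option."));
        }

        @Override
        public ClusterClient retrieveCluster(Configuration config) throws Exception {
            // connect to the running cluster described by the configuration
            return new StandaloneClusterClient(config);
        }

        @Override
        public StandaloneClusterClient createClient(String applicationName, CommandLine commandLine) throws Exception {
            // a standalone cluster is not deployed by the client, so just connect
            return new StandaloneClusterClient(config);
        }
    }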

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
new file mode 100644
index 0000000..cf0595b
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+
+import org.apache.flink.client.program.ClusterClient;
+
+/**
+ * A descriptor to deploy a cluster (e.g. Yarn or Mesos) and return a Client for Cluster communication.
+ */
+public interface ClusterDescriptor<ClientType extends ClusterClient> {
+
+       /**
+        * Returns a String containing details about the cluster (NodeManagers, available memory, ...)
+        *
+        */
+       String getClusterDescription() throws Exception;
+
+       /**
+        * Triggers deployment of a cluster
+        * @return Client for the cluster
+        * @throws Exception
+        */
+       ClientType deploy() throws Exception;
+}
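
For illustration, a short usage sketch of the descriptor contract: query the target environment, then deploy and obtain a client. Only getClusterDescription() and deploy() come from the interface above; the driver class is an assumption:

    import org.apache.flink.client.deployment.ClusterDescriptor;
    import org.apache.flink.client.program.ClusterClient;

    // Hypothetical driver: report on the target environment, then deploy the cluster.
    public class DeploySketch {
        public static ClusterClient deployAndReport(ClusterDescriptor<? extends ClusterClient> descriptor) throws Exception {
            // e.g. NodeManagers and available memory for a YARN-backed descriptor
            System.out.println(descriptor.getClusterDescription());

            // triggers the actual deployment and hands back a client for the new cluster
            return descriptor.deploy();
        }
    }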
