[FLINK-3667] refactor client communication classes - ClusterDescriptor: base interface for cluster deployment descriptors - ClusterDescriptor: YarnClusterDescriptor
- ClusterClient: base class for ClusterClients, handles lifecycle of cluster - ClusterClient: shares configuration with the implementations - ClusterClient: StandaloneClusterClient, YarnClusterClient - ClusterClient: remove run methods and enable detached mode via flag - CliFrontend: remove all Yarn specific logic - CliFrontend: remove all cluster setup logic - CustomCommandLine: interface for other cluster implementations - CustomCommandLine: enables creation of new cluster or resuming from existing - Yarn: move Yarn classes and functionality to the yarn module (yarn properties, yarn interfaces) - Yarn: improve reliability of cluster startup - Yarn Tests: only disable parallel execution of ITCases This closes #1978 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9b52a31 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9b52a31 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9b52a31 Branch: refs/heads/master Commit: f9b52a3114a2114e6846091acf3abb294a49615b Parents: efc344a Author: Maximilian Michels <[email protected]> Authored: Fri Apr 22 19:52:54 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Fri Jun 17 10:37:58 2016 +0200 ---------------------------------------------------------------------- .../api/avro/AvroExternalJarProgramITCase.java | 15 +- .../org/apache/flink/client/CliFrontend.java | 359 ++----- .../flink/client/FlinkYarnSessionCli.java | 505 ---------- .../org/apache/flink/client/RemoteExecutor.java | 9 +- .../flink/client/cli/CliFrontendParser.java | 114 ++- .../flink/client/cli/CustomCommandLine.java | 57 ++ .../client/deployment/ClusterDescriptor.java | 41 + .../org/apache/flink/client/program/Client.java | 624 ------------ .../flink/client/program/ClusterClient.java | 695 ++++++++++++++ .../client/program/ContextEnvironment.java | 12 +- .../program/ContextEnvironmentFactory.java | 18 +- .../client/program/DetachedEnvironment.java | 6 +- 
.../client/program/StandaloneClusterClient.java | 98 ++ .../CliFrontendAddressConfigurationTest.java | 125 +-- .../client/CliFrontendPackageProgramTest.java | 5 +- .../apache/flink/client/CliFrontendRunTest.java | 26 +- .../flink/client/CliFrontendTestUtils.java | 32 +- .../TestingClusterClientWithoutActorSystem.java | 55 ++ .../client/program/ClientConnectionTest.java | 2 +- .../apache/flink/client/program/ClientTest.java | 33 +- .../program/ExecutionPlanCreationTest.java | 2 +- .../org/apache/flink/storm/api/FlinkClient.java | 11 +- .../flink/api/common/JobExecutionResult.java | 3 + .../flink/api/common/JobSubmissionResult.java | 24 +- .../main/flink-bin/conf/log4j-cli.properties | 2 +- .../src/main/flink-bin/yarn-bin/yarn-session.sh | 2 +- .../operations/DegreesWithExceptionITCase.java | 2 +- .../ReduceOnEdgesWithExceptionITCase.java | 2 +- .../ReduceOnNeighborsWithExceptionITCase.java | 2 +- .../webmonitor/handlers/JarActionHandler.java | 4 +- .../apache/flink/runtime/client/JobClient.java | 17 +- .../clusterframework/ApplicationStatus.java | 1 + .../clusterframework/FlinkResourceManager.java | 2 +- .../messages/GetClusterStatusResponse.java | 2 +- .../runtime/yarn/AbstractFlinkYarnClient.java | 143 --- .../runtime/yarn/AbstractFlinkYarnCluster.java | 123 --- .../org/apache/flink/api/scala/FlinkShell.scala | 82 +- .../flink/api/scala/ExecutionEnvironment.scala | 2 +- .../elasticsearch2/ElasticsearchSinkITCase.java | 2 +- .../environment/RemoteStreamEnvironment.java | 9 +- .../environment/StreamContextEnvironment.java | 5 +- .../RemoteEnvironmentITCase.java | 2 +- .../flink/test/misc/AutoParallelismITCase.java | 2 +- .../test/recovery/SimpleRecoveryITCase.java | 2 +- flink-yarn-tests/pom.xml | 15 +- ...CliFrontendYarnAddressConfigurationTest.java | 220 +++++ .../flink/yarn/FlinkYarnSessionCliTest.java | 14 +- .../flink/yarn/TestingFlinkYarnClient.java | 71 -- .../yarn/TestingYarnClusterDescriptor.java | 71 ++ .../flink/yarn/YARNHighAvailabilityITCase.java | 9 +- 
.../YARNSessionCapacitySchedulerITCase.java | 6 +- .../flink/yarn/YARNSessionFIFOITCase.java | 20 +- .../org/apache/flink/yarn/YarnTestBase.java | 4 +- .../yarn/AbstractYarnClusterDescriptor.java | 943 +++++++++++++++++++ .../org/apache/flink/yarn/FlinkYarnClient.java | 28 - .../apache/flink/yarn/FlinkYarnClientBase.java | 907 ------------------ .../org/apache/flink/yarn/FlinkYarnCluster.java | 559 ----------- .../flink/yarn/YarnApplicationMasterRunner.java | 7 +- .../apache/flink/yarn/YarnClusterClient.java | 577 ++++++++++++ .../flink/yarn/YarnClusterDescriptor.java | 28 + .../flink/yarn/cli/FlinkYarnSessionCli.java | 606 ++++++++++++ .../apache/flink/yarn/ApplicationClient.scala | 8 +- .../org/apache/flink/yarn/YarnMessages.scala | 7 +- 63 files changed, 3799 insertions(+), 3580 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java index ac10074..29a7e58 100644 --- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java +++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java @@ -19,19 +19,12 @@ package org.apache.flink.api.avro; import java.io.File; -import java.net.InetAddress; -import org.apache.flink.api.common.Plan; -import org.apache.flink.client.CliFrontend; -import org.apache.flink.client.RemoteExecutor; -import org.apache.flink.client.program.Client; -import org.apache.flink.client.program.JobWithJars; +import 
org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.optimizer.Optimizer; -import org.apache.flink.optimizer.plan.FlinkPlan; -import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.junit.Assert; @@ -64,10 +57,10 @@ public class AvroExternalJarProgramITCase { config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"); config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort()); - Client client = new Client(config); + ClusterClient client = new StandaloneClusterClient(config); client.setPrintStatusDuringExecution(false); - client.runBlocking(program, 4); + client.run(program, 4); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 6d972bc..cf7a8c2 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -20,8 +20,6 @@ package org.apache.flink.client; import akka.actor.ActorSystem; -import org.apache.commons.cli.CommandLine; - import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; @@ -31,18 +29,21 @@ import org.apache.flink.client.cli.CancelOptions; import org.apache.flink.client.cli.CliArgsException; import org.apache.flink.client.cli.CliFrontendParser; import 
org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.client.cli.InfoOptions; import org.apache.flink.client.cli.ListOptions; import org.apache.flink.client.cli.ProgramOptions; import org.apache.flink.client.cli.RunOptions; import org.apache.flink.client.cli.SavepointOptions; import org.apache.flink.client.cli.StopOptions; -import org.apache.flink.client.program.Client; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; @@ -53,7 +54,6 @@ import org.apache.flink.optimizer.plan.StreamingPlan; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobStatusMessage; -import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -68,8 +68,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSucc import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; import 
org.apache.flink.util.StringUtils; import org.slf4j.Logger; @@ -81,10 +79,8 @@ import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import java.io.File; -import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; import java.net.InetSocketAddress; import java.net.URL; import java.text.SimpleDateFormat; @@ -93,10 +89,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.Date; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint; @@ -121,20 +115,6 @@ public class CliFrontend { private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf"; private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf"; - // YARN-session related constants - public static final String YARN_PROPERTIES_FILE = ".yarn-properties-"; - public static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager"; - public static final String YARN_PROPERTIES_PARALLELISM = "parallelism"; - public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString"; - - public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split() - - /** - * A special host name used to run a job by deploying Flink into a YARN cluster, - * if this string is specified as the JobManager address - */ - public static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster"; - // -------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------- @@ -149,12 +129,9 @@ public class CliFrontend { private ActorSystem actorSystem; - private AbstractFlinkYarnCluster yarnCluster; - /** * - * @throws Exception Thrown if 
the configuration directory was not found, the configuration could not - * be loaded, or the YARN properties could not be parsed. + * @throws Exception Thrown if the configuration directory was not found, the configuration could not be loaded */ public CliFrontend() throws Exception { this(getConfigurationDirectoryFromEnv()); @@ -171,61 +148,6 @@ public class CliFrontend { GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); this.config = GlobalConfiguration.getConfiguration(); - // load the YARN properties - File propertiesFile = new File(getYarnPropertiesLocation(config)); - if (propertiesFile.exists()) { - - logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath()); - - Properties yarnProperties = new Properties(); - try { - try (InputStream is = new FileInputStream(propertiesFile)) { - yarnProperties.load(is); - } - } - catch (IOException e) { - throw new Exception("Cannot read the YARN properties file", e); - } - - // configure the default parallelism from YARN - String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM); - if (propParallelism != null) { // maybe the property is not set - try { - int parallelism = Integer.parseInt(propParallelism); - this.config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism); - - logAndSysout("YARN properties set default parallelism to " + parallelism); - } - catch (NumberFormatException e) { - throw new Exception("Error while parsing the YARN properties: " + - "Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer."); - } - } - - // get the JobManager address from the YARN properties - String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY); - InetSocketAddress jobManagerAddress; - if (address != null) { - try { - jobManagerAddress = ClientUtils.parseHostPortAddress(address); - // store address in config from where it is retrieved by the retrieval service - writeJobManagerAddressToConfig(jobManagerAddress); - } - 
catch (Exception e) { - throw new Exception("YARN properties contain an invalid entry for JobManager address.", e); - } - - logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress); - } - - // handle the YARN client's dynamic properties - String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING); - Map<String, String> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded); - for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) { - this.config.setString(dynamicProperty.getKey(), dynamicProperty.getValue()); - } - } - try { FileSystem.setDefaultScheme(config); } catch (IOException e) { @@ -301,61 +223,33 @@ public class CliFrontend { return handleError(t); } - int exitCode = 1; + ClusterClient client = null; try { - int userParallelism = options.getParallelism(); - LOG.debug("User parallelism is set to {}", userParallelism); - Client client = getClient(options, program.getMainClassName(), userParallelism, options.getDetachedMode()); + client = getClient(options, program.getMainClassName()); client.setPrintStatusDuringExecution(options.getStdoutLogging()); + client.setDetached(options.getDetachedMode()); LOG.debug("Client slots is set to {}", client.getMaxSlots()); LOG.debug("Savepoint path is set to {}", options.getSavepointPath()); - try { - if (client.getMaxSlots() != -1 && userParallelism == -1) { - logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " + - "To use another parallelism, set it at the ./bin/flink client."); - userParallelism = client.getMaxSlots(); - } - - // detached mode - if (options.getDetachedMode() || (yarnCluster != null && yarnCluster.isDetached())) { - exitCode = executeProgramDetached(program, client, userParallelism); - } - else { - exitCode = executeProgramBlocking(program, client, userParallelism); - } - - // show YARN cluster status if its not a detached YARN cluster. 
- if (yarnCluster != null && !yarnCluster.isDetached()) { - List<String> msgs = yarnCluster.getNewMessages(); - if (msgs != null && msgs.size() > 1) { - - logAndSysout("The following messages were created by the YARN cluster while running the Job:"); - for (String msg : msgs) { - logAndSysout(msg); - } - } - if (yarnCluster.hasFailed()) { - logAndSysout("YARN cluster is in failed state!"); - logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics()); - } - } - - return exitCode; - } - finally { - client.shutdown(); + int userParallelism = options.getParallelism(); + LOG.debug("User parallelism is set to {}", userParallelism); + if (client.getMaxSlots() != -1 && userParallelism == -1) { + logAndSysout("Using the parallelism provided by the remote cluster (" + + client.getMaxSlots()+"). " + + "To use another parallelism, set it at the ./bin/flink client."); + userParallelism = client.getMaxSlots(); } + + return executeProgram(program, client, userParallelism); } catch (Throwable t) { return handleError(t); } finally { - if (yarnCluster != null && !yarnCluster.isDetached()) { - logAndSysout("Shutting down YARN cluster"); - yarnCluster.shutdown(exitCode != 0); + if (client != null) { + client.shutdown(); } if (program != null) { program.deleteExtractedLibraries(); @@ -410,7 +304,7 @@ public class CliFrontend { LOG.info("Creating program plan dump"); Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config); - FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, parallelism); + FlinkPlan flinkPlan = ClusterClient.getOptimizedPlan(compiler, program, parallelism); String jsonPlan = null; if (flinkPlan instanceof OptimizedPlan) { @@ -830,53 +724,30 @@ public class CliFrontend { // Interaction with programs and JobManager // -------------------------------------------------------------------------------------------- - protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) { - 
LOG.info("Starting execution of program"); + protected int executeProgram(PackagedProgram program, ClusterClient client, int parallelism) { + logAndSysout("Starting execution of program"); JobSubmissionResult result; try { - result = client.runDetached(program, parallelism); + result = client.run(program, parallelism); } catch (ProgramInvocationException e) { return handleError(e); } finally { program.deleteExtractedLibraries(); } - if (yarnCluster != null) { - yarnCluster.stopAfterJob(result.getJobID()); - yarnCluster.disconnect(); - } - - System.out.println("Job has been submitted with JobID " + result.getJobID()); - - return 0; - } - - protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) { - LOG.info("Starting execution of program"); - - JobSubmissionResult result; - try { - result = client.runBlocking(program, parallelism); - } - catch (ProgramInvocationException e) { - return handleError(e); - } - finally { - program.deleteExtractedLibraries(); - } - - LOG.info("Program execution finished"); - - if (result instanceof JobExecutionResult) { - JobExecutionResult execResult = (JobExecutionResult) result; + if(result.isJobExecutionResults()) { + logAndSysout("Program execution finished"); + JobExecutionResult execResult = result.getJobExecutionResult(); System.out.println("Job with JobID " + execResult.getJobID() + " has finished."); System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms"); Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults(); if (accumulatorsResult.size() > 0) { - System.out.println("Accumulator Results: "); - System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult)); + System.out.println("Accumulator Results: "); + System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult)); } + } else { + logAndSysout("Job has been submitted with JobID " + result.getJobID()); } return 0; @@ -923,16 +794,6 @@ public class CliFrontend { } /** - * 
Writes the given job manager address to the associated configuration object - * - * @param address Address to write to the configuration - */ - protected void writeJobManagerAddressToConfig(InetSocketAddress address) { - config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName()); - config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort()); - } - - /** * Updates the associated configuration with the given command line options * * @param options Command line options @@ -940,7 +801,7 @@ public class CliFrontend { protected void updateConfig(CommandLineOptions options) { if(options.getJobManagerAddress() != null){ InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress()); - writeJobManagerAddressToConfig(jobManagerAddress); + writeJobManagerAddressToConfig(config, jobManagerAddress); } } @@ -980,110 +841,65 @@ public class CliFrontend { } /** - * Retrieves a {@link Client} object from the given command line options and other parameters. + * Retrieves a {@link ClusterClient} object from the given command line options and other parameters. * * @param options Command line options which contain JobManager address * @param programName Program name - * @param userParallelism Given user parallelism * @throws Exception */ - protected Client getClient( + protected ClusterClient getClient( CommandLineOptions options, - String programName, - int userParallelism, - boolean detachedMode) + String programName) throws Exception { InetSocketAddress jobManagerAddress; - int maxSlots = -1; - if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) { - logAndSysout("YARN cluster mode detected. 
Switching Log4j output to console"); + // try to get the JobManager address via command-line args + if (options.getJobManagerAddress() != null) { - // Default yarn application name to use, if nothing is specified on the command line - String applicationName = "Flink Application: " + programName; + // Get the custom command-lines (e.g. Yarn/Mesos) + CustomCommandLine<?> activeCommandLine = + CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress()); - // user wants to run Flink in YARN cluster. - CommandLine commandLine = options.getCommandLine(); - AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser - .getFlinkYarnSessionCli() - .withDefaultApplicationName(applicationName) - .createFlinkYarnClient(commandLine); + if (activeCommandLine != null) { + logAndSysout(activeCommandLine.getIdentifier() + " mode detected. Switching Log4j output to console"); - if (flinkYarnClient == null) { - throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages"); - } + // Default yarn application name to use, if nothing is specified on the command line + String applicationName = "Flink Application: " + programName; - // in case the main detached mode wasn't set, we don't wanna overwrite the one loaded - // from yarn options. - if (detachedMode) { - flinkYarnClient.setDetachedMode(true); - } + ClusterClient client = activeCommandLine.createClient(applicationName, options.getCommandLine()); - // the number of slots available from YARN: - int yarnTmSlots = flinkYarnClient.getTaskManagerSlots(); - if (yarnTmSlots == -1) { - yarnTmSlots = 1; - } - maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount(); - if (userParallelism != -1) { - int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount(); - logAndSysout("The YARN cluster has " + maxSlots + " slots available, " + - "but the user requested a parallelism of " + userParallelism + " on YARN. 
" + - "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " + - "will get "+slotsPerTM+" slots."); - flinkYarnClient.setTaskManagerSlots(slotsPerTM); - } + logAndSysout("Cluster started"); + logAndSysout("JobManager web interface address " + client.getWebInterfaceURL()); - try { - yarnCluster = flinkYarnClient.deploy(); - yarnCluster.connectToCluster(); - } - catch (Exception e) { - throw new RuntimeException("Error deploying the YARN cluster", e); + return client; + } else { + // job manager address supplied on the command-line + LOG.info("Using address {} to connect to JobManager.", options.getJobManagerAddress()); + jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress()); + writeJobManagerAddressToConfig(config, jobManagerAddress); + return new StandaloneClusterClient(config); } - jobManagerAddress = yarnCluster.getJobManagerAddress(); - writeJobManagerAddressToConfig(jobManagerAddress); - - // overwrite the yarn client config (because the client parses the dynamic properties) - this.config.addAll(flinkYarnClient.getFlinkConfiguration()); - - logAndSysout("YARN cluster started"); - logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL()); - logAndSysout("Waiting until all TaskManagers have connected"); - - while(true) { - GetClusterStatusResponse status = yarnCluster.getClusterStatus(); - if (status != null) { - if (status.numRegisteredTaskManagers() < flinkYarnClient.getTaskManagerCount()) { - logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/" - + flinkYarnClient.getTaskManagerCount() + ")"); - } else { - logAndSysout("All TaskManagers are connected"); - break; - } - } else { - logAndSysout("No status updates from the YARN cluster received so far. 
Waiting ..."); - } - - try { - Thread.sleep(500); - } - catch (InterruptedException e) { - LOG.error("Interrupted while waiting for TaskManagers"); - System.err.println("Thread is interrupted"); - Thread.currentThread().interrupt(); + // try to get the JobManager address via resuming of a cluster + } else { + for (CustomCommandLine cli : CliFrontendParser.getAllCustomCommandLine().values()) { + ClusterClient client = cli.retrieveCluster(config); + if (client != null) { + LOG.info("Using address {} to connect to JobManager.", client.getJobManagerAddressFromConfig()); + return client; } } } - else { - if(options.getJobManagerAddress() != null) { - jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress()); - writeJobManagerAddressToConfig(jobManagerAddress); - } - } - return new Client(config, maxSlots); + // read JobManager address from the config + if (config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) != null) { + return new StandaloneClusterClient(config); + // We tried hard but couldn't find a JobManager address + } else { + throw new IllegalConfigurationException( + "The JobManager address is neither provided at the command-line, " + + "nor configured in flink-conf.yaml."); + } } // -------------------------------------------------------------------------------------------- @@ -1275,33 +1091,16 @@ public class CliFrontend { return location; } - public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) { - if (dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) { - Map<String, String> properties = new HashMap<>(); - - String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR); - for (String propLine : propertyLines) { - if (propLine == null) { - continue; - } - - String[] kv = propLine.split("="); - if (kv.length >= 2 && kv[0] != null && kv[1] != null && kv[0].length() > 0) { - properties.put(kv[0], kv[1]); - } - } - return 
properties; - } - else { - return Collections.emptyMap(); - } - } - - public static String getYarnPropertiesLocation(Configuration conf) { - String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir"); - String currentUser = System.getProperty("user.name"); - String propertiesFileLocation = conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation); - return propertiesFileLocation + File.separator + CliFrontend.YARN_PROPERTIES_FILE + currentUser; + /** + * Writes the given job manager address to the associated configuration object + * + * @param address Address to write to the configuration + * @param config The config to write to + */ + public static void writeJobManagerAddressToConfig(Configuration config, InetSocketAddress address) { + config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName()); + config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort()); } + } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java deleted file mode 100644 index bb61ffb..0000000 --- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java +++ /dev/null @@ -1,505 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.client; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.PosixParser; -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; -import org.apache.flink.util.InstantiationUtil; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileOutputStream; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; - -/** - * Class handling the command line interface to the YARN session. 
- */ -public class FlinkYarnSessionCli { - private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class); - - //------------------------------------ Constants ------------------------- - - private static final String CONFIG_FILE_NAME = "flink-conf.yaml"; - public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml"; - public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties"; - - private static final int CLIENT_POLLING_INTERVALL = 3; - - - //------------------------------------ Command Line argument options ------------------------- - // the prefix transformation is used by the CliFrontend static constructor. - private final Option QUERY; - // --- or --- - private final Option QUEUE; - private final Option SHIP_PATH; - private final Option FLINK_JAR; - private final Option JM_MEMORY; - private final Option TM_MEMORY; - private final Option CONTAINER; - private final Option SLOTS; - private final Option DETACHED; - private final Option STREAMING; - private final Option NAME; - - /** - * Dynamic properties allow the user to specify additional configuration values with -D, such as - * -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368 - */ - private final Option DYNAMIC_PROPERTIES; - - private final boolean acceptInteractiveInput; - - //------------------------------------ Internal fields ------------------------- - private AbstractFlinkYarnCluster yarnCluster = null; - private boolean detachedMode = false; - - /** Default yarn application name.
- */ - private String defaultApplicationName = null; - - public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean acceptInteractiveInput) { - this.acceptInteractiveInput = acceptInteractiveInput; - - QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)"); - QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue."); - SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)"); - FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file"); - JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]"); - TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]"); - CONTAINER = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)"); - SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager"); - DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties"); - DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached"); - STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode"); - NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN"); - } - - /** - * Creates a new Yarn Client.
- * @param cmd the command line to parse options from - * @return an instance of the client or null if there was an error - */ - public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) { - - AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient(); - if (flinkYarnClient == null) { - return null; - } - - if (!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option! - LOG.error("Missing required argument " + CONTAINER.getOpt()); - printUsage(); - return null; - } - flinkYarnClient.setTaskManagerCount(Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt()))); - - // Jar Path - Path localJarPath; - if (cmd.hasOption(FLINK_JAR.getOpt())) { - String userPath = cmd.getOptionValue(FLINK_JAR.getOpt()); - if(!userPath.startsWith("file://")) { - userPath = "file://" + userPath; - } - localJarPath = new Path(userPath); - } else { - LOG.info("No path for the flink jar passed. Using the location of "+flinkYarnClient.getClass()+" to locate the jar"); - localJarPath = new Path("file://"+flinkYarnClient.getClass().getProtectionDomain().getCodeSource().getLocation().getPath()); - } - - flinkYarnClient.setLocalJarPath(localJarPath); - - // Conf Path - String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv(); - GlobalConfiguration.loadConfiguration(confDirPath); - Configuration flinkConfiguration = GlobalConfiguration.getConfiguration(); - flinkYarnClient.setFlinkConfiguration(flinkConfiguration); - flinkYarnClient.setConfigurationDirectory(confDirPath); - File confFile = new File(confDirPath + File.separator + CONFIG_FILE_NAME); - if (!confFile.exists()) { - LOG.error("Unable to locate configuration file in "+confFile); - return null; - } - Path confPath = new Path(confFile.getAbsolutePath()); - - flinkYarnClient.setConfigurationFilePath(confPath); - - List<File> shipFiles = new ArrayList<>(); - // path to directory to ship - if (cmd.hasOption(SHIP_PATH.getOpt())) { - String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt()); -
- File shipDir = new File(shipPath); - if (shipDir.isDirectory()) { - shipFiles = new ArrayList<>(Arrays.asList(shipDir.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return !(name.equals(".") || name.equals("..")); - } - }))); - } else { - LOG.warn("Ship directory is not a directory. Ignoring it."); - } - } - - //check if there is a logback or log4j file - if (confDirPath.length() > 0) { - File logback = new File(confDirPath + File.pathSeparator + CONFIG_FILE_LOGBACK_NAME); - if (logback.exists()) { - shipFiles.add(logback); - flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(logback.toURI())); - } - File log4j = new File(confDirPath + File.pathSeparator + CONFIG_FILE_LOG4J_NAME); - if (log4j.exists()) { - shipFiles.add(log4j); - if (flinkYarnClient.getFlinkLoggingConfigurationPath() != null) { - // this means there is already a logback configuration file --> fail - LOG.warn("The configuration directory ('" + confDirPath + "') contains both LOG4J and " + - "Logback configuration files.
Please delete or rename one of them."); - } // else - flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(log4j.toURI())); - } - } - - flinkYarnClient.setShipFiles(shipFiles); - - // queue - if (cmd.hasOption(QUEUE.getOpt())) { - flinkYarnClient.setQueue(cmd.getOptionValue(QUEUE.getOpt())); - } - - // JobManager Memory - if (cmd.hasOption(JM_MEMORY.getOpt())) { - int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt())); - flinkYarnClient.setJobManagerMemory(jmMemory); - } - - // Task Managers memory - if (cmd.hasOption(TM_MEMORY.getOpt())) { - int tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt())); - flinkYarnClient.setTaskManagerMemory(tmMemory); - } - - if (cmd.hasOption(SLOTS.getOpt())) { - int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt())); - flinkYarnClient.setTaskManagerSlots(slots); - } - - String[] dynamicProperties = null; - if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) { - dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt()); - } - String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, - CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR); - - flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded); - - if (cmd.hasOption(DETACHED.getOpt())) { - this.detachedMode = true; - flinkYarnClient.setDetachedMode(detachedMode); - } - - if(cmd.hasOption(NAME.getOpt())) { - flinkYarnClient.setName(cmd.getOptionValue(NAME.getOpt())); - } else { - // set the default application name, if none is specified - if(defaultApplicationName != null) { - flinkYarnClient.setName(defaultApplicationName); - } - } - - return flinkYarnClient; - } - - - private void printUsage() { - System.out.println("Usage:"); - HelpFormatter formatter = new HelpFormatter(); - formatter.setWidth(200); - formatter.setLeftPadding(5); - formatter.setSyntaxPrefix(" Required"); - Options req = new Options(); - req.addOption(CONTAINER); - formatter.printHelp(" ", req); - - formatter.setSyntaxPrefix(" Optional"); -
- Options opt = new Options(); - opt.addOption(JM_MEMORY); - opt.addOption(TM_MEMORY); - opt.addOption(QUERY); - opt.addOption(QUEUE); - opt.addOption(SLOTS); - opt.addOption(DYNAMIC_PROPERTIES); - opt.addOption(DETACHED); - opt.addOption(STREAMING); - opt.addOption(NAME); - formatter.printHelp(" ", opt); - } - - public static AbstractFlinkYarnClient getFlinkYarnClient() { - AbstractFlinkYarnClient yarnClient; - try { - Class<? extends AbstractFlinkYarnClient> yarnClientClass = - Class.forName("org.apache.flink.yarn.FlinkYarnClient").asSubclass(AbstractFlinkYarnClient.class); - yarnClient = InstantiationUtil.instantiate(yarnClientClass, AbstractFlinkYarnClient.class); - } - catch (ClassNotFoundException e) { - System.err.println("Unable to locate the Flink YARN Client. " + - "Please ensure that you are using a Flink build with Hadoop2/YARN support. Message: " + - e.getMessage()); - e.printStackTrace(System.err); - return null; // make it obvious - } - return yarnClient; - } - - private static void writeYarnProperties(Properties properties, File propertiesFile) { - try { - OutputStream out = new FileOutputStream(propertiesFile); - properties.store(out, "Generated YARN properties file"); - out.close(); - } catch (IOException e) { - throw new RuntimeException("Error writing the properties file", e); - } - propertiesFile.setReadable(true, false); // readable for all.
- } - - public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster, boolean readConsoleInput) { - final String HELP = "Available commands:\n" + - "help - show these commands\n" + - "stop - stop the YARN session"; - int numTaskmanagers = 0; - try { - BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); - label: - while (true) { - // ------------------ check if there are updates by the cluster ----------- - - GetClusterStatusResponse status = yarnCluster.getClusterStatus(); - LOG.debug("Received status message: {}", status); - - if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) { - System.err.println("Number of connected TaskManagers changed to " + - status.numRegisteredTaskManagers() + ". " + - "Slots available: " + status.totalNumberOfSlots()); - numTaskmanagers = status.numRegisteredTaskManagers(); - } - - List<String> messages = yarnCluster.getNewMessages(); - if (messages != null && messages.size() > 0) { - System.err.println("New messages from the YARN cluster: "); - for (String msg : messages) { - System.err.println(msg); - } - } - - if (yarnCluster.hasFailed()) { - System.err.println("The YARN cluster has failed"); - yarnCluster.shutdown(true); - } - - // wait until CLIENT_POLLING_INTERVAL is over or the user entered something. - long startTime = System.currentTimeMillis(); - while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000 - && (!readConsoleInput || !in.ready())) - { - Thread.sleep(200); - } - //------------- handle interactive command by user. ---------------------- - - if (readConsoleInput && in.ready()) { - String command = in.readLine(); - switch (command) { - case "quit": - case "stop": - break label; - - case "help": - System.err.println(HELP); - break; - default: - System.err.println("Unknown command '" + command + "'.
Showing help: \n" + HELP); - break; - } - } - - if (yarnCluster.hasBeenStopped()) { - LOG.info("Stopping interactive command line interface, YARN cluster has been stopped."); - break; - } - } - } catch(Exception e) { - LOG.warn("Exception while running the interactive command line interface", e); - } - } - - public static void main(String[] args) { - FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", true); // no prefix for the YARN session - System.exit(cli.run(args)); - } - - public void getYARNSessionCLIOptions(Options options) { - options.addOption(FLINK_JAR); - options.addOption(JM_MEMORY); - options.addOption(TM_MEMORY); - options.addOption(CONTAINER); - options.addOption(QUEUE); - options.addOption(QUERY); - options.addOption(SHIP_PATH); - options.addOption(SLOTS); - options.addOption(DYNAMIC_PROPERTIES); - options.addOption(DETACHED); - options.addOption(STREAMING); - options.addOption(NAME); - } - - public int run(String[] args) { - // - // Command Line Options - // - Options options = new Options(); - getYARNSessionCLIOptions(options); - - CommandLineParser parser = new PosixParser(); - CommandLine cmd; - try { - cmd = parser.parse(options, args); - } catch(Exception e) { - System.out.println(e.getMessage()); - printUsage(); - return 1; - } - - // Query cluster for metrics - if (cmd.hasOption(QUERY.getOpt())) { - AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient(); - String description; - try { - description = flinkYarnClient.getClusterDescription(); - } catch (Exception e) { - System.err.println("Error while querying the YARN cluster for available resources: "+e.getMessage()); - e.printStackTrace(System.err); - return 1; - } - System.out.println(description); - return 0; - } else { - AbstractFlinkYarnClient flinkYarnClient = createFlinkYarnClient(cmd); - - if (flinkYarnClient == null) { - System.err.println("Error while starting the YARN Client.
Please check log output!"); - return 1; - } - - try { - yarnCluster = flinkYarnClient.deploy(); - // only connect to cluster if its not a detached session. - if(!flinkYarnClient.isDetached()) { - yarnCluster.connectToCluster(); - } - } catch (Exception e) { - System.err.println("Error while deploying YARN cluster: "+e.getMessage()); - e.printStackTrace(System.err); - return 1; - } - //------------------ Cluster deployed, handle connection details - String jobManagerAddress = yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + yarnCluster.getJobManagerAddress().getPort(); - System.out.println("Flink JobManager is now running on " + jobManagerAddress); - System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL()); - - // file that we write into the conf/ dir containing the jobManager address and the dop. - File yarnPropertiesFile = new File(CliFrontend.getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration())); - - Properties yarnProps = new Properties(); - yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress); - if (flinkYarnClient.getTaskManagerSlots() != -1) { - String parallelism = - Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount()); - yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_PARALLELISM, parallelism); - } - // add dynamic properties - if (flinkYarnClient.getDynamicPropertiesEncoded() != null) { - yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, - flinkYarnClient.getDynamicPropertiesEncoded()); - } - writeYarnProperties(yarnProps, yarnPropertiesFile); - - //------------------ Cluster running, let user control it ------------ - - if (detachedMode) { - // print info and quit: - LOG.info("The Flink YARN client has been started in detached mode.
In order to stop " + - "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + - "yarn application -kill "+yarnCluster.getApplicationId()+"\n" + - "Please also note that the temporary files of the YARN session in {} will not be removed.", - flinkYarnClient.getSessionFilesDir()); - } else { - runInteractiveCli(yarnCluster, acceptInteractiveInput); - - if (!yarnCluster.hasBeenStopped()) { - LOG.info("Command Line Interface requested session shutdown"); - yarnCluster.shutdown(false); - } - - try { - yarnPropertiesFile.delete(); - } catch (Exception e) { - LOG.warn("Exception while deleting the JobManager address file", e); - } - } - } - return 0; - } - - /** - * Sets the default Yarn Application Name. - * @param defaultApplicationName the name of the yarn application to use - * @return FlinkYarnSessionCli instance, for chaining - */ - public FlinkYarnSessionCli withDefaultApplicationName(String defaultApplicationName) { - this.defaultApplicationName = defaultApplicationName; - return this; - } - - /** - * Utility method for tests.
- */ - public void stop() { - if (yarnCluster != null) { - LOG.info("Command line interface is shutting down the yarnCluster"); - yarnCluster.shutdown(false); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java index ab70453..86b36b3 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java @@ -27,8 +27,9 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.PlanExecutor; -import org.apache.flink.client.program.Client; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.JobWithJars; +import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.configuration.ConfigConstants; @@ -57,7 +58,7 @@ public class RemoteExecutor extends PlanExecutor { private final Configuration clientConfiguration; - private Client client; + private ClusterClient client; private int defaultParallelism = 1; @@ -149,7 +150,7 @@ public class RemoteExecutor extends PlanExecutor { public void start() throws Exception { synchronized (lock) { if (client == null) { - client = new Client(clientConfiguration); + client = new StandaloneClusterClient(clientConfiguration); client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution()); } else { @@ -207,7 +208,7 @@ public class RemoteExecutor extends PlanExecutor { } try { - return client.runBlocking(program, defaultParallelism); + return
client.run(program, defaultParallelism).getJobExecutionResult(); } finally { if (shutDownAtEnd) { http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index b75952e..f28d1b6 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -24,8 +24,16 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; -import org.apache.flink.client.CliFrontend; -import org.apache.flink.client.FlinkYarnSessionCli; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + /** * A simple command line parser (based on Apache Commons CLI) that extracts command @@ -33,9 +41,17 @@ import org.apache.flink.client.FlinkYarnSessionCli; */ public class CliFrontendParser { + private static final Logger LOG = LoggerFactory.getLogger(CliFrontendParser.class); + + /** command line interface of the YARN session, with a special initialization here * to prefix all options with y/yarn.
*/ - private static final FlinkYarnSessionCli yarnSessionCLi = new FlinkYarnSessionCli("y", "yarn", true); + private static final Map<String, CustomCommandLine> customCommandLine = new HashMap<>(1); + + static { + // we could easily add more here in the future + loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn"); + } static final Option HELP_OPTION = new Option("h", "help", false, @@ -43,7 +59,7 @@ public class CliFrontendParser { static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file."); - public static final Option CLASS_OPTION = new Option("c", "class", true, + static final Option CLASS_OPTION = new Option("c", "class", true, "Class with the program entry point (\"main\" method or \"getPlan()\" method. Only needed if the " + "JAR file does not specify the class in its manifest."); @@ -53,23 +69,23 @@ public class CliFrontendParser { "times for specifying more than one URL. The protocol must be supported by the " + "{@link java.net.URLClassLoader}."); - static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true, + public static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true, "The parallelism with which to run the program. Optional flag to override the default value " + "specified in the configuration."); static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", false, "If present, " + "supress logging output to standard out."); - static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " + + public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " + "the job in detached mode"); static final Option ARGS_OPTION = new Option("a", "arguments", true, "Program arguments. Arguments can also be added without -a, simply as trailing parameters."); static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true, - "Address of the JobManager (master) to which to connect.
Specify '" + CliFrontend.YARN_DEPLOY_JOBMANAGER + - "' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a " + - "different JobManager than the one specified in the configuration."); + "Address of the JobManager (master) to which to connect. " + + "Specify " + getCliIdentifierString() +" as the JobManager to deploy a cluster for the job. " + + "Use this flag to connect to a different JobManager than the one specified in the configuration."); static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true, "Path to a savepoint to reset the job back to (for example file:///flink/savepoint-1537)."); @@ -143,8 +159,10 @@ public class CliFrontendParser { options.addOption(DETACHED_OPTION); options.addOption(SAVEPOINT_PATH_OPTION); - // also add the YARN options so that the parser can parse them - yarnSessionCLi.getYARNSessionCLIOptions(options); + for (CustomCommandLine customCLI : customCommandLine.values()) { + customCLI.addOptions(options); + } + return options; } @@ -240,10 +258,16 @@ public class CliFrontendParser { System.out.println("\n Syntax: run [OPTIONS] <jar-file> <arguments>"); formatter.setSyntaxPrefix(" \"run\" action options:"); formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options())); - formatter.setSyntaxPrefix(" Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:"); - Options yarnOpts = new Options(); - yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts); - formatter.printHelp(" ", yarnOpts); + + // prints options from all available command-line classes + for (Map.Entry<String, CustomCommandLine> entry: customCommandLine.entrySet()) { + formatter.setSyntaxPrefix(" Additional arguments if -m " + entry.getKey() + " is set:"); + Options customOpts = new Options(); + entry.getValue().addOptions(customOpts); + formatter.printHelp(" ", customOpts); + System.out.println(); + } + System.out.println(); } @@ -376,7 +400,63 @@ public class CliFrontendParser { } } -
public static FlinkYarnSessionCli getFlinkYarnSessionCli() { - return yarnSessionCLi; + public static Map<String, CustomCommandLine> getAllCustomCommandLine() { + if (customCommandLine.isEmpty()) { + LOG.warn("No custom command-line classes were loaded."); + } + return Collections.unmodifiableMap(customCommandLine); + } + + private static String getCliIdentifierString() { + StringBuilder builder = new StringBuilder(); + boolean first = true; + for (String identifier : customCommandLine.keySet()) { + if (!first) { + builder.append(", "); + } + first = false; + builder.append("'").append(identifier).append("'"); + } + return builder.toString(); + } + + /** + * Gets the custom command-line registered under this identifier. + * @param identifier The unique identifier for this command-line implementation. + * @return CustomCommandLine or null if no command-line was registered under the identifier + */ + public static CustomCommandLine getActiveCustomCommandLine(String identifier) { + return CliFrontendParser.getAllCustomCommandLine().get(identifier); } + + private static void loadCustomCommandLine(String className, Object... params) { + + try { + Class<? extends CustomCommandLine> customCliClass = + Class.forName(className).asSubclass(CustomCommandLine.class); + + // construct class types from the parameters (runtime classes; getConstructor needs an exact match) + Class<?>[] types = new Class<?>[params.length]; + for (int i = 0; i < params.length; i++) { + Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null."); + types[i] = params[i].getClass(); + } + + Constructor<?
extends CustomCommandLine> constructor = customCliClass.getConstructor(types); + final CustomCommandLine cli = constructor.newInstance(params); + + String cliIdentifier = Preconditions.checkNotNull(cli.getIdentifier()); + CustomCommandLine existing = customCommandLine.put(cliIdentifier, cli); + + if (existing != null) { + throw new IllegalStateException("Attempted to register " + cliIdentifier + + " but there is already a command-line with this identifier."); + } + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException + | InvocationTargetException e) { + LOG.warn("Unable to locate custom CLI class {}. " + + "Flink is not compiled with support for this class.", className, e); + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java new file mode 100644 index 0000000..cd5e0e6 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.cli; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; + + +/** + * Hook for cluster-specific command-line handling in the CLI frontend. + * + * <p>An implementation is identified by a unique identifier, contributes its own options to + * the command-line options, and creates or retrieves the {@link ClusterClient} used to + * communicate with its kind of cluster. + * + * @param <ClusterType> the type of cluster client returned by {@link #createClient} + */ +public interface CustomCommandLine<ClusterType extends ClusterClient> { + + /** + * Returns a unique identifier for this custom command-line. + * @return A unique identifier string + */ + String getIdentifier(); + + /** + * Adds custom options to the existing options. + * @param baseOptions The existing options. + */ + void addOptions(Options baseOptions); + + /** + * Retrieves a client for a running cluster. + * NOTE(review): this returns the general ClusterClient rather than ClusterType, so callers + * needing the concrete client type must cast; confirm whether ClusterType was intended. + * @param config The Flink config + * @return Client if a cluster could be retrieved, null otherwise + * @throws Exception if retrieving the client fails + */ + ClusterClient retrieveCluster(Configuration config) throws Exception; + + /** + * Creates the client for the cluster. + * @param applicationName The application name to use + * @param commandLine The command-line options parsed by the CliFrontend + * @return The client to communicate with the cluster which the CustomCommandLine brought up. 
+ * @throws Exception if creating the client fails
+ */ + ClusterType createClient(String applicationName, CommandLine commandLine) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java new file mode 100644 index 0000000..cf0595b --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment; + + +import org.apache.flink.client.program.ClusterClient; + +/** + * A descriptor to deploy a cluster (e.g. Yarn or Mesos) and return a Client for Cluster communication. + * + * @param <ClientType> the type of client returned by {@link #deploy()} + */ +public interface ClusterDescriptor<ClientType extends ClusterClient> { + + /** + * Returns a String containing details about the cluster (NodeManagers, available memory, ...).
+ * + * @return the cluster description + * @throws Exception if the cluster details could not be obtained + */ + String getClusterDescription() throws Exception; + + /** + * Triggers deployment of a cluster. + * @return Client for the cluster + * @throws Exception if the deployment fails + */ + ClientType deploy() throws Exception; +}
