http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java index a5b8af7..9130fdd 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -55,6 +56,7 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -77,9 +79,6 @@ public class YarnClusterClient extends ClusterClient { // (HDFS) location of the files required to run on YARN. Needed here to delete them on shutdown. private final Path sessionFilesDir; - /** The leader retrieval service for connecting to the cluster and finding the active leader. */ - private final LeaderRetrievalService leaderRetrievalService; - //---------- Class internal fields ------------------- private final AbstractYarnClusterDescriptor clusterDescriptor; @@ -92,6 +91,7 @@ public class YarnClusterClient extends ClusterClient { private boolean isConnected = false; + private final boolean perJobCluster; /** * Create a new Flink on YARN cluster. @@ -101,6 +101,7 @@ public class YarnClusterClient extends ClusterClient { * @param appReport the YARN application ID * @param flinkConfig Flink configuration * @param sessionFilesDir Location of files required for YARN session + * @param perJobCluster Indicator whether this cluster is only created for a single job and then shutdown * @throws IOException * @throws YarnException */ @@ -109,7 +110,8 @@ public class YarnClusterClient extends ClusterClient { final YarnClient yarnClient, final ApplicationReport appReport, org.apache.flink.configuration.Configuration flinkConfig, - Path sessionFilesDir) throws IOException, YarnException { + Path sessionFilesDir, + boolean perJobCluster) throws IOException, YarnException { super(flinkConfig); @@ -122,18 +124,16 @@ public class YarnClusterClient extends ClusterClient { this.applicationId = appReport; this.appId = appReport.getApplicationId(); this.trackingURL = appReport.getTrackingUrl(); + this.perJobCluster = perJobCluster; + /* The leader retrieval service for connecting to the cluster and finding the active leader. */ + LeaderRetrievalService leaderRetrievalService; try { leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig); } catch (Exception e) { throw new IOException("Could not create the leader retrieval service.", e); } - - if (isConnected) { - throw new IllegalStateException("Already connected to the cluster."); - } - // start application client LOG.info("Start application client."); @@ -182,28 +182,31 @@ public class YarnClusterClient extends ClusterClient { isConnected = true; - logAndSysout("Waiting until all TaskManagers have connected"); + if (perJobCluster) { - while(true) { - GetClusterStatusResponse status = getClusterStatus(); - if (status != null) { - if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) { - logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/" - + clusterDescriptor.getTaskManagerCount() + ")"); + logAndSysout("Waiting until all TaskManagers have connected"); + + while (true) { + GetClusterStatusResponse status = getClusterStatus(); + if (status != null) { + if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) { + logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/" + + clusterDescriptor.getTaskManagerCount() + ")"); + } else { + logAndSysout("All TaskManagers are connected"); + break; + } } else { - logAndSysout("All TaskManagers are connected"); - break; + logAndSysout("No status updates from the YARN cluster received so far. Waiting ..."); } - } else { - logAndSysout("No status updates from the YARN cluster received so far. Waiting ..."); - } - try { - Thread.sleep(500); - } catch (InterruptedException e) { - LOG.error("Interrupted while waiting for TaskManagers"); - System.err.println("Thread is interrupted"); - throw new IOException("Interrupted while waiting for TaskManagers", e); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for TaskManagers"); + System.err.println("Thread is interrupted"); + throw new IOException("Interrupted while waiting for TaskManagers", e); + } } } } @@ -214,9 +217,12 @@ public class YarnClusterClient extends ClusterClient { } LOG.info("Disconnecting YarnClusterClient from ApplicationMaster"); - if(!Runtime.getRuntime().removeShutdownHook(clientShutdownHook)) { - LOG.warn("Error while removing the shutdown hook. The YARN session might be killed unintentionally"); + try { + Runtime.getRuntime().removeShutdownHook(clientShutdownHook); + } catch (IllegalStateException e) { + // we are already in the shutdown hook } + // tell the actor to shut down. applicationClient.tell(PoisonPill.getInstance(), applicationClient); @@ -265,12 +271,30 @@ public class YarnClusterClient extends ClusterClient { @Override protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { - if (isDetached()) { - JobSubmissionResult result = super.runDetached(jobGraph, classLoader); + if (perJobCluster) { stopAfterJob(jobGraph.getJobID()); - return result; + } + + if (isDetached()) { + return super.runDetached(jobGraph, classLoader); } else { - return super.run(jobGraph, classLoader); + try { + return super.run(jobGraph, classLoader); + } finally { + // show cluster status + List<String> msgs = getNewMessages(); + if (msgs != null && msgs.size() > 1) { + + logAndSysout("The following messages were created by the YARN cluster while running the Job:"); + for (String msg : msgs) { + logAndSysout(msg); + } + } + if (getApplicationStatus() != ApplicationStatus.SUCCEEDED) { + logAndSysout("YARN cluster is in non-successful state " + getApplicationStatus()); + logAndSysout("YARN Diagnostics: " + getDiagnostics()); + } + } } } @@ -298,8 +322,9 @@ public class YarnClusterClient extends ClusterClient { throw new IllegalStateException("The cluster is not connected to the ApplicationMaster."); } if(hasBeenShutdown()) { - throw new RuntimeException("The YarnClusterClient has already been stopped"); + return null; } + Future<Object> clusterStatusOption = ask(applicationClient, YarnMessages.getLocalGetyarnClusterStatus(), akkaTimeout); Object clusterStatus; try { @@ -417,32 +442,20 @@ public class YarnClusterClient extends ClusterClient { @Override public void finalizeCluster() { - if (!isConnected) { - throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster."); - } - - if (isDetached()) { - // only disconnect if we are running detached + if (isDetached() || !perJobCluster) { + // only disconnect if we are not running a per job cluster disconnect(); - return; + } else { + shutdownCluster(); } + } - // show cluster status - - List<String> msgs = getNewMessages(); - if (msgs != null && msgs.size() > 1) { + public void shutdownCluster() { - logAndSysout("The following messages were created by the YARN cluster while running the Job:"); - for (String msg : msgs) { - logAndSysout(msg); - } - } - if (getApplicationStatus() != ApplicationStatus.SUCCEEDED) { - logAndSysout("YARN cluster is in non-successful state " + getApplicationStatus()); - logAndSysout("YARN Diagnostics: " + getDiagnostics()); + if (!isConnected) { + throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster."); } - if(hasBeenShutDown.getAndSet(true)) { return; } @@ -471,13 +484,30 @@ public class YarnClusterClient extends ClusterClient { actorSystem.awaitTermination(); } - LOG.info("Deleting files in " + sessionFilesDir); try { - FileSystem shutFS = FileSystem.get(hadoopConfig); - shutFS.delete(sessionFilesDir, true); // delete conf and jar file. - shutFS.close(); - }catch(IOException e){ - LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e); + File propertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(flinkConfig); + if (propertiesFile.isFile()) { + if (propertiesFile.delete()) { + LOG.info("Deleted Yarn properties file at {}", propertiesFile.getAbsoluteFile().toString()); + } else { + LOG.warn("Couldn't delete Yarn properties file at {}", propertiesFile.getAbsoluteFile().toString()); + } + } + } catch (Exception e) { + LOG.warn("Exception while deleting the JobManager address file", e); + } + + if (sessionFilesDir != null) { + LOG.info("Deleting files in " + sessionFilesDir); + try { + FileSystem shutFS = FileSystem.get(hadoopConfig); + shutFS.delete(sessionFilesDir, true); // delete conf and jar file. + shutFS.close(); + } catch (IOException e) { + LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e); + } + } else { + LOG.warn("Session file directory not set. Not deleting session files"); } try { @@ -571,7 +601,6 @@ public class YarnClusterClient extends ClusterClient { @Override public boolean isDetached() { - // either we have set detached mode using the general '-d' flag or using the Yarn CLI flag 'yd' return super.isDetached() || clusterDescriptor.isDetachedMode(); } }
http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 43e7c7b..5f745b2 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -17,10 +17,12 @@ */ package org.apache.flink.yarn; + /** * Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}. */ public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor { + @Override protected Class<?> getApplicationMasterClass() { return YarnApplicationMasterRunner.class; http://git-wip-us.apache.org/repos/asf/flink/blob/f4ac8522/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index fdcc858..5eca4f1 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -28,11 +28,9 @@ import org.apache.flink.client.CliFrontend; import org.apache.flink.client.ClientUtils; import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CustomCommandLine; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterClient; import org.apache.flink.runtime.clusterframework.ApplicationStatus; @@ -59,6 +57,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION; + /** * Class handling the command line interface to the YARN session. */ @@ -97,8 +97,11 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> private final Option CONTAINER; private final Option SLOTS; private final Option DETACHED; + @Deprecated private final Option STREAMING; private final Option NAME; + + private final Options ALL_OPTIONS; /** * Dynamic properties allow the user to specify additional configuration values with -D, such as @@ -118,7 +121,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean acceptInteractiveInput) { this.acceptInteractiveInput = acceptInteractiveInput; - + QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)"); APPLICATION_ID = new Option(shortPrefix + "id", longPrefix + "applicationId", true, "Attach to running YARN session"); QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue."); @@ -132,37 +135,24 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached"); STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode"); NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN"); + + ALL_OPTIONS = new Options(); + ALL_OPTIONS.addOption(FLINK_JAR); + ALL_OPTIONS.addOption(JM_MEMORY); + ALL_OPTIONS.addOption(TM_MEMORY); + ALL_OPTIONS.addOption(CONTAINER); + ALL_OPTIONS.addOption(QUEUE); + ALL_OPTIONS.addOption(QUERY); + ALL_OPTIONS.addOption(SHIP_PATH); + ALL_OPTIONS.addOption(SLOTS); + ALL_OPTIONS.addOption(DYNAMIC_PROPERTIES); + ALL_OPTIONS.addOption(DETACHED); + ALL_OPTIONS.addOption(STREAMING); + ALL_OPTIONS.addOption(NAME); + ALL_OPTIONS.addOption(APPLICATION_ID); } - /** - * Attaches a new Yarn Client to running YARN application. - * - */ - public AbstractFlinkYarnCluster attachFlinkYarnClient(CommandLine cmd) { - AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient(); - if (flinkYarnClient == null) { - return null; - } - if (!cmd.hasOption(APPLICATION_ID.getOpt())) { - LOG.error("Missing required argument " + APPLICATION_ID.getOpt()); - printUsage(); - return null; - } - - String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv(); - GlobalConfiguration.loadConfiguration(confDirPath); - Configuration flinkConfiguration = GlobalConfiguration.getConfiguration(); - flinkYarnClient.setFlinkConfiguration(flinkConfiguration); - flinkYarnClient.setConfigurationDirectory(confDirPath); - - try { - return flinkYarnClient.attach(cmd.getOptionValue(APPLICATION_ID.getOpt())); - } catch (Exception e) { - LOG.error("Could not attach to YARN session", e); - return null; - } - } /** * Resumes from a Flink Yarn properties file * @param flinkConfiguration The flink configuration @@ -170,7 +160,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> */ private boolean resumeFromYarnProperties(Configuration flinkConfiguration) { // load the YARN properties - File propertiesFile = new File(getYarnPropertiesLocation(flinkConfiguration)); + File propertiesFile = getYarnPropertiesLocation(flinkConfiguration); if (!propertiesFile.exists()) { return false; } @@ -209,7 +199,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> try { jobManagerAddress = ClientUtils.parseHostPortAddress(address); // store address in config from where it is retrieved by the retrieval service - CliFrontend.writeJobManagerAddressToConfig(flinkConfiguration, jobManagerAddress); + CliFrontend.setJobManagerAddressInConfig(flinkConfiguration, jobManagerAddress); } catch (Exception e) { throw new RuntimeException("YARN properties contain an invalid entry for JobManager address.", e); @@ -228,10 +218,9 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> return true; } - public YarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) { - + public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) { - YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(); + AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor(); if (!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option! LOG.error("Missing required argument {}", CONTAINER.getOpt()); @@ -343,19 +332,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> return yarnClusterDescriptor; } - @Override - public YarnClusterClient createClient(String applicationName, CommandLine cmdLine) throws Exception { - - YarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine); - - try { - return yarnClusterDescriptor.deploy(); - } catch (Exception e) { - throw new RuntimeException("Error deploying the YARN cluster", e); - } - - } - private void printUsage() { System.out.println("Usage:"); HelpFormatter formatter = new HelpFormatter(); @@ -367,17 +343,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> formatter.printHelp(" ", req); formatter.setSyntaxPrefix(" Optional"); - Options opt = new Options(); - opt.addOption(JM_MEMORY); - opt.addOption(TM_MEMORY); - opt.addOption(QUERY); - opt.addOption(QUEUE); - opt.addOption(SLOTS); - opt.addOption(DYNAMIC_PROPERTIES); - opt.addOption(DETACHED); - opt.addOption(STREAMING); - opt.addOption(NAME); - formatter.printHelp(" ", opt); + Options options = new Options(); + addGeneralOptions(options); + addRunOptions(options); + formatter.printHelp(" ", options); } private static void writeYarnProperties(Properties properties, File propertiesFile) { @@ -439,6 +408,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> switch (command) { case "quit": case "stop": + yarnCluster.shutdownCluster(); break label; case "help": @@ -466,38 +436,62 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> } @Override - public String getIdentifier() { + public boolean isActive(CommandLine commandLine, Configuration configuration) { + String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null); + boolean yarnJobManager = ID.equals(jobManagerOption); + return yarnJobManager || resumeFromYarnProperties(configuration); + } + + @Override + public String getId() { return ID; } - public void addOptions(Options options) { - options.addOption(FLINK_JAR); - options.addOption(JM_MEMORY); - options.addOption(TM_MEMORY); - options.addOption(CONTAINER); - options.addOption(QUEUE); - options.addOption(QUERY); - options.addOption(SHIP_PATH); - options.addOption(SLOTS); - options.addOption(DYNAMIC_PROPERTIES); - options.addOption(DETACHED); - options.addOption(STREAMING); - options.addOption(NAME); + @Override + public void addRunOptions(Options baseOptions) { + for (Object option : ALL_OPTIONS.getOptions()) { + baseOptions.addOption((Option) option); + } } + @Override + public void addGeneralOptions(Options baseOptions) { + baseOptions.addOption(APPLICATION_ID); + } - public void getYARNAttachCLIOptions(Options options) { - options.addOption(APPLICATION_ID); + @Override + public YarnClusterClient retrieveCluster( + CommandLine cmdLine, + Configuration config) throws UnsupportedOperationException { + + // first check for an application id + if (cmdLine.hasOption(APPLICATION_ID.getOpt())) { + String applicationID = cmdLine.getOptionValue(APPLICATION_ID.getOpt()); + AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(); + yarnDescriptor.setFlinkConfiguration(config); + return yarnDescriptor.retrieve(applicationID); + // then try to load from yarn properties + } else if (resumeFromYarnProperties(config)) { + AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(); + yarnDescriptor.setFlinkConfiguration(config); + return yarnDescriptor.retrieveFromConfig(config); + } + + throw new UnsupportedOperationException("Could not resume a Yarn cluster."); } @Override - public ClusterClient retrieveCluster(Configuration config) throws Exception { + public YarnClusterClient createCluster(String applicationName, CommandLine cmdLine, Configuration config) { + + AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine); + yarnClusterDescriptor.setFlinkConfiguration(config); - if(resumeFromYarnProperties(config)) { - return new StandaloneClusterClient(config); + try { + return yarnClusterDescriptor.deploy(); + } catch (Exception e) { + throw new RuntimeException("Error deploying the YARN cluster", e); } - return null; } public int run(String[] args) { @@ -505,7 +499,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> // Command Line Options // Options options = new Options(); - addOptions(options); + addGeneralOptions(options); + addRunOptions(options); CommandLineParser parser = new PosixParser(); CommandLine cmd; @@ -519,10 +514,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> // Query cluster for metrics if (cmd.hasOption(QUERY.getOpt())) { - YarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor(); + AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(); String description; try { - description = flinkYarnClient.getClusterDescription(); + description = yarnDescriptor.getClusterDescription(); } catch (Exception e) { System.err.println("Error while querying the YARN cluster for available resources: "+e.getMessage()); e.printStackTrace(System.err); @@ -531,56 +526,61 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> System.out.println(description); return 0; } else if (cmd.hasOption(APPLICATION_ID.getOpt())) { - yarnCluster = attachFlinkYarnClient(cmd); + + AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(); + try { + yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(APPLICATION_ID.getOpt())); + } catch (Exception e) { + throw new RuntimeException("Could not retrieve existing Yarn application", e); + } if (detachedMode) { LOG.info("The Flink YARN client has been started in detached mode. In order to stop " + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + - "yarn application -kill "+yarnCluster.getApplicationId()); + "yarn application -kill "+yarnCluster.getClusterIdentifier()); + yarnCluster.disconnect(); } else { - runInteractiveCli(yarnCluster); - - if (!yarnCluster.hasBeenStopped()) { - LOG.info("Command Line Interface requested session shutdown"); - yarnCluster.shutdown(false); - } + runInteractiveCli(yarnCluster, true); } } else { - YarnClusterDescriptor flinkYarnClient; + AbstractYarnClusterDescriptor yarnDescriptor; try { - flinkYarnClient = createDescriptor(null, cmd); + yarnDescriptor = createDescriptor(null, cmd); } catch (Exception e) { System.err.println("Error while starting the YARN Client. Please check log output!"); return 1; } try { - yarnCluster = flinkYarnClient.deploy(); + yarnCluster = yarnDescriptor.deploy(); } catch (Exception e) { System.err.println("Error while deploying YARN cluster: "+e.getMessage()); e.printStackTrace(System.err); return 1; } //------------------ ClusterClient deployed, handle connection details - String jobManagerAddress = yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + yarnCluster.getJobManagerAddress().getPort(); + String jobManagerAddress = + yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + + ":" + yarnCluster.getJobManagerAddress().getPort(); + System.out.println("Flink JobManager is now running on " + jobManagerAddress); System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL()); // file that we write into the conf/ dir containing the jobManager address and the dop. - File yarnPropertiesFile = new File(getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration())); + File yarnPropertiesFile = getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration()); Properties yarnProps = new Properties(); yarnProps.setProperty(YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress); - if (flinkYarnClient.getTaskManagerSlots() != -1) { + if (yarnDescriptor.getTaskManagerSlots() != -1) { String parallelism = - Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount()); + Integer.toString(yarnDescriptor.getTaskManagerSlots() * yarnDescriptor.getTaskManagerCount()); yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism); } // add dynamic properties - if (flinkYarnClient.getDynamicPropertiesEncoded() != null) { + if (yarnDescriptor.getDynamicPropertiesEncoded() != null) { yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, - flinkYarnClient.getDynamicPropertiesEncoded()); + yarnDescriptor.getDynamicPropertiesEncoded()); } writeYarnProperties(yarnProps, yarnPropertiesFile); @@ -592,21 +592,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + "yarn application -kill " + yarnCluster.getClusterIdentifier() + "\n" + "Please also note that the temporary files of the YARN session in {} will not be removed.", - flinkYarnClient.getSessionFilesDir()); + yarnDescriptor.getSessionFilesDir()); yarnCluster.disconnect(); } else { runInteractiveCli(yarnCluster, acceptInteractiveInput); - - if (!yarnCluster.hasBeenShutdown()) { - LOG.info("Command Line Interface requested session shutdown"); - yarnCluster.shutdown(); - } - - try { - yarnPropertiesFile.delete(); - } catch (Exception e) { - LOG.warn("Exception while deleting the JobManager address file", e); - } } } return 0; @@ -649,11 +638,16 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> } } - private static String getYarnPropertiesLocation(Configuration conf) { + public static File getYarnPropertiesLocation(Configuration conf) { String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir"); String currentUser = System.getProperty("user.name"); - String propertiesFileLocation = conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation); + String propertiesFileLocation = + conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation); + + return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser); + } - return propertiesFileLocation + File.separator + YARN_PROPERTIES_FILE + currentUser; + protected AbstractYarnClusterDescriptor getClusterDescriptor() { + return new YarnClusterDescriptor(); } }
