[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15327534#comment-15327534 ]
ASF GitHub Bot commented on FLINK-3937:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2085#discussion_r66805499

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -300,18 +309,82 @@ public static boolean allocateResource(int[] nodeManagers, int toAllocate) {
             return false;
         }

    -    @Override
         public void setDetachedMode(boolean detachedMode) {
             this.detached = detachedMode;
         }

    -    @Override
    -    public boolean isDetached() {
    +    public boolean isDetachedMode() {
             return detached;
         }
    +
    +    /**
    +     * Gets a Hadoop Yarn client
    +     * @return Returns a YarnClient which has to be shutdown manually
    +     */
    +    private static YarnClient getYarnClient(Configuration conf) {
    +        YarnClient yarnClient = YarnClient.createYarnClient();
    +        yarnClient.init(conf);
    +        yarnClient.start();
    +        return yarnClient;
    +    }
    +
    +    /**
    +     * Retrieves the Yarn application and cluster from the config
    +     * @param config The config with entries to retrieve the cluster
    +     * @return YarnClusterClient
    +     * @deprecated This should be removed in the future
    +     */
    +    public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws Exception {
    +        String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
    +        int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
    +
    +        if (jobManagerHost != null && jobManagerPort != -1) {
    +
    +            YarnClient yarnClient = getYarnClient(conf);
    +            List<ApplicationReport> applicationReports = yarnClient.getApplications();
    +            for (ApplicationReport report : applicationReports) {
    +                if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) {
    +                    LOG.info("Found application '{}' " +
    +                        "with JobManager host name '{}' and port '{}' from Yarn properties file.",
    +                        report.getApplicationId(), jobManagerHost, jobManagerPort);
    +                    return retrieve(report.getApplicationId().toString());
    +                }
    +            }
    +
    +        }
    +        return null;
    --- End diff --

    Same as above. Fixed.


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-3937
>                 URL: https://issues.apache.org/jira/browse/FLINK-3937
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Sebastian Klemke
>            Assignee: Maximilian Michels
>            Priority: Trivial
>         Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop
> subcommands are hard to invoke if you only know the YARN application ID. As
> an improvement, I suggest adding a -yid <yarnApplicationId> option to the
> mentioned subcommands that can be used together with -m yarn-cluster. Flink
> cli would then retrieve JobManager RPC location from YARN ResourceManager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)