[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15327534#comment-15327534
 ] 

ASF GitHub Bot commented on FLINK-3937:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2085#discussion_r66805499
  
    --- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
    @@ -300,18 +309,82 @@ public static boolean allocateResource(int[] 
nodeManagers, int toAllocate) {
                return false;
        }
     
    -   @Override
        public void setDetachedMode(boolean detachedMode) {
                this.detached = detachedMode;
        }
     
    -   @Override
    -   public boolean isDetached() {
    +   public boolean isDetachedMode() {
                return detached;
        }
     
    +
    +   /**
    +    * Gets a Hadoop Yarn client
    +    * @return Returns a YarnClient which has to be shutdown manually
    +    */
    +   private static YarnClient getYarnClient(Configuration conf) {
    +           YarnClient yarnClient = YarnClient.createYarnClient();
    +           yarnClient.init(conf);
    +           yarnClient.start();
    +           return yarnClient;
    +   }
    +
    +   /**
    +    * Retrieves the Yarn application and cluster from the config
    +    * @param config The config with entries to retrieve the cluster
    +    * @return YarnClusterClient
    +    * @deprecated This should be removed in the future
    +    */
    +   public YarnClusterClient 
retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws  
Exception {
    +           String jobManagerHost = 
config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
    +           int jobManagerPort = 
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
    +
    +           if (jobManagerHost != null && jobManagerPort != -1) {
    +
    +                   YarnClient yarnClient = getYarnClient(conf);
    +                   List<ApplicationReport> applicationReports = 
yarnClient.getApplications();
    +                   for (ApplicationReport report : applicationReports) {
    +                           if (report.getHost().equals(jobManagerHost) && 
report.getRpcPort() == jobManagerPort) {
    +                                   LOG.info("Found application '{}' " +
    +                                           "with JobManager host name '{}' 
and port '{}' from Yarn properties file.",
    +                                           report.getApplicationId(), 
jobManagerHost, jobManagerPort);
    +                                   return 
retrieve(report.getApplicationId().toString());
    +                           }
    +                   }
    +
    +           }
    +           return null;
    --- End diff --
    
    Same as above. Fixed.


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-3937
>                 URL: https://issues.apache.org/jira/browse/FLINK-3937
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Sebastian Klemke
>            Assignee: Maximilian Michels
>            Priority: Trivial
>         Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid <yarnApplicationId> option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to