Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2085#discussion_r66795439
  
    --- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -108,24 +131,83 @@ public FlinkYarnSessionCli(String shortPrefix, String 
longPrefix, boolean accept
                NAME = new Option(shortPrefix + "nm", longPrefix + "name", 
true, "Set a custom name for the application on YARN");
        }
     
    +
        /**
    -    * Creates a new Yarn Client.
    -    * @param cmd the command line to parse options from
    -    * @return an instance of the client or null if there was an error
    +    * Resumes from a Flink Yarn properties file
    +    * @param flinkConfiguration The flink configuration
    +    * @return True if the properties were loaded, false otherwise
         */
    -   public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
    +   private boolean resumeFromYarnProperties(Configuration 
flinkConfiguration) {
    +           // load the YARN properties
    +           File propertiesFile = new 
File(getYarnPropertiesLocation(flinkConfiguration));
    +           if (!propertiesFile.exists()) {
    +                   return false;
    +           }
    +
    +           logAndSysout("Found YARN properties file " + 
propertiesFile.getAbsolutePath());
    +
    +           Properties yarnProperties = new Properties();
    +           try {
    +                   try (InputStream is = new 
FileInputStream(propertiesFile)) {
    +                           yarnProperties.load(is);
    +                   }
    +           }
    +           catch (IOException e) {
    +                   throw new RuntimeException("Cannot read the YARN 
properties file", e);
    +           }
    +
    +           // configure the default parallelism from YARN
    +           String propParallelism = 
yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
    +           if (propParallelism != null) { // maybe the property is not set
    +                   try {
    +                           int parallelism = 
Integer.parseInt(propParallelism);
    +                           
flinkConfiguration.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 
parallelism);
    +
    +                           logAndSysout("YARN properties set default 
parallelism to " + parallelism);
    +                   }
    +                   catch (NumberFormatException e) {
    +                           throw new RuntimeException("Error while parsing 
the YARN properties: " +
    +                                   "Property " + 
YARN_PROPERTIES_PARALLELISM + " is not an integer.");
    +                   }
    +           }
    +
    +           // get the JobManager address from the YARN properties
    +           String address = 
yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
    +           InetSocketAddress jobManagerAddress;
    +           if (address != null) {
    +                   try {
    +                           jobManagerAddress = 
ClientUtils.parseHostPortAddress(address);
    +                           // store address in config from where it is 
retrieved by the retrieval service
    +                           
CliFrontend.writeJobManagerAddressToConfig(flinkConfiguration, 
jobManagerAddress);
    +                   }
    +                   catch (Exception e) {
    +                           throw new RuntimeException("YARN properties 
contain an invalid entry for JobManager address.", e);
    +                   }
    +
    +                   logAndSysout("Using JobManager address from YARN 
properties " + jobManagerAddress);
    +           }
     
    -           AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
    -           if (flinkYarnClient == null) {
    -                   return null;
    +           // handle the YARN client's dynamic properties
    +           String dynamicPropertiesEncoded = 
yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
    +           Map<String, String> dynamicProperties = 
getDynamicProperties(dynamicPropertiesEncoded);
    +           for (Map.Entry<String, String> dynamicProperty : 
dynamicProperties.entrySet()) {
    +                   flinkConfiguration.setString(dynamicProperty.getKey(), 
dynamicProperty.getValue());
                }
     
    +           return true;
    +   }
    +
    +   public YarnClusterDescriptor createDescriptor(String 
defaultApplicationName, CommandLine cmd) {
    +
    +
    +           YarnClusterDescriptor yarnClusterDescriptor = new 
YarnClusterDescriptor();
    --- End diff --
    
    A bit too many blank lines ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to