Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2085#discussion_r66795439
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -108,24 +131,83 @@ public FlinkYarnSessionCli(String shortPrefix, String
longPrefix, boolean accept
NAME = new Option(shortPrefix + "nm", longPrefix + "name",
true, "Set a custom name for the application on YARN");
}
+
/**
- * Creates a new Yarn Client.
- * @param cmd the command line to parse options from
- * @return an instance of the client or null if there was an error
+ * Resumes from a Flink Yarn properties file
+ * @param flinkConfiguration The flink configuration
+ * @return True if the properties were loaded, false otherwise
*/
- public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
+ private boolean resumeFromYarnProperties(Configuration
flinkConfiguration) {
+ // load the YARN properties
+ File propertiesFile = new
File(getYarnPropertiesLocation(flinkConfiguration));
+ if (!propertiesFile.exists()) {
+ return false;
+ }
+
+ logAndSysout("Found YARN properties file " +
propertiesFile.getAbsolutePath());
+
+ Properties yarnProperties = new Properties();
+ try {
+ try (InputStream is = new
FileInputStream(propertiesFile)) {
+ yarnProperties.load(is);
+ }
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Cannot read the YARN
properties file", e);
+ }
+
+ // configure the default parallelism from YARN
+ String propParallelism =
yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
+ if (propParallelism != null) { // maybe the property is not set
+ try {
+ int parallelism =
Integer.parseInt(propParallelism);
+
flinkConfiguration.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY,
parallelism);
+
+ logAndSysout("YARN properties set default
parallelism to " + parallelism);
+ }
+ catch (NumberFormatException e) {
+ throw new RuntimeException("Error while parsing
the YARN properties: " +
+ "Property " +
YARN_PROPERTIES_PARALLELISM + " is not an integer.");
+ }
+ }
+
+ // get the JobManager address from the YARN properties
+ String address =
yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
+ InetSocketAddress jobManagerAddress;
+ if (address != null) {
+ try {
+ jobManagerAddress =
ClientUtils.parseHostPortAddress(address);
+ // store address in config from where it is
retrieved by the retrieval service
+
CliFrontend.writeJobManagerAddressToConfig(flinkConfiguration,
jobManagerAddress);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("YARN properties
contain an invalid entry for JobManager address.", e);
+ }
+
+ logAndSysout("Using JobManager address from YARN
properties " + jobManagerAddress);
+ }
- AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
- if (flinkYarnClient == null) {
- return null;
+ // handle the YARN client's dynamic properties
+ String dynamicPropertiesEncoded =
yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
+ Map<String, String> dynamicProperties =
getDynamicProperties(dynamicPropertiesEncoded);
+ for (Map.Entry<String, String> dynamicProperty :
dynamicProperties.entrySet()) {
+ flinkConfiguration.setString(dynamicProperty.getKey(),
dynamicProperty.getValue());
}
+ return true;
+ }
+
+ public YarnClusterDescriptor createDescriptor(String
defaultApplicationName, CommandLine cmd) {
+
+
+ YarnClusterDescriptor yarnClusterDescriptor = new
YarnClusterDescriptor();
--- End diff --
A bit too many blank lines ;)
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---