[FLINK-4144] Yarn properties file: replace hostname/port with Yarn application id

This closes #2191


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ab6837f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ab6837f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ab6837f

Branch: refs/heads/master
Commit: 7ab6837fde3adb588273ef6bb8f4f7a215fe9c03
Parents: f722b73
Author: Maximilian Michels <[email protected]>
Authored: Fri Jul 1 18:54:44 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Fri Jul 1 20:12:46 2016 +0200

----------------------------------------------------------------------
 ...CliFrontendYarnAddressConfigurationTest.java |  3 +-
 .../yarn/AbstractYarnClusterDescriptor.java     | 38 -----------
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 70 +++++++++++---------
 3 files changed, 38 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ab6837f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index a99c835..c3328a2 100644
--- 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -109,8 +109,7 @@ public class CliFrontendYarnAddressConfigurationTest {
        private static final ApplicationId TEST_YARN_APPLICATION_ID =
                ApplicationId.newInstance(System.currentTimeMillis(), 42);
 
-       private static final String validPropertiesFile =
-               "jobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":" + 
TEST_YARN_JOB_MANAGER_PORT;
+       private static final String validPropertiesFile = "applicationID=" + 
TEST_YARN_APPLICATION_ID;
 
 
        private static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33";

http://git-wip-us.apache.org/repos/asf/flink/blob/7ab6837f/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 641182e..5d47b13 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -22,7 +22,6 @@ import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
 
@@ -302,43 +301,6 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                return yarnClient;
        }
 
-       /**
-        * Retrieves the Yarn application and cluster from the config
-        * @param config The config with entries to retrieve the cluster
-        * @return YarnClusterClient
-        * @deprecated This should be removed in the future
-        */
-       public YarnClusterClient 
retrieveFromConfig(org.apache.flink.configuration.Configuration config)
-                       throws UnsupportedOperationException {
-               String jobManagerHost = 
config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-               int jobManagerPort = 
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
-
-               if (jobManagerHost != null && jobManagerPort != -1) {
-
-                       YarnClient yarnClient = getYarnClient();
-                       final List<ApplicationReport> applicationReports;
-                       try {
-                               applicationReports = 
yarnClient.getApplications();
-                       } catch (Exception e) {
-                               throw new RuntimeException("Couldn't get Yarn 
application reports", e);
-                       }
-                       for (ApplicationReport report : applicationReports) {
-                               if (report.getHost().equals(jobManagerHost) && 
report.getRpcPort() == jobManagerPort) {
-                                       LOG.info("Found application '{}' " +
-                                               "with JobManager host name '{}' 
and port '{}' from Yarn properties file.",
-                                               report.getApplicationId(), 
jobManagerHost, jobManagerPort);
-                                       return 
retrieve(report.getApplicationId().toString());
-                               }
-                       }
-
-               }
-
-               LOG.warn("Couldn't retrieve Yarn cluster from Flink 
configuration using JobManager address '{}:{}'",
-                       jobManagerHost, jobManagerPort);
-
-               throw new IllegalConfigurationException("Could not resume Yarn 
cluster from config.");
-       }
-
        @Override
        public YarnClusterClient retrieve(String applicationID) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7ab6837f/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 126f0f1..989bee4 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -24,18 +24,18 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +47,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -75,7 +74,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
 
        // YARN-session related constants
        private static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
-       private static final String YARN_PROPERTIES_JOBMANAGER_KEY = 
"jobManager";
+       static final String YARN_APPLICATION_ID_KEY = "applicationID";
        private static final String YARN_PROPERTIES_PARALLELISM = "parallelism";
        private static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = 
"dynamicPropertiesString";
 
@@ -152,24 +151,24 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
 
 
        /**
-        * Resumes from a Flink Yarn properties file
+        * Tries to load a Flink Yarn properties file and returns the Yarn 
application id if successful
         * @param cmdLine The command-line parameters
         * @param flinkConfiguration The flink configuration
-        * @return True if the properties were loaded, false otherwise
+        * @return Yarn application id or null if none could be retrieved
         */
-       private boolean resumeFromYarnProperties(CommandLine cmdLine, 
Configuration flinkConfiguration) {
+       private String loadYarnPropertiesFile(CommandLine cmdLine, 
Configuration flinkConfiguration) {
 
                String jobManagerOption = 
cmdLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
                if (jobManagerOption != null) {
                        // don't resume from properties file if a JobManager 
has been specified
-                       return false;
+                       return null;
                }
 
                for (Option option : cmdLine.getOptions()) {
                        if (ALL_OPTIONS.hasOption(option.getOpt())) {
                                if (!option.getOpt().equals(DETACHED.getOpt())) 
{
                                        // don't resume from properties file if 
yarn options have been specified
-                                       return false;
+                                       return null;
                                }
                        }
                }
@@ -177,7 +176,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                // load the YARN properties
                File propertiesFile = 
getYarnPropertiesLocation(flinkConfiguration);
                if (!propertiesFile.exists()) {
-                       return false;
+                       return null;
                }
 
                logAndSysout("Found YARN properties file " + 
propertiesFile.getAbsolutePath());
@@ -192,6 +191,24 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                        throw new RuntimeException("Cannot read the YARN 
properties file", e);
                }
 
+               // get the Yarn application id from the properties file
+               String applicationID = 
yarnProperties.getProperty(YARN_APPLICATION_ID_KEY);
+               if (applicationID == null) {
+                       throw new IllegalConfigurationException("Yarn 
properties file found but doesn't contain a " +
+                               "Yarn applicaiton id. Please delete the file at 
" + propertiesFile.getAbsolutePath());
+               }
+
+               try {
+                       // try converting id to ApplicationId
+                       ConverterUtils.toApplicationId(applicationID);
+               }
+               catch (Exception e) {
+                       throw new RuntimeException("YARN properties contains an 
invalid entry for " +
+                               "application id: " + applicationID, e);
+               }
+
+               logAndSysout("Using Yarn application id from YARN properties " 
+ applicationID);
+
                // configure the default parallelism from YARN
                String propParallelism = 
yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
                if (propParallelism != null) { // maybe the property is not set
@@ -207,22 +224,6 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                        }
                }
 
-               // get the JobManager address from the YARN properties
-               String address = 
yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
-               InetSocketAddress jobManagerAddress;
-               if (address != null) {
-                       try {
-                               jobManagerAddress = 
ClientUtils.parseHostPortAddress(address);
-                               // store address in config from where it is 
retrieved by the retrieval service
-                               
CliFrontend.setJobManagerAddressInConfig(flinkConfiguration, jobManagerAddress);
-                       }
-                       catch (Exception e) {
-                               throw new RuntimeException("YARN properties 
contain an invalid entry for JobManager address.", e);
-                       }
-
-                       logAndSysout("Using JobManager address from YARN 
properties " + jobManagerAddress);
-               }
-
                // handle the YARN client's dynamic properties
                String dynamicPropertiesEncoded = 
yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
                Map<String, String> dynamicProperties = 
getDynamicProperties(dynamicPropertiesEncoded);
@@ -230,7 +231,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                        flinkConfiguration.setString(dynamicProperty.getKey(), 
dynamicProperty.getValue());
                }
 
-               return true;
+               return applicationID;
        }
 
        public AbstractYarnClusterDescriptor createDescriptor(String 
defaultApplicationName, CommandLine cmd) {
@@ -449,7 +450,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                String jobManagerOption = 
commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
                boolean yarnJobManager = ID.equals(jobManagerOption);
                boolean yarnAppId = 
commandLine.hasOption(APPLICATION_ID.getOpt());
-               return yarnJobManager || yarnAppId || 
resumeFromYarnProperties(commandLine, configuration);
+               return yarnJobManager || yarnAppId || 
loadYarnPropertiesFile(commandLine, configuration) != null;
        }
 
        @Override
@@ -481,10 +482,13 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                        yarnDescriptor.setFlinkConfiguration(config);
                        return yarnDescriptor.retrieve(applicationID);
                // then try to load from yarn properties
-               } else if (resumeFromYarnProperties(cmdLine, config)) {
-                       AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor();
-                       yarnDescriptor.setFlinkConfiguration(config);
-                       return yarnDescriptor.retrieveFromConfig(config);
+               } else {
+                       String applicationId = loadYarnPropertiesFile(cmdLine, 
config);
+                       if (applicationId != null) {
+                               AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor();
+                               yarnDescriptor.setFlinkConfiguration(config);
+                               return yarnDescriptor.retrieve(applicationId);
+                       }
                }
 
                throw new UnsupportedOperationException("Could not resume a 
Yarn cluster.");
@@ -581,7 +585,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                        File yarnPropertiesFile = 
getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration());
 
                        Properties yarnProps = new Properties();
-                       yarnProps.setProperty(YARN_PROPERTIES_JOBMANAGER_KEY, 
jobManagerAddress);
+                       yarnProps.setProperty(YARN_APPLICATION_ID_KEY, 
yarnCluster.getApplicationId().toString());
                        if (yarnDescriptor.getTaskManagerSlots() != -1) {
                                String parallelism =
                                                
Integer.toString(yarnDescriptor.getTaskManagerSlots() * 
yarnDescriptor.getTaskManagerCount());

Reply via email to