Use YarnConfiguration.getSocketAddr to get the individual RM addresses in HA mode, as it looks up all the configuration necessary to return the address, namely yarn.resourcemanager.address followed by yarn.resourcemanager.hostname.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/45e891c7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/45e891c7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/45e891c7 Branch: refs/heads/master Commit: 45e891c7cd7f83a34a4eefc0de10306ba139d2da Parents: bf72215 Author: Pramod Immaneni <[email protected]> Authored: Fri Sep 25 01:39:48 2015 -0700 Committer: Pramod Immaneni <[email protected]> Committed: Sun Oct 4 09:54:50 2015 -0700 ---------------------------------------------------------------------- .../stram/client/StramClientUtils.java | 28 +++++++++++++------- .../com/datatorrent/stram/util/ConfigUtils.java | 1 + 2 files changed, 20 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/45e891c7/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java index d596a73..45d2feb 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java @@ -22,10 +22,6 @@ import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; - import org.mozilla.javascript.Scriptable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +34,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; -import org.apache.hadoop.net.NetUtils; import 
org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -58,6 +53,10 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.DTLoggerFactory; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; + import com.datatorrent.api.StreamingApplication; import com.datatorrent.stram.StramClient; @@ -246,10 +245,7 @@ public class StramClientUtils for (String rmId : ConfigUtils.getRMHAIds(conf)) { LOG.info("Yarn Resource Manager id: {}", rmId); // Set RM_ID to get the corresponding RM_ADDRESS - services.add(SecurityUtil.buildTokenService(NetUtils.createSocketAddr( - conf.get(RM_HOSTNAME_PREFIX + rmId), - YarnConfiguration.DEFAULT_RM_PORT, - RM_HOSTNAME_PREFIX + rmId)).toString()); + services.add(SecurityUtil.buildTokenService(getRMHAAddress(rmId)).toString()); } Text rmTokenService = new Text(Joiner.on(',').join(services)); @@ -284,6 +280,20 @@ public class StramClientUtils credentials.addToken(token.getService(), token); } + public InetSocketAddress getRMHAAddress(String rmId) + { + YarnConfiguration yarnConf; + if (conf instanceof YarnConfiguration) { + yarnConf = (YarnConfiguration)conf; + } else { + yarnConf = new YarnConfiguration(conf); + } + yarnConf.set(ConfigUtils.RM_HA_ID, rmId); + InetSocketAddress socketAddr = yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT); + yarnConf.unset(ConfigUtils.RM_HA_ID); + return socketAddr; + } + } private static final Logger LOG = LoggerFactory.getLogger(StramClientUtils.class); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/45e891c7/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java ---------------------------------------------------------------------- diff --git 
a/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java b/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java index 481815f..68fe27b 100644 --- a/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java +++ b/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java @@ -39,6 +39,7 @@ public class ConfigUtils private static final String RM_HA_PREFIX = YarnConfiguration.RM_PREFIX + "ha."; public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled"; public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids"; + public static final String RM_HA_ID = RM_HA_PREFIX + "id"; public static final boolean DEFAULT_RM_HA_ENABLED = false; private static String yarnLogDir;
