Repository: incubator-apex-core Updated Branches: refs/heads/devel-3 88ee4ad16 -> 2bd6a8db5
Using YarnConfiguration getSocketAddr to get the individual rm addresses in HA mode as it will lookup all the necessary configuration to return the address namely yarn.resourcemanager.address followed by yarn.resourcemanager.hostname Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/45e891c7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/45e891c7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/45e891c7 Branch: refs/heads/devel-3 Commit: 45e891c7cd7f83a34a4eefc0de10306ba139d2da Parents: bf72215 Author: Pramod Immaneni <[email protected]> Authored: Fri Sep 25 01:39:48 2015 -0700 Committer: Pramod Immaneni <[email protected]> Committed: Sun Oct 4 09:54:50 2015 -0700 ---------------------------------------------------------------------- .../stram/client/StramClientUtils.java | 28 +++++++++++++------- .../com/datatorrent/stram/util/ConfigUtils.java | 1 + 2 files changed, 20 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/45e891c7/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java index d596a73..45d2feb 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java @@ -22,10 +22,6 @@ import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; - import org.mozilla.javascript.Scriptable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +34,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -58,6 +53,10 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.DTLoggerFactory; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; + import com.datatorrent.api.StreamingApplication; import com.datatorrent.stram.StramClient; @@ -246,10 +245,7 @@ public class StramClientUtils for (String rmId : ConfigUtils.getRMHAIds(conf)) { LOG.info("Yarn Resource Manager id: {}", rmId); // Set RM_ID to get the corresponding RM_ADDRESS - services.add(SecurityUtil.buildTokenService(NetUtils.createSocketAddr( - conf.get(RM_HOSTNAME_PREFIX + rmId), - YarnConfiguration.DEFAULT_RM_PORT, - RM_HOSTNAME_PREFIX + rmId)).toString()); + services.add(SecurityUtil.buildTokenService(getRMHAAddress(rmId)).toString()); } Text rmTokenService = new Text(Joiner.on(',').join(services)); @@ -284,6 +280,20 @@ public class StramClientUtils credentials.addToken(token.getService(), token); } + public InetSocketAddress getRMHAAddress(String rmId) + { + YarnConfiguration yarnConf; + if (conf instanceof YarnConfiguration) { + yarnConf = (YarnConfiguration)conf; + } else { + yarnConf = new YarnConfiguration(conf); + } + yarnConf.set(ConfigUtils.RM_HA_ID, rmId); + InetSocketAddress socketAddr = yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT); + yarnConf.unset(ConfigUtils.RM_HA_ID); + return socketAddr; + } + } private static final Logger LOG = LoggerFactory.getLogger(StramClientUtils.class); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/45e891c7/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java b/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java index 481815f..68fe27b 100644 --- a/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java +++ b/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java @@ -39,6 +39,7 @@ public class ConfigUtils private static final String RM_HA_PREFIX = YarnConfiguration.RM_PREFIX + "ha."; public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled"; public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids"; + public static final String RM_HA_ID = RM_HA_PREFIX + "id"; public static final boolean DEFAULT_RM_HA_ENABLED = false; private static String yarnLogDir;
