Use YarnConfiguration.getSocketAddr to get the individual RM addresses in HA
mode, as it looks up all the necessary configuration to return the address,
namely yarn.resourcemanager.address followed by yarn.resourcemanager.hostname.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/45e891c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/45e891c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/45e891c7

Branch: refs/heads/feature-module
Commit: 45e891c7cd7f83a34a4eefc0de10306ba139d2da
Parents: bf72215
Author: Pramod Immaneni <[email protected]>
Authored: Fri Sep 25 01:39:48 2015 -0700
Committer: Pramod Immaneni <[email protected]>
Committed: Sun Oct 4 09:54:50 2015 -0700

----------------------------------------------------------------------
 .../stram/client/StramClientUtils.java          | 28 +++++++++++++-------
 .../com/datatorrent/stram/util/ConfigUtils.java |  1 +
 2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/45e891c7/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java 
b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index d596a73..45d2feb 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -22,10 +22,6 @@ import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-
 import org.mozilla.javascript.Scriptable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -58,6 +53,10 @@ import 
org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.DTLoggerFactory;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
 import com.datatorrent.api.StreamingApplication;
 
 import com.datatorrent.stram.StramClient;
@@ -246,10 +245,7 @@ public class StramClientUtils
       for (String rmId : ConfigUtils.getRMHAIds(conf)) {
         LOG.info("Yarn Resource Manager id: {}", rmId);
         // Set RM_ID to get the corresponding RM_ADDRESS
-        services.add(SecurityUtil.buildTokenService(NetUtils.createSocketAddr(
-                conf.get(RM_HOSTNAME_PREFIX + rmId),
-                YarnConfiguration.DEFAULT_RM_PORT,
-                RM_HOSTNAME_PREFIX + rmId)).toString());
+        
services.add(SecurityUtil.buildTokenService(getRMHAAddress(rmId)).toString());
       }
       Text rmTokenService = new Text(Joiner.on(',').join(services));
 
@@ -284,6 +280,20 @@ public class StramClientUtils
       credentials.addToken(token.getService(), token);
     }
 
+    public InetSocketAddress getRMHAAddress(String rmId)
+    {
+      YarnConfiguration yarnConf;
+      if (conf instanceof YarnConfiguration) {
+        yarnConf = (YarnConfiguration)conf;
+      } else {
+        yarnConf = new YarnConfiguration(conf);
+      }
+      yarnConf.set(ConfigUtils.RM_HA_ID, rmId);
+      InetSocketAddress socketAddr = 
yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS, 
YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
+      yarnConf.unset(ConfigUtils.RM_HA_ID);
+      return socketAddr;
+    }
+
   }
 
   private static final Logger LOG = 
LoggerFactory.getLogger(StramClientUtils.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/45e891c7/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java 
b/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java
index 481815f..68fe27b 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/ConfigUtils.java
@@ -39,6 +39,7 @@ public class ConfigUtils
   private static final String RM_HA_PREFIX = YarnConfiguration.RM_PREFIX + 
"ha.";
   public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled";
   public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids";
+  public static final String RM_HA_ID = RM_HA_PREFIX + "id";
   public static final boolean DEFAULT_RM_HA_ENABLED = false;
 
   private static String yarnLogDir;

Reply via email to