APEX-149 #resolve Fixed the property name used to look up the RM webapp address in non-HA mode
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/0a89c83c Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/0a89c83c Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/0a89c83c Branch: refs/heads/feature-module Commit: 0a89c83c48c207cd282f264a8bf515621768eceb Parents: 0a85586 Author: Pramod Immaneni <[email protected]> Authored: Sat Sep 26 10:58:59 2015 -0700 Committer: Thomas Weise <[email protected]> Committed: Sat Sep 26 11:02:17 2015 -0700 ---------------------------------------------------------------------- .../stram/security/StramWSFilter.java | 47 +++++++++++++++----- .../security/StramWSFilterInitializer.java | 19 ++++++-- 2 files changed, 50 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0a89c83c/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java index 8be7fed..061bdc7 100644 --- a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java +++ b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilter.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import com.datatorrent.stram.webapp.WebServices; @@ -102,6 +103,7 @@ public class StramWSFilter implements Filter proxyAddresses = new HashSet<String>(); for (String proxyHost : proxyHosts) { try { + logger.debug("resolving proxy hostname {}", proxyHost); for (InetAddress add : 
InetAddress.getAllByName(proxyHost)) { logger.debug("proxy address is: {}", add.getHostAddress()); proxyAddresses.add(add.getHostAddress()); @@ -133,9 +135,8 @@ public class StramWSFilter implements Filter HttpServletRequest httpReq = (HttpServletRequest)req; HttpServletResponse httpResp = (HttpServletResponse)resp; - logger.debug("Remote address for request is: {}", httpReq.getRemoteAddr()); + String remoteAddr = httpReq.getRemoteAddr(); String requestURI = httpReq.getRequestURI(); - logger.debug("Request path {}", requestURI); boolean authenticate = true; String user = null; if(getProxyAddresses().contains(httpReq.getRemoteAddr())) { @@ -149,9 +150,11 @@ public class StramWSFilter implements Filter } if (requestURI.equals(WebServices.PATH) && (user != null)) { String token = createClientToken(user, httpReq.getLocalAddr()); - logger.debug("Create token {}", token); + logger.debug("{}: creating token {}", remoteAddr, token); Cookie cookie = new Cookie(CLIENT_COOKIE, token); httpResp.addCookie(cookie); + } else { + logger.info("{}: proxy access to URI {} by user {}, no cookie created", remoteAddr, requestURI, user); } authenticate = false; } @@ -167,19 +170,24 @@ public class StramWSFilter implements Filter } boolean valid = false; if (cookie != null) { - logger.debug("Verifying token {}", cookie.getValue()); - user = verifyClientToken(cookie.getValue()); - valid = true; - logger.debug("Token valid"); + user = verifyClientToken(cookie.getValue(), remoteAddr); + if (user != null) { + valid = true; + } else { + logger.debug("{}: invalid cookie {}", remoteAddr, cookie.getValue()); + } + } else { + logger.debug("{}: cookie not found {}", remoteAddr, CLIENT_COOKIE); } if (!valid) { + logger.debug("{}: auth failure", remoteAddr); httpResp.sendError(HttpServletResponse.SC_UNAUTHORIZED); return; } } if(user == null) { - logger.debug("Could not find {} cookie, so user will not be set", WEBAPP_PROXY_USER); + logger.debug("{}: could not find user, so user principal will not be 
set", remoteAddr); chain.doFilter(req, resp); } else { final StramWSPrincipal principal = new StramWSPrincipal(user); @@ -199,16 +207,31 @@ public class StramWSFilter implements Filter return token.encodeToUrlString(); } - private String verifyClientToken(String tokenstr) throws IOException + private String verifyClientToken(String tokenstr, String cid) throws IOException { Token<StramDelegationTokenIdentifier> token = new Token<StramDelegationTokenIdentifier>(); - token.decodeFromUrlString(tokenstr); + try { + token.decodeFromUrlString(tokenstr); + } catch (IOException e) { + logger.debug("{}: error decoding token: {}", cid, e.getMessage()); + return null; + } byte[] identifier = token.getIdentifier(); byte[] password = token.getPassword(); StramDelegationTokenIdentifier tokenIdentifier = new StramDelegationTokenIdentifier(); DataInputStream input = new DataInputStream(new ByteArrayInputStream(identifier)); - tokenIdentifier.readFields(input); - tokenManager.verifyToken(tokenIdentifier, password); + try { + tokenIdentifier.readFields(input); + } catch (IOException e) { + logger.debug("{}: error decoding identifier: {}", cid, e.getMessage()); + return null; + } + try { + tokenManager.verifyToken(tokenIdentifier, password); + } catch (SecretManager.InvalidToken e) { + logger.debug("{}: invalid token {}: {}", cid, tokenIdentifier, e.getMessage()); + return null; + } return tokenIdentifier.getOwner().toString(); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0a89c83c/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java index f4f8d22..a2b2821 100644 --- a/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java +++ 
b/engine/src/main/java/com/datatorrent/stram/security/StramWSFilterInitializer.java @@ -33,6 +33,9 @@ import org.apache.hadoop.http.FilterInitializer; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.datatorrent.stram.util.ConfigUtils; /** @@ -43,12 +46,15 @@ import com.datatorrent.stram.util.ConfigUtils; */ public class StramWSFilterInitializer extends FilterInitializer { + private static final Logger logger = LoggerFactory.getLogger(StramWSFilterInitializer.class); + private static final String FILTER_NAME = "AM_PROXY_FILTER"; private static final String FILTER_CLASS = StramWSFilter.class.getCanonicalName(); @Override public void initFilter(FilterContainer container, Configuration conf) { + logger.debug("Conf {}", conf); Map<String, String> params = new HashMap<String, String>(); Collection<String> proxies = new ArrayList<String>(); if (ConfigUtils.isRMHAEnabled(conf)) { @@ -80,6 +86,8 @@ public class StramWSFilterInitializer extends FilterInitializer public String getProxyHostAndPort(Configuration conf) { String addr = conf.get(YarnConfiguration.PROXY_ADDRESS); + logger.info("proxy address setting {}", addr); + logger.debug("proxy setting sources {}", conf.getPropertySources(YarnConfiguration.PROXY_ADDRESS)); if (addr == null || addr.isEmpty()) { addr = getResolvedRMWebAppURLWithoutScheme(conf, null); } @@ -96,27 +104,29 @@ public class StramWSFilterInitializer extends FilterInitializer boolean sslEnabled = conf.getBoolean( CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_KEY, CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT); - return getResolvedRMWebAppURLWithoutScheme(conf, sslEnabled, (rmId != null) ? "." + rmId : null); + return getResolvedRMWebAppURLWithoutScheme(conf, sslEnabled, (rmId != null) ? "." 
+ rmId : ""); } /* From org.apache.hadoop.yarn.webapp.util.WebAppUtils Modified for HA support */ - public String getResolvedRMWebAppURLWithoutScheme(Configuration conf, boolean sslEnabled, String rmId) + public String getResolvedRMWebAppURLWithoutScheme(Configuration conf, boolean sslEnabled, String rmPrpKey) { InetSocketAddress address = null; if (sslEnabled) { address = - conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + rmId, + conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + rmPrpKey, YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT); } else { address = - conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS + rmId, + conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_ADDRESS + rmPrpKey, YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_PORT); } + logger.info("rm webapp address setting {}", address); + logger.debug("rm setting sources {}", conf.getPropertySources(YarnConfiguration.RM_WEBAPP_ADDRESS)); address = NetUtils.getConnectAddress(address); StringBuffer sb = new StringBuffer(); InetAddress resolved = address.getAddress(); @@ -133,6 +143,7 @@ public class StramWSFilterInitializer extends FilterInitializer sb.append(address.getHostName()); } sb.append(":").append(address.getPort()); + logger.info("rm webapp resolved hostname {}", sb.toString()); return sb.toString(); }
