Repository: hive Updated Branches: refs/heads/master 9975131cc -> 8fea11769
http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java index 0cafeff..ed14998 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java @@ -21,7 +21,6 @@ import java.io.Closeable; import java.util.List; import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; /** @@ -30,7 +29,6 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecret * Internal, store specific errors are translated into {@link TokenStoreException}. */ public interface DelegationTokenStore extends Configurable, Closeable { - /** * Exception for internal token store errors that typically cannot be handled by the caller. */ @@ -111,8 +109,8 @@ public interface DelegationTokenStore extends Configurable, Closeable { /** * @param hmsHandler ObjectStore used by DBTokenStore - * @param smode Indicate whether this is a metastore or hiveserver2 token store + * @param serverMode indicate if this tokenstore is for Metastore and HiveServer2 */ - void init(Object hmsHandler, ServerMode smode); + void init(Object hmsHandler, HadoopThriftAuthBridge.Server.ServerMode serverMode); } http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java index c484cd3..c29dc79 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java @@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,7 +129,7 @@ public class MemoryTokenStore implements DelegationTokenStore { } @Override - public void init(Object hmsHandler, ServerMode smode) throws TokenStoreException { + public void init(Object hmsHandler, HadoopThriftAuthBridge.Server.ServerMode smode) throws TokenStoreException { // no-op } } http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java index 2b0110f..3cfdd8a 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; @@ -33,8 +34,20 @@ import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.util.ReflectionUtils; public class MetastoreDelegationTokenManager { - + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR = + "hive.cluster.delegation.token.store.zookeeper.connectString"; protected DelegationTokenSecretManager secretManager; + // Alternate connect string specification configuration + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE = + "hive.zookeeper.quorum"; + + public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS = + "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis"; + public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE = + "hive.cluster.delegation.token.store.zookeeper.znode"; + public static final String DELEGATION_TOKEN_STORE_ZK_ACL = + "hive.cluster.delegation.token.store.zookeeper.acl"; + public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = "/hivedelegation"; public MetastoreDelegationTokenManager() { } @@ -43,6 +56,10 @@ public class MetastoreDelegationTokenManager { return secretManager; } + public void startDelegationTokenSecretManager(Configuration conf, Object hms) throws IOException { + startDelegationTokenSecretManager(conf, hms, HadoopThriftAuthBridge.Server.ServerMode.METASTORE); + } + public void startDelegationTokenSecretManager(Configuration conf, Object hms, HadoopThriftAuthBridge.Server.ServerMode smode) throws IOException { long secretKeyInterval = MetastoreConf.getTimeVar(conf, @@ -121,17 +138,7 @@ public class MetastoreDelegationTokenManager { } private DelegationTokenStore getTokenStore(Configuration conf) throws IOException { - String tokenStoreClassName = - MetastoreConf.getVar(conf, MetastoreConf.ConfVars.DELEGATION_TOKEN_STORE_CLS, ""); - // The second half of this if is to catch cases where users are passing in a HiveConf for - // configuration. It will have set the default value of - // "hive.cluster.delegation.token.store .class" to - // "org.apache.hadoop.hive.thrift.MemoryTokenStore" as part of its construction. But this is - // the hive-shims version of the memory store. We want to convert this to our default value. - if (StringUtils.isBlank(tokenStoreClassName) || - "org.apache.hadoop.hive.thrift.MemoryTokenStore".equals(tokenStoreClassName)) { - return new MemoryTokenStore(); - } + String tokenStoreClassName = SecurityUtils.getTokenStoreClassName(conf); try { Class<? extends DelegationTokenStore> storeClass = Class.forName(tokenStoreClassName).asSubclass(DelegationTokenStore.class); http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TokenStoreDelegationTokenSecretManager.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TokenStoreDelegationTokenSecretManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TokenStoreDelegationTokenSecretManager.java index 4abcec7..3f5bd53 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TokenStoreDelegationTokenSecretManager.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TokenStoreDelegationTokenSecretManager.java @@ -33,6 +33,7 @@ import java.util.Map; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.io.Writable; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.MetastoreDelegationTokenSupport; http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java new file mode 100644 index 0000000..42f2f62 --- /dev/null +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java @@ -0,0 +1,474 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.security; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; +import org.apache.hadoop.security.token.delegation.MetastoreDelegationTokenSupport; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooDefs.Perms; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ZooKeeper token store implementation. + */ +public class ZooKeeperTokenStore implements DelegationTokenStore { + + private static final Logger LOGGER = + LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName()); + + protected static final String ZK_SEQ_FORMAT = "%010d"; + private static final String NODE_KEYS = "/keys"; + private static final String NODE_TOKENS = "/tokens"; + + private String rootNode = ""; + private volatile CuratorFramework zkSession; + private String zkConnectString; + private int connectTimeoutMillis; + private List<ACL> newNodeAcl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS)); + + /** + * ACLProvider permissions will be used in case parent dirs need to be created + */ + private final ACLProvider aclDefaultProvider = new ACLProvider() { + + @Override + public List<ACL> getDefaultAcl() { + return newNodeAcl; + } + + @Override + public List<ACL> getAclForPath(String path) { + return getDefaultAcl(); + } + }; + + + private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled" + + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")"; + + private Configuration conf; + + private HadoopThriftAuthBridge.Server.ServerMode serverMode; + + /** + * Default constructor for dynamic instantiation w/ Configurable + * (ReflectionUtils does not support Configuration constructor injection). + */ + protected ZooKeeperTokenStore() { + } + + private CuratorFramework getSession() { + if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) { + synchronized (this) { + if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) { + zkSession = + CuratorFrameworkFactory.builder().connectString(zkConnectString) + .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + zkSession.start(); + } + } + } + return zkSession; + } + + private void setupJAASConfig(Configuration conf) throws IOException { + if (!UserGroupInformation.getLoginUser().isFromKeytab()) { + // The process has not logged in using keytab + // this should be a test mode, can't use keytab to authenticate + // with zookeeper. + LOGGER.warn("Login is not from keytab"); + return; + } + + String principal; + String keytab; + switch (serverMode) { + case METASTORE: + principal = getNonEmptyConfVar(conf, "hive.metastore.kerberos.principal"); + keytab = getNonEmptyConfVar(conf, "hive.metastore.kerberos.keytab.file"); + break; + case HIVESERVER2: + principal = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.principal"); + keytab = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.keytab"); + break; + default: + throw new AssertionError("Unexpected server mode " + serverMode); + } + SecurityUtils.setZookeeperClientKerberosJaasConfig(principal, keytab); + } + + private String getNonEmptyConfVar(Configuration conf, String param) throws IOException { + String val = conf.get(param); + if (val == null || val.trim().isEmpty()) { + throw new IOException("Configuration parameter " + param + " should be set, " + + WHEN_ZK_DSTORE_MSG); + } + return val; + } + + /** + * Create a path if it does not already exist ("mkdir -p") + * @param path string with '/' separator + * @param acl list of ACL entries + * @throws TokenStoreException + */ + public void ensurePath(String path, List<ACL> acl) + throws TokenStoreException { + try { + CuratorFramework zk = getSession(); + String node = zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .withACL(acl).forPath(path); + LOGGER.info("Created path: {} ", node); + } catch (KeeperException.NodeExistsException e) { + // node already exists + } catch (Exception e) { + throw new TokenStoreException("Error creating path " + path, e); + } + } + + /** + * Parse ACL permission string, from ZooKeeperMain private method + * @param permString + * @return + */ + public static int getPermFromString(String permString) { + int perm = 0; + for (int i = 0; i < permString.length(); i++) { + switch (permString.charAt(i)) { + case 'r': + perm |= ZooDefs.Perms.READ; + break; + case 'w': + perm |= ZooDefs.Perms.WRITE; + break; + case 'c': + perm |= ZooDefs.Perms.CREATE; + break; + case 'd': + perm |= ZooDefs.Perms.DELETE; + break; + case 'a': + perm |= ZooDefs.Perms.ADMIN; + break; + default: + LOGGER.error("Unknown perm type: " + permString.charAt(i)); + } + } + return perm; + } + + /** + * Parse comma separated list of ACL entries to secure generated nodes, e.g. + * <code>sasl:hive/host1@MY.DOMAIN:cdrwa,sasl:hive/host2@MY.DOMAIN:cdrwa</code> + * @param aclString + * @return ACL list + */ + public static List<ACL> parseACLs(String aclString) { + String[] aclComps = StringUtils.splitByWholeSeparator(aclString, ","); + List<ACL> acl = new ArrayList<ACL>(aclComps.length); + for (String a : aclComps) { + if (StringUtils.isBlank(a)) { + continue; + } + a = a.trim(); + // from ZooKeeperMain private method + int firstColon = a.indexOf(':'); + int lastColon = a.lastIndexOf(':'); + if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) { + LOGGER.error(a + " does not have the form scheme:id:perm"); + continue; + } + ACL newAcl = new ACL(); + newAcl.setId(new Id(a.substring(0, firstColon), a.substring( + firstColon + 1, lastColon))); + newAcl.setPerms(getPermFromString(a.substring(lastColon + 1))); + acl.add(newAcl); + } + return acl; + } + + private void initClientAndPaths() { + if (this.zkSession != null) { + this.zkSession.close(); + } + try { + ensurePath(rootNode + NODE_KEYS, newNodeAcl); + ensurePath(rootNode + NODE_TOKENS, newNodeAcl); + } catch (TokenStoreException e) { + throw e; + } + } + + @Override + public void setConf(Configuration conf) { + if (conf == null) { + throw new IllegalArgumentException("conf is null"); + } + this.conf = conf; + } + + @Override + public Configuration getConf() { + return null; // not required + } + + private Map<Integer, byte[]> getAllKeys() throws KeeperException, InterruptedException { + + String masterKeyNode = rootNode + NODE_KEYS; + + // get children of key node + List<String> nodes = zkGetChildren(masterKeyNode); + + // read each child node, add to results + Map<Integer, byte[]> result = new HashMap<Integer, byte[]>(); + for (String node : nodes) { + String nodePath = masterKeyNode + "/" + node; + byte[] data = zkGetData(nodePath); + if (data != null) { + result.put(getSeq(node), data); + } + } + return result; + } + + private List<String> zkGetChildren(String path) { + CuratorFramework zk = getSession(); + try { + return zk.getChildren().forPath(path); + } catch (Exception e) { + throw new TokenStoreException("Error getting children for " + path, e); + } + } + + private byte[] zkGetData(String nodePath) { + CuratorFramework zk = getSession(); + try { + return zk.getData().forPath(nodePath); + } catch (KeeperException.NoNodeException ex) { + return null; + } catch (Exception e) { + throw new TokenStoreException("Error reading " + nodePath, e); + } + } + + private int getSeq(String path) { + String[] pathComps = path.split("/"); + return Integer.parseInt(pathComps[pathComps.length-1]); + } + + @Override + public int addMasterKey(String s) { + String keysPath = rootNode + NODE_KEYS + "/"; + CuratorFramework zk = getSession(); + String newNode; + try { + newNode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(newNodeAcl) + .forPath(keysPath, s.getBytes()); + } catch (Exception e) { + throw new TokenStoreException("Error creating new node with path " + keysPath, e); + } + LOGGER.info("Added key {}", newNode); + return getSeq(newNode); + } + + @Override + public void updateMasterKey(int keySeq, String s) { + CuratorFramework zk = getSession(); + String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq); + try { + zk.setData().forPath(keyPath, s.getBytes()); + } catch (Exception e) { + throw new TokenStoreException("Error setting data in " + keyPath, e); + } + } + + @Override + public boolean removeMasterKey(int keySeq) { + String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq); + zkDelete(keyPath); + return true; + } + + private void zkDelete(String path) { + CuratorFramework zk = getSession(); + try { + zk.delete().forPath(path); + } catch (KeeperException.NoNodeException ex) { + // already deleted + } catch (Exception e) { + throw new TokenStoreException("Error deleting " + path, e); + } + } + + @Override + public String[] getMasterKeys() { + try { + Map<Integer, byte[]> allKeys = getAllKeys(); + String[] result = new String[allKeys.size()]; + int resultIdx = 0; + for (byte[] keyBytes : allKeys.values()) { + result[resultIdx++] = new String(keyBytes); + } + return result; + } catch (KeeperException ex) { + throw new TokenStoreException(ex); + } catch (InterruptedException ex) { + throw new TokenStoreException(ex); + } + } + + + private String getTokenPath(DelegationTokenIdentifier tokenIdentifier) { + try { + return rootNode + NODE_TOKENS + "/" + + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier); + } catch (IOException ex) { + throw new TokenStoreException("Failed to encode token identifier", ex); + } + } + + @Override + public boolean addToken(DelegationTokenIdentifier tokenIdentifier, + DelegationTokenInformation token) { + byte[] tokenBytes = MetastoreDelegationTokenSupport.encodeDelegationTokenInformation(token); + String tokenPath = getTokenPath(tokenIdentifier); + CuratorFramework zk = getSession(); + String newNode; + try { + newNode = zk.create().withMode(CreateMode.PERSISTENT).withACL(newNodeAcl) + .forPath(tokenPath, tokenBytes); + } catch (Exception e) { + throw new TokenStoreException("Error creating new node with path " + tokenPath, e); + } + + LOGGER.info("Added token: {}", newNode); + return true; + } + + @Override + public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) { + String tokenPath = getTokenPath(tokenIdentifier); + zkDelete(tokenPath); + return true; + } + + @Override + public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) { + byte[] tokenBytes = zkGetData(getTokenPath(tokenIdentifier)); + if(tokenBytes == null) { + // The token is already removed. + return null; + } + try { + return MetastoreDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes); + } catch (Exception ex) { + throw new TokenStoreException("Failed to decode token", ex); + } + } + + @Override + public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() { + String containerNode = rootNode + NODE_TOKENS; + final List<String> nodes = zkGetChildren(containerNode); + List<DelegationTokenIdentifier> result = new java.util.ArrayList<DelegationTokenIdentifier>( + nodes.size()); + for (String node : nodes) { + DelegationTokenIdentifier id = new DelegationTokenIdentifier(); + try { + TokenStoreDelegationTokenSecretManager.decodeWritable(id, node); + result.add(id); + } catch (Exception e) { + LOGGER.warn("Failed to decode token '{}'", node); + } + } + return result; + } + + @Override + public void close() throws IOException { + if (this.zkSession != null) { + this.zkSession.close(); + } + } + + @Override + public void init(Object hmsHandler, HadoopThriftAuthBridge.Server.ServerMode sMode) { + this.serverMode = sMode; + zkConnectString = + conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null); + if (zkConnectString == null || zkConnectString.trim().isEmpty()) { + // try alternate config param + zkConnectString = + conf.get( + MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE, + null); + if (zkConnectString == null || zkConnectString.trim().isEmpty()) { + throw new IllegalArgumentException("Zookeeper connect string has to be specifed through " + + "either " + MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR + + " or " + + MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE + + WHEN_ZK_DSTORE_MSG); + } + } + connectTimeoutMillis = + conf.getInt( + MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS, + CuratorFrameworkFactory.builder().getConnectionTimeoutMs()); + String aclStr = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, null); + if (StringUtils.isNotBlank(aclStr)) { + this.newNodeAcl = parseACLs(aclStr); + } + rootNode = + conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE, + MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode; + + try { + // Install the JAAS Configuration for the runtime + setupJAASConfig(conf); + } catch (IOException e) { + throw new TokenStoreException("Error setting up JAAS configuration for zookeeper client " + + e.getMessage(), e); + } + initClientAndPaths(); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java index 9f0ca82..b05c995 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java @@ -17,10 +17,29 @@ */ package org.apache.hadoop.hive.metastore.utils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.security.DBTokenStore; +import org.apache.hadoop.hive.metastore.security.DelegationTokenIdentifier; +import org.apache.hadoop.hive.metastore.security.DelegationTokenSelector; +import org.apache.hadoop.hive.metastore.security.MemoryTokenStore; +import org.apache.hadoop.hive.metastore.security.ZooKeeperTokenStore; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.TokenSelector; +import org.apache.zookeeper.client.ZooKeeperSaslClient; +import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.LoginException; +import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag; + import java.io.IOException; +import java.util.HashMap; +import java.util.Map; public class SecurityUtils { public static UserGroupInformation getUGI() throws LoginException, IOException { @@ -36,5 +55,158 @@ public class SecurityUtils { } return UserGroupInformation.getCurrentUser(); } + /** + * Dynamically sets up the JAAS configuration that uses kerberos + * @param principal + * @param keyTabFile + * @throws IOException + */ + public static void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile) throws IOException { + // ZooKeeper property name to pick the correct JAAS conf section + final String SASL_LOGIN_CONTEXT_NAME = "HiveZooKeeperClient"; + System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, SASL_LOGIN_CONTEXT_NAME); + + principal = SecurityUtil.getServerPrincipal(principal, "0.0.0.0"); + JaasConfiguration jaasConf = new JaasConfiguration(SASL_LOGIN_CONTEXT_NAME, principal, keyTabFile); + + // Install the Configuration in the runtime. + javax.security.auth.login.Configuration.setConfiguration(jaasConf); + } + + /** + * A JAAS configuration for ZooKeeper clients intended to use for SASL + * Kerberos. + */ + private static class JaasConfiguration extends javax.security.auth.login.Configuration { + // Current installed Configuration + private static final boolean IBM_JAVA = System.getProperty("java.vendor") + .contains("IBM"); + private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration + .getConfiguration(); + private final String loginContextName; + private final String principal; + private final String keyTabFile; + + public JaasConfiguration(String hiveLoginContextName, String principal, String keyTabFile) { + this.loginContextName = hiveLoginContextName; + this.principal = principal; + this.keyTabFile = keyTabFile; + } + + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String appName) { + if (loginContextName.equals(appName)) { + Map<String, String> krbOptions = new HashMap<String, String>(); + if (IBM_JAVA) { + krbOptions.put("credsType", "both"); + krbOptions.put("useKeytab", keyTabFile); + } else { + krbOptions.put("doNotPrompt", "true"); + krbOptions.put("storeKey", "true"); + krbOptions.put("useKeyTab", "true"); + krbOptions.put("keyTab", keyTabFile); + } + krbOptions.put("principal", principal); + krbOptions.put("refreshKrb5Config", "true"); + AppConfigurationEntry hiveZooKeeperClientEntry = new AppConfigurationEntry( + KerberosUtil.getKrb5LoginModuleName(), LoginModuleControlFlag.REQUIRED, krbOptions); + return new AppConfigurationEntry[] { hiveZooKeeperClientEntry }; + } + // Try the base config + if (baseConfig != null) { + return baseConfig.getAppConfigurationEntry(appName); + } + return null; + } + } + + /** + * Get the string form of the token given a token signature. The signature is used as the value of + * the "service" field in the token for lookup. Ref: AbstractDelegationTokenSelector in Hadoop. If + * there exists such a token in the token cache (credential store) of the job, the lookup returns + * that. This is relevant only when running against a "secure" hadoop release The method gets hold + * of the tokens if they are set up by hadoop - this should happen on the map/reduce tasks if the + * client added the tokens into hadoop's credential store in the front end during job submission. + * The method will select the hive delegation token among the set of tokens and return the string + * form of it + * + * @param tokenSignature + * @return the string form of the token found + * @throws IOException + */ + public static String getTokenStrForm(String tokenSignature) throws IOException { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector(); + Token<? extends TokenIdentifier> token = tokenSelector.selectToken( + tokenSignature == null ? new Text() : new Text(tokenSignature), ugi.getTokens()); + return token != null ? token.encodeToUrlString() : null; + } + + /** + * Create a delegation token object for the given token string and service. Add the token to given + * UGI + * + * @param ugi + * @param tokenStr + * @param tokenService + * @throws IOException + */ + public static void setTokenStr(UserGroupInformation ugi, String tokenStr, String tokenService) + throws IOException { + Token<DelegationTokenIdentifier> delegationToken = createToken(tokenStr, tokenService); + ugi.addToken(delegationToken); + } + + /** + * Create a new token using the given string and service + * + * @param tokenStr + * @param tokenService + * @return + * @throws IOException + */ + private static Token<DelegationTokenIdentifier> createToken(String tokenStr, String tokenService) + throws IOException { + Token<DelegationTokenIdentifier> delegationToken = new Token<DelegationTokenIdentifier>(); + delegationToken.decodeFromUrlString(tokenStr); + delegationToken.setService(new Text(tokenService)); + return delegationToken; + } + + private static final String DELEGATION_TOKEN_STORE_CLS = "hive.cluster.delegation.token.store.class"; + + /** + * This method should be used to return the metastore specific tokenstore class name to main + * backwards compatibility + * + * @param conf - HiveConf object + * @return the tokenStoreClass name from the HiveConf. It maps the hive specific tokenstoreclass + * name to metastore module specific class name. For eg: + * hive.cluster.delegation.token.store.class is set to + * org.apache.hadoop.hive.thrift.MemoryTokenStore it returns the equivalent tokenstore + * class defined in the metastore module which is + * org.apache.hadoop.hive.metastore.security.MemoryTokenStore Similarly, + * org.apache.hadoop.hive.thrift.DBTokenStore maps to + * org.apache.hadoop.hive.metastore.security.DBTokenStore and + * org.apache.hadoop.hive.thrift.ZooKeeperTokenStore maps to + * org.apache.hadoop.hive.metastore.security.ZooKeeperTokenStore + */ + public static String getTokenStoreClassName(Configuration conf) { + String tokenStoreClass = conf.get(DELEGATION_TOKEN_STORE_CLS, ""); + if (StringUtils.isBlank(tokenStoreClass)) { + // default tokenstore is MemoryTokenStore + return MemoryTokenStore.class.getName(); + } + switch (tokenStoreClass) { + case "org.apache.hadoop.hive.thrift.DBTokenStore": + return DBTokenStore.class.getName(); + case "org.apache.hadoop.hive.thrift.MemoryTokenStore": + return MemoryTokenStore.class.getName(); + case "org.apache.hadoop.hive.thrift.ZooKeeperTokenStore": + return ZooKeeperTokenStore.class.getName(); + default: + return tokenStoreClass; + } + } }