Repository: hive
Updated Branches:
  refs/heads/master 9975131cc -> 8fea11769


http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java
index 0cafeff..ed14998 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/DelegationTokenStore.java
@@ -21,7 +21,6 @@ import java.io.Closeable;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
 
 /**
@@ -30,7 +29,6 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecret
  * Internal, store specific errors are translated into {@link TokenStoreException}.
  */
 public interface DelegationTokenStore extends Configurable, Closeable {
-
   /**
    * Exception for internal token store errors that typically cannot be handled by the caller.
    */
@@ -111,8 +109,8 @@ public interface DelegationTokenStore extends Configurable, Closeable {
 
   /**
    * @param hmsHandler ObjectStore used by DBTokenStore
-   * @param smode Indicate whether this is a metastore or hiveserver2 token store
+   * @param serverMode indicates whether this token store is for the Metastore or HiveServer2
    */
-  void init(Object hmsHandler, ServerMode smode);
+  void init(Object hmsHandler, HadoopThriftAuthBridge.Server.ServerMode serverMode);
 
 }

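A minimal sketch (not part of this commit) of how a caller now names the server mode, since the direct ServerMode import is gone from the interface; the store and hmsHandler values are assumed to be supplied by the caller:

import org.apache.hadoop.hive.metastore.security.DelegationTokenStore;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;

public class TokenStoreInitSketch {
  // Matches the updated init(Object, HadoopThriftAuthBridge.Server.ServerMode) signature.
  public static void initForMetastore(DelegationTokenStore store, Object hmsHandler) {
    store.init(hmsHandler, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
  }
}
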
http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java
index c484cd3..c29dc79 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MemoryTokenStore.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server.ServerMode;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,7 +129,7 @@ public class MemoryTokenStore implements DelegationTokenStore {
   }
 
   @Override
-  public void init(Object hmsHandler, ServerMode smode) throws TokenStoreException {
+  public void init(Object hmsHandler, HadoopThriftAuthBridge.Server.ServerMode smode) throws TokenStoreException {
     // no-op
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
index 2b0110f..3cfdd8a 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/MetastoreDelegationTokenManager.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -33,8 +34,20 @@ import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.util.ReflectionUtils;
 
 public class MetastoreDelegationTokenManager {
-
+  public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR =
+      "hive.cluster.delegation.token.store.zookeeper.connectString";
   protected DelegationTokenSecretManager secretManager;
+  // Alternate connect string specification configuration
+  public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE =
+      "hive.zookeeper.quorum";
+
+  public static final String DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS =
+      "hive.cluster.delegation.token.store.zookeeper.connectTimeoutMillis";
+  public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE =
+      "hive.cluster.delegation.token.store.zookeeper.znode";
+  public static final String DELEGATION_TOKEN_STORE_ZK_ACL =
+      "hive.cluster.delegation.token.store.zookeeper.acl";
+  public static final String DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT = "/hivedelegation";
 
   public MetastoreDelegationTokenManager() {
   }
@@ -43,6 +56,10 @@ public class MetastoreDelegationTokenManager {
     return secretManager;
   }
 
+  public void startDelegationTokenSecretManager(Configuration conf, Object hms) throws IOException {
+    startDelegationTokenSecretManager(conf, hms, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
+  }
+
   public void startDelegationTokenSecretManager(Configuration conf, Object hms, HadoopThriftAuthBridge.Server.ServerMode smode)
       throws IOException {
     long secretKeyInterval = MetastoreConf.getTimeVar(conf,
@@ -121,17 +138,7 @@ public class MetastoreDelegationTokenManager {
   }
 
   private DelegationTokenStore getTokenStore(Configuration conf) throws IOException {
-    String tokenStoreClassName =
-        MetastoreConf.getVar(conf, MetastoreConf.ConfVars.DELEGATION_TOKEN_STORE_CLS, "");
-    // The second half of this if is to catch cases where users are passing in a HiveConf for
-    // configuration.  It will have set the default value of
-    // "hive.cluster.delegation.token.store .class" to
-    // "org.apache.hadoop.hive.thrift.MemoryTokenStore" as part of its construction.  But this is
-    // the hive-shims version of the memory store.  We want to convert this to our default value.
-    if (StringUtils.isBlank(tokenStoreClassName) ||
-        "org.apache.hadoop.hive.thrift.MemoryTokenStore".equals(tokenStoreClassName)) {
-      return new MemoryTokenStore();
-    }
+    String tokenStoreClassName = SecurityUtils.getTokenStoreClassName(conf);
     try {
       Class<? extends DelegationTokenStore> storeClass =
           Class.forName(tokenStoreClassName).asSubclass(DelegationTokenStore.class);

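As a rough illustration (assumed usage, not shown in the commit), the new one-argument overload lets metastore-side callers omit the server mode, which defaults to METASTORE; conf and hmsHandler are placeholders for values the caller already has:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;

public class TokenManagerStartSketch {
  public static MetastoreDelegationTokenManager start(Configuration conf, Object hmsHandler)
      throws IOException {
    MetastoreDelegationTokenManager mgr = new MetastoreDelegationTokenManager();
    // Equivalent to startDelegationTokenSecretManager(conf, hmsHandler, ServerMode.METASTORE).
    mgr.startDelegationTokenSecretManager(conf, hmsHandler);
    return mgr;
  }
}
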
http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TokenStoreDelegationTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TokenStoreDelegationTokenSecretManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TokenStoreDelegationTokenSecretManager.java
index 4abcec7..3f5bd53 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TokenStoreDelegationTokenSecretManager.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/TokenStoreDelegationTokenSecretManager.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.security.token.delegation.MetastoreDelegationTokenSupport;

http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
new file mode 100644
index 0000000..42f2f62
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/security/ZooKeeperTokenStore.java
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.security;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
+import org.apache.hadoop.security.token.delegation.MetastoreDelegationTokenSupport;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper token store implementation.
+ */
+public class ZooKeeperTokenStore implements DelegationTokenStore {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(ZooKeeperTokenStore.class.getName());
+
+  protected static final String ZK_SEQ_FORMAT = "%010d";
+  private static final String NODE_KEYS = "/keys";
+  private static final String NODE_TOKENS = "/tokens";
+
+  private String rootNode = "";
+  private volatile CuratorFramework zkSession;
+  private String zkConnectString;
+  private int connectTimeoutMillis;
+  private List<ACL> newNodeAcl = Arrays.asList(new ACL(Perms.ALL, Ids.AUTH_IDS));
+
+  /**
+   * ACLProvider permissions will be used in case parent dirs need to be created
+   */
+  private final ACLProvider aclDefaultProvider =  new ACLProvider() {
+
+    @Override
+    public List<ACL> getDefaultAcl() {
+      return newNodeAcl;
+    }
+
+    @Override
+    public List<ACL> getAclForPath(String path) {
+      return getDefaultAcl();
+    }
+  };
+
+
+  private final String WHEN_ZK_DSTORE_MSG = "when zookeeper based delegation token storage is enabled"
+      + "(hive.cluster.delegation.token.store.class=" + ZooKeeperTokenStore.class.getName() + ")";
+
+  private Configuration conf;
+
+  private HadoopThriftAuthBridge.Server.ServerMode serverMode;
+
+  /**
+   * Default constructor for dynamic instantiation w/ Configurable
+   * (ReflectionUtils does not support Configuration constructor injection).
+   */
+  protected ZooKeeperTokenStore() {
+  }
+
+  private CuratorFramework getSession() {
+    if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
+      synchronized (this) {
+        if (zkSession == null || zkSession.getState() == CuratorFrameworkState.STOPPED) {
+          zkSession =
+              CuratorFrameworkFactory.builder().connectString(zkConnectString)
+                  .connectionTimeoutMs(connectTimeoutMillis).aclProvider(aclDefaultProvider)
+                  .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+          zkSession.start();
+        }
+      }
+    }
+    return zkSession;
+  }
+
+  private void setupJAASConfig(Configuration conf) throws IOException {
+    if (!UserGroupInformation.getLoginUser().isFromKeytab()) {
+      // The process has not logged in using keytab
+      // this should be a test mode, can't use keytab to authenticate
+      // with zookeeper.
+      LOGGER.warn("Login is not from keytab");
+      return;
+    }
+
+    String principal;
+    String keytab;
+    switch (serverMode) {
+    case METASTORE:
+      principal = getNonEmptyConfVar(conf, "hive.metastore.kerberos.principal");
+      keytab = getNonEmptyConfVar(conf, "hive.metastore.kerberos.keytab.file");
+      break;
+    case HIVESERVER2:
+      principal = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.principal");
+      keytab = getNonEmptyConfVar(conf, "hive.server2.authentication.kerberos.keytab");
+      break;
+    default:
+      throw new AssertionError("Unexpected server mode " + serverMode);
+    }
+    SecurityUtils.setZookeeperClientKerberosJaasConfig(principal, keytab);
+  }
+
+  private String getNonEmptyConfVar(Configuration conf, String param) throws IOException {
+    String val = conf.get(param);
+    if (val == null || val.trim().isEmpty()) {
+      throw new IOException("Configuration parameter " + param + " should be set, "
+          + WHEN_ZK_DSTORE_MSG);
+    }
+    return val;
+  }
+
+  /**
+   * Create a path if it does not already exist ("mkdir -p")
+   * @param path string with '/' separator
+   * @param acl list of ACL entries
+   * @throws TokenStoreException
+   */
+  public void ensurePath(String path, List<ACL> acl)
+      throws TokenStoreException {
+    try {
+      CuratorFramework zk = getSession();
+      String node = zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+          .withACL(acl).forPath(path);
+      LOGGER.info("Created path: {} ", node);
+    } catch (KeeperException.NodeExistsException e) {
+      // node already exists
+    } catch (Exception e) {
+      throw new TokenStoreException("Error creating path " + path, e);
+    }
+  }
+
+  /**
+   * Parse ACL permission string, from ZooKeeperMain private method
+   * @param permString
+   * @return
+   */
+  public static int getPermFromString(String permString) {
+      int perm = 0;
+      for (int i = 0; i < permString.length(); i++) {
+          switch (permString.charAt(i)) {
+          case 'r':
+              perm |= ZooDefs.Perms.READ;
+              break;
+          case 'w':
+              perm |= ZooDefs.Perms.WRITE;
+              break;
+          case 'c':
+              perm |= ZooDefs.Perms.CREATE;
+              break;
+          case 'd':
+              perm |= ZooDefs.Perms.DELETE;
+              break;
+          case 'a':
+              perm |= ZooDefs.Perms.ADMIN;
+              break;
+          default:
+              LOGGER.error("Unknown perm type: " + permString.charAt(i));
+          }
+      }
+      return perm;
+  }
+
+  /**
+   * Parse comma separated list of ACL entries to secure generated nodes, e.g.
+   * <code>sasl:hive/host1@MY.DOMAIN:cdrwa,sasl:hive/host2@MY.DOMAIN:cdrwa</code>
+   * @param aclString
+   * @return ACL list
+   */
+  public static List<ACL> parseACLs(String aclString) {
+    String[] aclComps = StringUtils.splitByWholeSeparator(aclString, ",");
+    List<ACL> acl = new ArrayList<ACL>(aclComps.length);
+    for (String a : aclComps) {
+      if (StringUtils.isBlank(a)) {
+         continue;
+      }
+      a = a.trim();
+      // from ZooKeeperMain private method
+      int firstColon = a.indexOf(':');
+      int lastColon = a.lastIndexOf(':');
+      if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) {
+         LOGGER.error(a + " does not have the form scheme:id:perm");
+         continue;
+      }
+      ACL newAcl = new ACL();
+      newAcl.setId(new Id(a.substring(0, firstColon), a.substring(
+          firstColon + 1, lastColon)));
+      newAcl.setPerms(getPermFromString(a.substring(lastColon + 1)));
+      acl.add(newAcl);
+    }
+    return acl;
+  }
+
+  private void initClientAndPaths() {
+    if (this.zkSession != null) {
+      this.zkSession.close();
+    }
+    try {
+      ensurePath(rootNode + NODE_KEYS, newNodeAcl);
+      ensurePath(rootNode + NODE_TOKENS, newNodeAcl);
+    } catch (TokenStoreException e) {
+      throw e;
+    }
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    if (conf == null) {
+      throw new IllegalArgumentException("conf is null");
+    }
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return null; // not required
+  }
+
+  private Map<Integer, byte[]> getAllKeys() throws KeeperException, InterruptedException {
+
+    String masterKeyNode = rootNode + NODE_KEYS;
+
+    // get children of key node
+    List<String> nodes = zkGetChildren(masterKeyNode);
+
+    // read each child node, add to results
+    Map<Integer, byte[]> result = new HashMap<Integer, byte[]>();
+    for (String node : nodes) {
+      String nodePath = masterKeyNode + "/" + node;
+      byte[] data = zkGetData(nodePath);
+      if (data != null) {
+        result.put(getSeq(node), data);
+      }
+    }
+    return result;
+  }
+
+  private List<String> zkGetChildren(String path) {
+    CuratorFramework zk = getSession();
+    try {
+      return zk.getChildren().forPath(path);
+    } catch (Exception e) {
+      throw new TokenStoreException("Error getting children for " + path, e);
+    }
+  }
+
+  private byte[] zkGetData(String nodePath) {
+    CuratorFramework zk = getSession();
+    try {
+      return zk.getData().forPath(nodePath);
+    } catch (KeeperException.NoNodeException ex) {
+      return null;
+    } catch (Exception e) {
+      throw new TokenStoreException("Error reading " + nodePath, e);
+    }
+  }
+
+  private int getSeq(String path) {
+    String[] pathComps = path.split("/");
+    return Integer.parseInt(pathComps[pathComps.length-1]);
+  }
+
+  @Override
+  public int addMasterKey(String s) {
+    String keysPath = rootNode + NODE_KEYS + "/";
+    CuratorFramework zk = getSession();
+    String newNode;
+    try {
+      newNode = zk.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).withACL(newNodeAcl)
+          .forPath(keysPath, s.getBytes());
+    } catch (Exception e) {
+      throw new TokenStoreException("Error creating new node with path " + keysPath, e);
+    }
+    LOGGER.info("Added key {}", newNode);
+    return getSeq(newNode);
+  }
+
+  @Override
+  public void updateMasterKey(int keySeq, String s) {
+    CuratorFramework zk = getSession();
+    String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq);
+    try {
+      zk.setData().forPath(keyPath, s.getBytes());
+    } catch (Exception e) {
+      throw new TokenStoreException("Error setting data in " + keyPath, e);
+    }
+  }
+
+  @Override
+  public boolean removeMasterKey(int keySeq) {
+    String keyPath = rootNode + NODE_KEYS + "/" + String.format(ZK_SEQ_FORMAT, keySeq);
+    zkDelete(keyPath);
+    return true;
+  }
+
+  private void zkDelete(String path) {
+    CuratorFramework zk = getSession();
+    try {
+      zk.delete().forPath(path);
+    } catch (KeeperException.NoNodeException ex) {
+      // already deleted
+    } catch (Exception e) {
+      throw new TokenStoreException("Error deleting " + path, e);
+    }
+  }
+
+  @Override
+  public String[] getMasterKeys() {
+    try {
+      Map<Integer, byte[]> allKeys = getAllKeys();
+      String[] result = new String[allKeys.size()];
+      int resultIdx = 0;
+      for (byte[] keyBytes : allKeys.values()) {
+          result[resultIdx++] = new String(keyBytes);
+      }
+      return result;
+    } catch (KeeperException ex) {
+      throw new TokenStoreException(ex);
+    } catch (InterruptedException ex) {
+      throw new TokenStoreException(ex);
+    }
+  }
+
+
+  private String getTokenPath(DelegationTokenIdentifier tokenIdentifier) {
+    try {
+      return rootNode + NODE_TOKENS + "/"
+          + TokenStoreDelegationTokenSecretManager.encodeWritable(tokenIdentifier);
+    } catch (IOException ex) {
+      throw new TokenStoreException("Failed to encode token identifier", ex);
+    }
+  }
+
+  @Override
+  public boolean addToken(DelegationTokenIdentifier tokenIdentifier,
+      DelegationTokenInformation token) {
+    byte[] tokenBytes = MetastoreDelegationTokenSupport.encodeDelegationTokenInformation(token);
+    String tokenPath = getTokenPath(tokenIdentifier);
+    CuratorFramework zk = getSession();
+    String newNode;
+    try {
+      newNode = zk.create().withMode(CreateMode.PERSISTENT).withACL(newNodeAcl)
+          .forPath(tokenPath, tokenBytes);
+    } catch (Exception e) {
+      throw new TokenStoreException("Error creating new node with path " + 
tokenPath, e);
+    }
+
+    LOGGER.info("Added token: {}", newNode);
+    return true;
+  }
+
+  @Override
+  public boolean removeToken(DelegationTokenIdentifier tokenIdentifier) {
+    String tokenPath = getTokenPath(tokenIdentifier);
+    zkDelete(tokenPath);
+    return true;
+  }
+
+  @Override
+  public DelegationTokenInformation getToken(DelegationTokenIdentifier tokenIdentifier) {
+    byte[] tokenBytes = zkGetData(getTokenPath(tokenIdentifier));
+    if(tokenBytes == null) {
+      // The token is already removed.
+      return null;
+    }
+    try {
+      return MetastoreDelegationTokenSupport.decodeDelegationTokenInformation(tokenBytes);
+    } catch (Exception ex) {
+      throw new TokenStoreException("Failed to decode token", ex);
+    }
+  }
+
+  @Override
+  public List<DelegationTokenIdentifier> getAllDelegationTokenIdentifiers() {
+    String containerNode = rootNode + NODE_TOKENS;
+    final List<String> nodes = zkGetChildren(containerNode);
+    List<DelegationTokenIdentifier> result = new java.util.ArrayList<DelegationTokenIdentifier>(
+        nodes.size());
+    for (String node : nodes) {
+      DelegationTokenIdentifier id = new DelegationTokenIdentifier();
+      try {
+        TokenStoreDelegationTokenSecretManager.decodeWritable(id, node);
+        result.add(id);
+      } catch (Exception e) {
+        LOGGER.warn("Failed to decode token '{}'", node);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.zkSession != null) {
+      this.zkSession.close();
+    }
+  }
+
+  @Override
+  public void init(Object hmsHandler, HadoopThriftAuthBridge.Server.ServerMode sMode) {
+    this.serverMode = sMode;
+    zkConnectString =
+        conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR, null);
+    if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
+      // try alternate config param
+      zkConnectString =
+          conf.get(
+              MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE,
+              null);
+      if (zkConnectString == null || zkConnectString.trim().isEmpty()) {
+        throw new IllegalArgumentException("Zookeeper connect string has to be specified through "
+            + "either " + MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR
+            + " or "
+            + MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_STR_ALTERNATE
+            + WHEN_ZK_DSTORE_MSG);
+      }
+    }
+    connectTimeoutMillis =
+        conf.getInt(
+            MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_CONNECT_TIMEOUTMILLIS,
+            CuratorFrameworkFactory.builder().getConnectionTimeoutMs());
+    String aclStr = conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ACL, null);
+    if (StringUtils.isNotBlank(aclStr)) {
+      this.newNodeAcl = parseACLs(aclStr);
+    }
+    rootNode =
+        conf.get(MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE,
+            MetastoreDelegationTokenManager.DELEGATION_TOKEN_STORE_ZK_ZNODE_DEFAULT) + serverMode;
+
+    try {
+      // Install the JAAS Configuration for the runtime
+      setupJAASConfig(conf);
+    } catch (IOException e) {
+      throw new TokenStoreException("Error setting up JAAS configuration for zookeeper client "
+          + e.getMessage(), e);
+    }
+    initClientAndPaths();
+  }
+
+}

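To make the ACL string format concrete (illustration only, with placeholder principals), parseACLs accepts the comma separated scheme:id:perm entries described in its javadoc:

import java.util.List;

import org.apache.hadoop.hive.metastore.security.ZooKeeperTokenStore;
import org.apache.zookeeper.data.ACL;

public class ParseAclsSketch {
  public static void main(String[] args) {
    // "cdrwa" expands to CREATE, DELETE, READ, WRITE and ADMIN permissions.
    List<ACL> acls = ZooKeeperTokenStore.parseACLs(
        "sasl:hive/host1@MY.DOMAIN:cdrwa,sasl:hive/host2@MY.DOMAIN:cdrwa");
    for (ACL acl : acls) {
      System.out.println(acl.getId().getScheme() + ":" + acl.getId().getId()
          + " perms=" + acl.getPerms());
    }
  }
}
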
http://git-wip-us.apache.org/repos/asf/hive/blob/8fea1176/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
index 9f0ca82..b05c995 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/SecurityUtils.java
@@ -17,10 +17,29 @@
  */
 package org.apache.hadoop.hive.metastore.utils;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.security.DBTokenStore;
+import org.apache.hadoop.hive.metastore.security.DelegationTokenIdentifier;
+import org.apache.hadoop.hive.metastore.security.DelegationTokenSelector;
+import org.apache.hadoop.hive.metastore.security.MemoryTokenStore;
+import org.apache.hadoop.hive.metastore.security.ZooKeeperTokenStore;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
 
+import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.LoginException;
+import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
+
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 public class SecurityUtils {
   public static UserGroupInformation getUGI() throws LoginException, IOException {
@@ -36,5 +55,158 @@ public class SecurityUtils {
     }
     return UserGroupInformation.getCurrentUser();
   }
+  /**
+   * Dynamically sets up the JAAS configuration that uses kerberos
+   * @param principal
+   * @param keyTabFile
+   * @throws IOException
+   */
+  public static void setZookeeperClientKerberosJaasConfig(String principal, String keyTabFile) throws IOException {
+    // ZooKeeper property name to pick the correct JAAS conf section
+    final String SASL_LOGIN_CONTEXT_NAME = "HiveZooKeeperClient";
+    System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, SASL_LOGIN_CONTEXT_NAME);
+
+    principal = SecurityUtil.getServerPrincipal(principal, "0.0.0.0");
+    JaasConfiguration jaasConf = new JaasConfiguration(SASL_LOGIN_CONTEXT_NAME, principal, keyTabFile);
+
+    // Install the Configuration in the runtime.
+    javax.security.auth.login.Configuration.setConfiguration(jaasConf);
+  }
+
+  /**
+   * A JAAS configuration for ZooKeeper clients intended to use for SASL
+   * Kerberos.
+   */
+  private static class JaasConfiguration extends javax.security.auth.login.Configuration {
+    // Current installed Configuration
+    private static final boolean IBM_JAVA = System.getProperty("java.vendor")
+      .contains("IBM");
+    private final javax.security.auth.login.Configuration baseConfig = javax.security.auth.login.Configuration
+        .getConfiguration();
+    private final String loginContextName;
+    private final String principal;
+    private final String keyTabFile;
+
+    public JaasConfiguration(String hiveLoginContextName, String principal, String keyTabFile) {
+      this.loginContextName = hiveLoginContextName;
+      this.principal = principal;
+      this.keyTabFile = keyTabFile;
+    }
+
+    @Override
+    public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
+      if (loginContextName.equals(appName)) {
+        Map<String, String> krbOptions = new HashMap<String, String>();
+        if (IBM_JAVA) {
+          krbOptions.put("credsType", "both");
+          krbOptions.put("useKeytab", keyTabFile);
+        } else {
+          krbOptions.put("doNotPrompt", "true");
+          krbOptions.put("storeKey", "true");
+          krbOptions.put("useKeyTab", "true");
+          krbOptions.put("keyTab", keyTabFile);
+        }
+        krbOptions.put("principal", principal);
+        krbOptions.put("refreshKrb5Config", "true");
+        AppConfigurationEntry hiveZooKeeperClientEntry = new AppConfigurationEntry(
+            KerberosUtil.getKrb5LoginModuleName(), LoginModuleControlFlag.REQUIRED, krbOptions);
+        return new AppConfigurationEntry[] { hiveZooKeeperClientEntry };
+      }
+      // Try the base config
+      if (baseConfig != null) {
+        return baseConfig.getAppConfigurationEntry(appName);
+      }
+      return null;
+    }
+  }
+  
+  /**
+   * Get the string form of the token given a token signature. The signature is used as the value of
+   * the "service" field in the token for lookup. Ref: AbstractDelegationTokenSelector in Hadoop. If
+   * there exists such a token in the token cache (credential store) of the job, the lookup returns
+   * that. This is relevant only when running against a "secure" hadoop release. The method gets hold
+   * of the tokens if they are set up by hadoop - this should happen on the map/reduce tasks if the
+   * client added the tokens into hadoop's credential store in the front end during job submission.
+   * The method will select the hive delegation token among the set of tokens and return the string
+   * form of it
+   *
+   * @param tokenSignature
+   * @return the string form of the token found
+   * @throws IOException
+   */
+  public static String getTokenStrForm(String tokenSignature) throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    TokenSelector<? extends TokenIdentifier> tokenSelector = new DelegationTokenSelector();
 
+    Token<? extends TokenIdentifier> token = tokenSelector.selectToken(
+        tokenSignature == null ? new Text() : new Text(tokenSignature), ugi.getTokens());
+    return token != null ? token.encodeToUrlString() : null;
+  }
+  
+  /**
+   * Create a delegation token object for the given token string and service. Add the token to given
+   * UGI
+   *
+   * @param ugi
+   * @param tokenStr
+   * @param tokenService
+   * @throws IOException
+   */
+  public static void setTokenStr(UserGroupInformation ugi, String tokenStr, String tokenService)
+      throws IOException {
+    Token<DelegationTokenIdentifier> delegationToken = createToken(tokenStr, tokenService);
+    ugi.addToken(delegationToken);
+  }
+
+  /**
+   * Create a new token using the given string and service
+   * 
+   * @param tokenStr
+   * @param tokenService
+   * @return
+   * @throws IOException
+   */
+  private static Token<DelegationTokenIdentifier> createToken(String tokenStr, String tokenService)
+      throws IOException {
+    Token<DelegationTokenIdentifier> delegationToken = new Token<DelegationTokenIdentifier>();
+    delegationToken.decodeFromUrlString(tokenStr);
+    delegationToken.setService(new Text(tokenService));
+    return delegationToken;
+  }
+
+  private static final String DELEGATION_TOKEN_STORE_CLS = "hive.cluster.delegation.token.store.class";
+
+  /**
+   * This method should be used to return the metastore specific tokenstore class name to maintain
+   * backwards compatibility
+   *
+   * @param conf - HiveConf object
+   * @return the tokenStoreClass name from the HiveConf. It maps the hive specific tokenstoreclass
+   *         name to metastore module specific class name. For example, if
+   *         hive.cluster.delegation.token.store.class is set to
+   *         org.apache.hadoop.hive.thrift.MemoryTokenStore, it returns the equivalent tokenstore
+   *         class defined in the metastore module, which is
+   *         org.apache.hadoop.hive.metastore.security.MemoryTokenStore. Similarly,
+   *         org.apache.hadoop.hive.thrift.DBTokenStore maps to
+   *         org.apache.hadoop.hive.metastore.security.DBTokenStore and
+   *         org.apache.hadoop.hive.thrift.ZooKeeperTokenStore maps to
+   *         org.apache.hadoop.hive.metastore.security.ZooKeeperTokenStore
+   */
+  public static String getTokenStoreClassName(Configuration conf) {
+    String tokenStoreClass = conf.get(DELEGATION_TOKEN_STORE_CLS, "");
+    if (StringUtils.isBlank(tokenStoreClass)) {
+      // default tokenstore is MemoryTokenStore
+      return MemoryTokenStore.class.getName();
+    }
+    switch (tokenStoreClass) {
+    case "org.apache.hadoop.hive.thrift.DBTokenStore":
+      return DBTokenStore.class.getName();
+    case "org.apache.hadoop.hive.thrift.MemoryTokenStore":
+      return MemoryTokenStore.class.getName();
+    case "org.apache.hadoop.hive.thrift.ZooKeeperTokenStore":
+      return ZooKeeperTokenStore.class.getName();
+    default:
+      return tokenStoreClass;
+    }
+  }
 }

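For illustration (not part of the commit), the mapping in getTokenStoreClassName means a legacy hive-shims class name configured through hive.cluster.delegation.token.store.class resolves to the standalone-metastore implementation:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;

public class TokenStoreMappingSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // A HiveConf-era value carried over from an older deployment.
    conf.set("hive.cluster.delegation.token.store.class",
        "org.apache.hadoop.hive.thrift.ZooKeeperTokenStore");
    // Prints org.apache.hadoop.hive.metastore.security.ZooKeeperTokenStore.
    System.out.println(SecurityUtils.getTokenStoreClassName(conf));
  }
}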