HIVE-14822: Add support for credential provider for jobs launched from 
HiveServer2 (Vihang Karajgaonkar, reviewed by Barna Zsombor Klara, Mohit 
Sabharwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9caf2300
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9caf2300
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9caf2300

Branch: refs/heads/hive-14535
Commit: 9caf230013377229e88b2c8eeaf53b7038e8e9a1
Parents: 36bdbcc
Author: Mohit Sabharwal <mo...@cloudera.com>
Authored: Mon Oct 17 12:48:49 2016 -0400
Committer: Mohit Sabharwal <mo...@cloudera.com>
Committed: Mon Oct 17 12:48:49 2016 -0400

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    |   2 +
 .../org/apache/hadoop/hive/conf/Constants.java  |   4 +
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +-
 .../apache/hadoop/hive/conf/HiveConfUtil.java   |  91 ++++++
 .../hive/common/util/HiveStringUtils.java       |  32 ++
 .../hadoop/hive/ql/exec/mr/ExecDriver.java      |   2 +
 .../ql/exec/spark/HiveSparkClientFactory.java   |  14 +
 .../ql/exec/spark/LocalHiveSparkClient.java     |   6 +
 .../ql/exec/spark/RemoteHiveSparkClient.java    |   4 +
 .../ql/exec/TestHiveCredentialProviders.java    | 314 +++++++++++++++++++
 .../hive/spark/client/SparkClientImpl.java      |  18 +-
 11 files changed, 490 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java 
b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 3ed2d08..1d734f9 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.io.HdfsUtils;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -576,6 +577,11 @@ public final class FileUtils {
         srcFS.getFileStatus(src).getLen() > 
conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE)) {
       LOG.info("Source is " + srcFS.getFileStatus(src).getLen() + " bytes. 
(MAX: " + conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE) + ")");
       LOG.info("Launch distributed copy (distcp) job.");
-      copied = shims.runDistCp(src, dst, conf);
+      // Apply the job credential provider settings to a copy of the conf: the caller's
+      // HiveConf is not private to this copy operation and must not keep the job
+      // credential provider path or the keystore password after the distcp launch.
+      HiveConf distCpConf = new HiveConf(conf);
+      HiveConfUtil.updateJobCredentialProviders(distCpConf);
+      copied = shims.runDistCp(src, dst, distCpConf);
       if (copied && deleteSource) {
         srcFS.delete(src, true);

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java 
b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 77c6aa5..6c42163 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -30,4 +30,8 @@ public class Constants {
   public static final String DRUID_QUERY_JSON = "druid.query.json";
   public static final String DRUID_QUERY_TYPE = "druid.query.type";
   public static final String DRUID_QUERY_FETCH = "druid.query.fetch";
+  // Job credential provider support: keystore password env variables and Hadoop's provider path key.
+  public static final String HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR = 
"HIVE_JOB_CREDSTORE_PASSWORD";
+  public static final String HADOOP_CREDENTIAL_PASSWORD_ENVVAR = 
"HADOOP_CREDSTORE_PASSWORD";
+  public static final String HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG = 
"hadoop.security.credential.provider.path";
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8ffae3b..6f168b5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2521,7 +2521,10 @@ public class HiveConf extends Configuration {
         "if an X-XSRF-HEADER header is not present"),
     HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", 
"set,reset,dfs,add,list,delete,reload,compile",
         "Comma separated list of non-SQL Hive commands users are authorized to 
execute"),
-
+    
HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH("hive.server2.job.credential.provider.path",
 "",
+        "If set, this configuration property should provide a comma-separated 
list of URLs that indicates the type and " +
+        "location of providers to be used by hadoop credential provider API. 
It provides HiveServer2 the ability to provide job-specific " +
+        "credential providers for jobs run using MR and Spark execution 
engines. This functionality has not been tested against Tez."),
     HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 15, new  
SizeValidator(0L, true, 1024L, true), "Number of threads"
          + " used to move files in move task. Set it to 0 to disable 
multi-threaded file moves. This parameter is also used by"
          + " MSCK to check tables."),

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/common/src/java/org/apache/hadoop/hive/conf/HiveConfUtil.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConfUtil.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConfUtil.java
index 16c2eaf..9ba08e5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConfUtil.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConfUtil.java
@@ -18,8 +18,14 @@
 
 package org.apache.hadoop.hive.conf;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience.Private;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hive.common.util.HiveStringUtils;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -37,6 +43,8 @@ import java.util.StringTokenizer;
  */
 @Private
 public class HiveConfUtil {
+  private static final String CLASS_NAME = HiveConfUtil.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
   /**
    * Check if metastore is being used in embedded mode.
    * This utility function exists so that the logic for determining the mode 
is same
@@ -122,4 +130,87 @@ public class HiveConfUtil {
       }
     }
   }
+
+  /**
+   * Updates the job configuration with the job-specific credential provider information available
+   * in the HiveConf. The keystore password is read from the environment variable
+   * HIVE_JOB_CREDSTORE_PASSWORD or HADOOP_CREDSTORE_PASSWORD of the current process, in line with
+   * how the Hadoop credential provider API looks up keystore passwords. Passing the password
+   * through a file is not supported, because that file would hold the password in clear text and
+   * would have to be readable by all the consumers.
+   * <ul>
+   * <li>If HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH is set (non-blank), its value overrides
+   * hadoop.security.credential.provider.path in {@code jobConf}; otherwise that property is left
+   * unchanged.</li>
+   * <li>The password is chosen by {@link #getJobCredentialProviderPassword}:
+   * (1) HIVE_JOB_CREDSTORE_PASSWORD, if the job provider path is set and the variable is
+   * non-blank; (2) otherwise HADOOP_CREDSTORE_PASSWORD, if non-blank; (3) otherwise no password is
+   * set and the Hadoop credential provider is expected to fall back to its default ("none").</li>
+   * <li>Only when hive.execution.engine is "mr" is the password added, as
+   * HADOOP_CREDSTORE_PASSWORD, to the map, reduce and MR AM environments of {@code jobConf}.
+   * NOTE(review): MR jobs started while another engine is configured (e.g. distcp) get no password.
+   * </ul>
+   *
+   * @param jobConf job specific configuration; modified in place, ignored if null
+   */
+  public static void updateJobCredentialProviders(Configuration jobConf) {
+    if(jobConf == null) {
+      return;
+    }
+
+    String jobKeyStoreLocation = 
jobConf.get(HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH.varname);
+    String oldKeyStoreLocation = 
jobConf.get(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG);
+    if (StringUtils.isNotBlank(jobKeyStoreLocation)) {
+      jobConf.set(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG, 
jobKeyStoreLocation);
+      LOG.debug("Setting job conf credstore location to " + jobKeyStoreLocation
+          + " previous location was " + oldKeyStoreLocation);
+    }
+
+    String credStorepassword = getJobCredentialProviderPassword(jobConf);
+    if (credStorepassword != null) {
+      // For the MR engine, expose the password to the map, reduce and AM task environments
+      String execEngine = jobConf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname);
+      if ("mr".equalsIgnoreCase(execEngine)) {
+        addKeyValuePair(jobConf, JobConf.MAPRED_MAP_TASK_ENV,
+            Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, credStorepassword);
+        addKeyValuePair(jobConf, JobConf.MAPRED_REDUCE_TASK_ENV,
+            Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, credStorepassword);
+        addKeyValuePair(jobConf, "yarn.app.mapreduce.am.admin.user.env",
+            Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, credStorepassword);
+      }
+    }
+  }
+
+  /**
+   * Returns env HIVE_JOB_CREDSTORE_PASSWORD if hive.server2.job.credential.provider.path is set,
+   * else env HADOOP_CREDSTORE_PASSWORD; blank values count as unset, null if neither is usable.
+   */
+  public static String getJobCredentialProviderPassword(Configuration conf) {
+    String jobKeyStoreLocation =
+        
conf.get(HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH.varname);
+    String password = null;
+    if(StringUtils.isNotBlank(jobKeyStoreLocation)) {
+      password = 
System.getenv(Constants.HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR);
+      if (StringUtils.isNotBlank(password)) {
+        return password;
+      }
+    }
+    password = System.getenv(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR);
+    if (StringUtils.isNotBlank(password)) {
+      return password;
+    }
+    return null;
+  }
+
+  private static void addKeyValuePair(Configuration jobConf, String property, 
String keyName,
+      String newKeyValue) {
+    String existingValue = jobConf.get(property);
+    if (existingValue == null) {
+      jobConf.set(property, (keyName + "=" + newKeyValue));
+      return;
+    }
+    // Otherwise update keyName inside the existing comma-separated key=value list.
+    String propertyValue = HiveStringUtils.insertValue(keyName, newKeyValue, 
existingValue);
+    jobConf.set(property, propertyValue);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/common/src/java/org/apache/hive/common/util/HiveStringUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hive/common/util/HiveStringUtils.java 
b/common/src/java/org/apache/hive/common/util/HiveStringUtils.java
index 507e369..9db312b 100644
--- a/common/src/java/org/apache/hive/common/util/HiveStringUtils.java
+++ b/common/src/java/org/apache/hive/common/util/HiveStringUtils.java
@@ -444,6 +444,7 @@ public class HiveStringUtils {
 
   final public static String[] emptyStringArray = {};
   final public static char COMMA = ',';
+  final public static char EQUALS = '=';
   final public static String COMMA_STR = ",";
   final public static char ESCAPE_CHAR = '\\';
 
@@ -543,6 +544,59 @@ public class HiveStringUtils {
   }
 
   /**
+   * Sets the value of a key in a string of comma-separated key=value pairs.
+   * <p>
+   * If one or more pairs have the given key (ignoring surrounding whitespace),
+   * their values are replaced with {@code newValue}; otherwise
+   * {@code key=newValue} is appended. Each pair is split on its first
+   * unescaped '=' only, so values that themselves contain '=' (for example
+   * {@code HADOOP_OPTS=-Dfoo=bar}) are preserved. All other keys and values,
+   * including their surrounding whitespace, are kept as-is.
+   *
+   * @param key The key whose value needs to be inserted or replaced
+   * @param newValue The new value of the key
+   * @param strKvPairs Comma separated key=value pairs Eg: "k1=v1, k2=v2, k3=v3"
+   * @return Comma separated string of key=value pairs with the new value for
+   *         key {@code key}
+   * @throws RuntimeException if a pair does not contain an unescaped '='
+   */
+  public static String insertValue(String key, String newValue,
+      String strKvPairs) {
+    String[] keyValuePairs = HiveStringUtils.split(strKvPairs);
+    StringBuilder sb = new StringBuilder();
+    boolean found = false;
+    for (int i = 0; i < keyValuePairs.length; i++) {
+      String kvPair = keyValuePairs[i];
+      StringBuilder pairKey = new StringBuilder();
+      // only the first unescaped '=' separates the key from the value
+      int eqIndex = findNext(kvPair, EQUALS, ESCAPE_CHAR, 0, pairKey);
+      if (eqIndex < 0) {
+        throw new RuntimeException("Error parsing the keyvalue pair " + kvPair);
+      }
+      if (i > 0) {
+        sb.append(COMMA);
+      }
+      sb.append(pairKey);
+      sb.append(EQUALS);
+      // entries are commonly written as "k1=v1, k2=v2", so ignore key padding
+      if (pairKey.toString().trim().equals(key)) {
+        sb.append(newValue);
+        found = true;
+      } else {
+        sb.append(kvPair, eqIndex + 1, kvPair.length());
+      }
+    }
+    if (!found) {
+      // the key is not present yet: append it instead of silently dropping it
+      if (sb.length() > 0) {
+        sb.append(COMMA);
+      }
+      sb.append(key).append(EQUALS).append(newValue);
+    }
+    return sb.toString();
+  }
+
+  /**
    * Finds the first occurrence of the separator character ignoring the escaped
    * separators starting from the index. Note the substring between the index
    * and the position of the separator is passed.

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index cea9582..9b07e21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -413,6 +414,7 @@ public class ExecDriver extends Task<MapredWork> implements 
Serializable, Hadoop
         TezSessionPoolManager.getInstance().closeIfNotDefault(session, true);
       }
 
+      HiveConfUtil.updateJobCredentialProviders(job);
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
       this.jobID = rj.getJobID();

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 784b9c9..d71a84c 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
@@ -195,9 +196,22 @@ public class HiveSparkClientFactory {
       sparkConf.put(SPARK_WAIT_APP_COMPLETE, "false");
     }
 
+    // Pass the credential store password, if one is found, to the Spark AM and executor
+    // environments. The job-specific provider location is set on the JobConf itself in the
+    // execute method of LocalHiveSparkClient and the submit method of RemoteHiveSparkClient.
+    String password = HiveConfUtil.getJobCredentialProviderPassword(hiveConf);
+    if(password != null) {
+      addCredentialProviderPassword(sparkConf, password);
+    }
     return sparkConf;
   }
 
+  private static void addCredentialProviderPassword(Map<String, String> 
sparkConf,
+      String jobCredstorePassword) {
+    sparkConf.put("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD", 
jobCredstorePassword);
+    sparkConf.put("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", 
jobCredstorePassword);
+  }
+
   static SparkConf generateSparkConf(Map<String, String> conf) {
     SparkConf sparkConf = new SparkConf(false);
     for (Map.Entry<String, String> entry : conf.entrySet()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index c75333d..f5d9c4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -113,6 +114,11 @@ public class LocalHiveSparkClient implements 
HiveSparkClient {
     FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
     fs.mkdirs(emptyScratchDir);
 
+    // Update the credential provider location in the jobConf; the password for
+    // the credential provider is already set in the sparkConf by
+    // HiveSparkClientFactory
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+
     SparkCounters sparkCounters = new SparkCounters(sc);
     Map<String, List<String>> prefixes = sparkWork.getRequiredCounterPrefix();
     if (prefixes != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index a9f70c4..a705dfc 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -191,6 +192,9 @@ public class RemoteHiveSparkClient implements 
HiveSparkClient {
     refreshLocalResources(sparkWork, hiveConf);
     final JobConf jobConf = new JobConf(hiveConf);
 
+    // Update the credential provider location in the jobConf before it is used below
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+
     // Create temporary scratch dir
     final Path emptyScratchDir = ctx.getMRTmpPath();
     FileSystem fs = emptyScratchDir.getFileSystem(jobConf);

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHiveCredentialProviders.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHiveCredentialProviders.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHiveCredentialProviders.java
new file mode 100644
index 0000000..a31898c
--- /dev/null
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHiveCredentialProviders.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import static 
org.apache.hadoop.hive.conf.Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR;
+import static 
org.apache.hadoop.hive.conf.Constants.HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG;
+import static 
org.apache.hadoop.hive.conf.Constants.HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestHiveCredentialProviders {
+  private static final String HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL = 
"testhadoopCredStorePassword";
+  private static final String HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL = 
"testhiveJobCredPassword";
+  private static final String JOB_CREDSTORE_LOCATION = 
"jceks://hdfs/user/hive/creds.jceks";
+  private static final String HADOOP_CREDSTORE_LOCATION =
+      "localjceks://file/user/hive/localcreds.jceks";
+
+  private Configuration jobConf;
+
+  /*
+   * Dirty hack to set the credstore password env variables using reflection. Only these two
+   * variables are reset; the rest of the process environment is left intact. For testing only.
+   */
+  private final static void setEnv(Map<String, String> newenv) throws 
Exception {
+    Class[] classes = Collections.class.getDeclaredClasses();
+    Map<String, String> env = System.getenv();
+    for (Class cl : classes) {
+      if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+        Field field = cl.getDeclaredField("m");
+        field.setAccessible(true);
+        Map<String, String> map = (Map<String, String>) field.get(env);
+        map.remove(HADOOP_CREDENTIAL_PASSWORD_ENVVAR);
+        map.remove(HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR);
+        map.putAll(newenv);
+      }
+    }
+  }
+
+  @Before
+  public void resetConfig() {
+    jobConf = new JobConf();
+  }
+  /*
+   * Tests whether credential provider is updated when 
HIVE_JOB_CREDSTORE_PASSWORD is set and when
+   * hiveConf sets HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH
+   *
+   * JobConf should contain the mapred env variable equal to 
${HIVE_JOB_CREDSTORE_PASSWORD} and the
+   * hadoop.security.credential.provider.path property should be equal to 
value of
+   * HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH
+   */
+  @Test
+  public void testJobCredentialProvider() throws Exception {
+    setupConfigs(true, true, true, true);
+
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+    // make sure credential provider path points to HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH
+    Assert.assertEquals(JOB_CREDSTORE_LOCATION,
+        jobConf.get(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG));
+
+    // make sure MAP task environment points to HIVE_JOB_CREDSTORE_PASSWORD
+    Assert.assertEquals(HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL, 
getValueFromJobConf(
+        jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), 
HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
+    // make sure REDUCE task environment points to HIVE_JOB_CREDSTORE_PASSWORD
+    Assert.assertEquals(HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL, 
getValueFromJobConf(
+        jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV), 
HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+  }
+
+  /*
+   * If hive job credstore location is not set, but hadoop credential provider 
is set
+   * jobConf should contain hadoop credstore location and password should be 
from HADOOP_CREDSTORE_PASSWORD
+   */
+  @Test
+  public void testHadoopCredentialProvider() throws Exception {
+    setupConfigs(true, true, true, false);
+
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+    Assert.assertEquals(HADOOP_CREDSTORE_LOCATION,
+        jobConf.get(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG));
+
+    // make sure MAP task environment points to HADOOP_CREDSTORE_PASSWORD
+    Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, 
getValueFromJobConf(
+        jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), 
HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
+    // make sure REDUCE task environment points to HADOOP_CREDSTORE_PASSWORD
+    Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, 
getValueFromJobConf(
+        jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV), 
HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+  }
+
+  /*
+   * If there is no credential provider configured for hadoop, jobConf should 
not contain
+   * credstore password and provider path even if HIVE_JOB_CREDSTORE_PASSWORD env is set
+   */
+  @Test
+  public void testNoCredentialProviderWithPassword() throws Exception {
+    setupConfigs(false, false, true, false);
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+    
Assert.assertTrue(StringUtils.isBlank(jobConf.get(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG)));
+
+    
Assert.assertNull(getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV),
+        HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
+    
Assert.assertNull(getValueFromJobConf(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV),
+        HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+  }
+
+  /*
+   * If hive job credential provider is set but HIVE_JOB_CREDSTORE_PASSWORD is 
not set, use
+   * HADOOP_CREDSTORE_PASSWORD in the jobConf
+   */
+  @Test
+  public void testJobCredentialProviderWithDefaultPassword() throws Exception {
+    setupConfigs(false, true, false, true);
+
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+    Assert.assertEquals(JOB_CREDSTORE_LOCATION,
+        jobConf.get(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG));
+
+    Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, 
getValueFromJobConf(
+        jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), 
HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
+    Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, 
getValueFromJobConf(
+        jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV), 
HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+  }
+
+  /*
+   * When neither HADOOP_CREDSTORE_PASSWORD nor HIVE_JOB_CREDSTORE_PASSWORD
+   * is set, jobConf should contain only the credential provider path
+   */
+  @Test
+  public void testCredentialProviderWithNoPasswords() throws Exception {
+    setupConfigs(true, false, false, true);
+
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+    Assert.assertEquals(JOB_CREDSTORE_LOCATION,
+        jobConf.get(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG));
+    Assert.assertNull(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV));
+    Assert.assertNull(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV));
+
+    resetConfig();
+    setupConfigs(true, false, false, false);
+
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+    Assert.assertEquals(HADOOP_CREDSTORE_LOCATION,
+        jobConf.get(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG));
+    Assert.assertNull(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV));
+    Assert.assertNull(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV));
+  }
+
+  /*
+   * default behavior when neither hive.server2.job.credential.provider.path is set nor
+   * HIVE_JOB_CREDSTORE_PASSWORD is. In this case if hadoop credential 
provider is configured job
+   * config should use that else it should remain unset
+   */
+  @Test
+  public void testJobCredentialProviderUnset() throws Exception {
+    setupConfigs(true, true, false, false);
+
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+    assertEquals(HADOOP_CREDSTORE_LOCATION, 
jobConf.get(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG));
+
+    assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
+        jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), 
HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
+    assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
+        jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV), 
HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+  }
+
+  /*
+   * Test the unsecure base case when neither hadoop nor job-specific
+   * credential provider is set
+   */
+  @Test
+  public void testNoCredentialProvider() throws Exception {
+    setupConfigs(false, false, false, false);
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+    
assertTrue(StringUtils.isBlank(jobConf.get(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG)));
+
+    assertNull(getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV),
+        HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
+    assertNull(getValueFromJobConf(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV),
+        HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+  }
+
+  /*
+   * Test updateCredentialProviders does not corrupt existing values of
+   * Mapred env configs
+   */
+  @Test
+  public void testExistingConfiguration() throws Exception {
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "k1=v1, k2=v2, 
HADOOP_CREDSTORE_PASSWORD=test");
+    setupConfigs(false, true, false, true);
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+
+    assertEquals("v1", 
getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k1"));
+    assertEquals("v2", 
getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k2"));
+
+    resetConfig();
+
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "k1=v1, 
HADOOP_CREDSTORE_PASSWORD=test, k2=v2");
+    setupConfigs(false, true, false, true);
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+
+    assertEquals("v1", 
getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k1"));
+    assertEquals("v2", 
getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k2"));
+
+    resetConfig();
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "HADOOP_CREDSTORE_PASSWORD=test, 
k1=v1, k2=v2");
+    setupConfigs(false, true, false, true);
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+
+    assertEquals("v1", 
getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k1"));
+    assertEquals("v2", 
getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k2"));
+  }
+
+  /**
+   * Sets up the environment and configurations
+   *
+   * @param setHadoopCredProvider set hadoop credstore provider path
+   * @param setHadoopCredstorePassword set HADOOP_CREDSTORE_PASSWORD env 
variable
+   * @param setHiveCredPassword set HIVE_JOB_CREDSTORE_PASSWORD env variable
+   * @param setHiveProviderPath set HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH in the
+   *          hive config
+   * @throws Exception
+   */
+  private void setupConfigs(boolean setHadoopCredProvider, boolean 
setHadoopCredstorePassword,
+      boolean setHiveCredPassword, boolean setHiveProviderPath) throws 
Exception {
+    Map<String, String> mockEnv = new HashMap<>();
+    // sets the env variable HADOOP_CREDSTORE_PASSWORD to value defined by 
HADOOP_CREDSTORE_PASSWORD
+    // sets hadoop.security.credential.provider.path property to simulate 
default credential
+    // provider setup
+    if (setHadoopCredProvider) {
+      jobConf.set(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG, 
HADOOP_CREDSTORE_LOCATION);
+    }
+    if (setHadoopCredstorePassword) {
+      mockEnv.put(HADOOP_CREDENTIAL_PASSWORD_ENVVAR, 
HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL);
+    }
+    // sets the env variable HIVE_JOB_CREDSTORE_PASSWORD to value defined by
+    // HIVE_JOB_CREDSTORE_PASSWORD
+    if (setHiveCredPassword) {
+      mockEnv.put(HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR, 
HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL);
+    }
+    TestHiveCredentialProviders.setEnv(mockEnv);
+    // set hive provider path in hiveConf if setHiveProviderPath is true
+    // simulates hive.server2.job.credential.provider.path set in hive-site.xml or
+    // core-site.xml of HS2
+    if (setHiveProviderPath) {
+      
jobConf.set(HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH.varname,
+          JOB_CREDSTORE_LOCATION);
+    }
+    jobConf.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "mr");
+  }
+
+  /*
+   * Extract the value of key from comma-separated key=value pairs; null if absent
+   */
+  private String getValueFromJobConf(String keyValuePairs, String key) {
+    if (keyValuePairs == null) {
+      return null;
+    }
+    String[] keyValues = keyValuePairs.split(",");
+    for (String kv : keyValues) {
+      String[] parts = kv.split("=", 2);
+      if (parts.length == 2 && key.equals(parts[0].trim())) {
+        return parts[1].trim();
+      }
+    }
+    return null;
+  }
+
+  /*
+   * Test if the environment variables can be set. If this test fails
+   * all the other tests will also fail because environment is not getting 
setup
+   */
+  @Test
+  public void testEnv() throws Exception {
+    Map<String, String> mockEnv = new HashMap<>();
+    mockEnv.put(HADOOP_CREDENTIAL_PASSWORD_ENVVAR, 
HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL);
+    mockEnv.put(HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR, 
HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL);
+    TestHiveCredentialProviders.setEnv(mockEnv);
+    assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, 
System.getenv(HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+    assertEquals(HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL, 
System.getenv(HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git 
a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java 
b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 936fdaf..e2a30a7 100644
--- 
a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ 
b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -53,6 +53,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.shims.Utils;
@@ -206,6 +207,7 @@ class SparkClientImpl implements SparkClient {
 
     if (conf.containsKey(SparkClientFactory.CONF_KEY_IN_PROCESS)) {
       // Mostly for testing things quickly. Do not do this in production.
+      // when invoked in-process it inherits the environment variables of the 
parent
       LOG.warn("!!!! Running remote driver in-process. !!!!");
       runnable = new Runnable() {
         @Override
@@ -440,7 +442,12 @@ class SparkClientImpl implements SparkClient {
       // Prevent hive configurations from being visible in Spark.
       pb.environment().remove("HIVE_HOME");
       pb.environment().remove("HIVE_CONF_DIR");
-
+      // Add credential provider password to the child process's environment
+      // In case of Spark the credential provider location is provided in the 
jobConf when the job is submitted
+      String password = getSparkJobCredentialProviderPassword();
+      if(password != null) {
+        pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, 
password);
+      }
       if (isTesting != null) {
         pb.environment().put("SPARK_TESTING", isTesting);
       }
@@ -485,6 +492,15 @@ class SparkClientImpl implements SparkClient {
     return thread;
   }
 
+  /**
+   * Looks up the Hadoop credential-store password configured for the Spark job.
+   *
+   * <p>The candidate configuration keys are checked in order: first
+   * {@code spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD}, then
+   * {@code spark.executorEnv.HADOOP_CREDSTORE_PASSWORD}. The value of the first key
+   * present in {@code conf} is returned as-is.
+   *
+   * @return the configured password, or {@code null} if neither key is present
+   */
+  private String getSparkJobCredentialProviderPassword() {
+    String[] candidateKeys = {
+        "spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD",
+        "spark.executorEnv.HADOOP_CREDSTORE_PASSWORD"
+    };
+    for (String candidate : candidateKeys) {
+      // containsKey (not a null check on get) so the first present key wins, as before
+      if (conf.containsKey(candidate)) {
+        return conf.get(candidate);
+      }
+    }
+    return null;
+  }
+
   private void redirect(String name, Redirector redirector) {
     Thread thread = new Thread(redirector);
     thread.setName(name);

Reply via email to