HIVE-14822: Add support for credential provider for jobs launched from Hiveserver2 (Vihang Karajgaonkar, reviewed by Barna Zsombor Klara, Mohit Sabharwal)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9caf2300
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9caf2300
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9caf2300

Branch: refs/heads/hive-14535
Commit: 9caf230013377229e88b2c8eeaf53b7038e8e9a1
Parents: 36bdbcc
Author: Mohit Sabharwal <[email protected]>
Authored: Mon Oct 17 12:48:49 2016 -0400
Committer: Mohit Sabharwal <[email protected]>
Committed: Mon Oct 17 12:48:49 2016 -0400

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    |   2 +
 .../org/apache/hadoop/hive/conf/Constants.java  |   4 +
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +-
 .../apache/hadoop/hive/conf/HiveConfUtil.java   |  91 ++++++
 .../hive/common/util/HiveStringUtils.java       |  32 ++
 .../hadoop/hive/ql/exec/mr/ExecDriver.java      |   2 +
 .../ql/exec/spark/HiveSparkClientFactory.java   |  14 +
 .../ql/exec/spark/LocalHiveSparkClient.java     |   6 +
 .../ql/exec/spark/RemoteHiveSparkClient.java    |   4 +
 .../ql/exec/TestHiveCredentialProviders.java    | 314 +++++++++++++++++++
 .../hive/spark/client/SparkClientImpl.java      |  18 +-
 11 files changed, 490 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 3ed2d08..1d734f9 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.io.HdfsUtils;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -576,6 +577,7 @@ public final class FileUtils {
         srcFS.getFileStatus(src).getLen() > conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE)) {
       LOG.info("Source is " + srcFS.getFileStatus(src).getLen() + " bytes. (MAX: "
           + conf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE) + ")");
       LOG.info("Launch distributed copy (distcp) job.");
+      HiveConfUtil.updateJobCredentialProviders(conf);
       copied = shims.runDistCp(src, dst, conf);
       if (copied && deleteSource) {
         srcFS.delete(src, true);

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/common/src/java/org/apache/hadoop/hive/conf/Constants.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index 77c6aa5..6c42163 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -30,4 +30,8 @@ public class Constants {
   public static final String DRUID_QUERY_JSON = "druid.query.json";
   public static final String DRUID_QUERY_TYPE = "druid.query.type";
   public static final String DRUID_QUERY_FETCH = "druid.query.fetch";
+
+  public static final String HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR = "HIVE_JOB_CREDSTORE_PASSWORD";
+  public static final String HADOOP_CREDENTIAL_PASSWORD_ENVVAR = "HADOOP_CREDSTORE_PASSWORD";
+  public static final String HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG = "hadoop.security.credential.provider.path";
 }
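
For context, these constants name standard Hadoop credential-provider hooks rather than anything Hive-specific: a configuration whose hadoop.security.credential.provider.path points at a keystore resolves secrets through Configuration.getPassword(), and the keystore password itself is read from the HADOOP_CREDSTORE_PASSWORD environment variable, with a built-in fallback to the default password "none". The sketch below is illustrative only and not part of the patch; the keystore URL and alias are hypothetical.

    import org.apache.hadoop.conf.Configuration;

    public class CredentialProviderSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Point the Hadoop credential provider API at a (hypothetical) keystore.
        conf.set("hadoop.security.credential.provider.path",
            "jceks://hdfs/user/hive/job-creds.jceks");
        // getPassword() consults the providers on the path above; the keystore
        // password is taken from the HADOOP_CREDSTORE_PASSWORD env variable.
        char[] secret = conf.getPassword("my.secret.alias");
        System.out.println(secret == null ? "no such alias" : "resolved");
      }
    }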

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8ffae3b..6f168b5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2521,7 +2521,10 @@ public class HiveConf extends Configuration {
         "if an X-XSRF-HEADER header is not present"),
     HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
         "Comma separated list of non-SQL Hive commands users are authorized to execute"),
-
+    HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH("hive.server2.job.credential.provider.path", "",
+        "If set, this configuration property should provide a comma-separated list of URLs that indicates the type and " +
+        "location of providers to be used by the hadoop credential provider API. It enables HiveServer2 to supply " +
+        "job-specific credential providers for jobs run using the MR and Spark execution engines. This functionality has not been tested against Tez."),
     HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 15, new SizeValidator(0L, true, 1024L, true), "Number of threads" +
         " used to move files in move task. Set it to 0 to disable multi-threaded file moves. This parameter is also used by" +
         " MSCK to check tables."),

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/common/src/java/org/apache/hadoop/hive/conf/HiveConfUtil.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConfUtil.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConfUtil.java
index 16c2eaf..9ba08e5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConfUtil.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConfUtil.java
@@ -18,8 +18,14 @@
 package org.apache.hadoop.hive.conf;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience.Private;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hive.common.util.HiveStringUtils;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -37,6 +43,8 @@ import java.util.StringTokenizer;
  */
 @Private
 public class HiveConfUtil {
+  private static final String CLASS_NAME = HiveConfUtil.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
   /**
    * Check if metastore is being used in embedded mode.
    * This utility function exists so that the logic for determining the mode is same
@@ -122,4 +130,87 @@ public class HiveConfUtil {
       }
     }
   }
+
+  /**
+   * Updates the job configuration with the job-specific credential provider information
+   * available in the HiveConf. It uses the environment variables HADOOP_CREDSTORE_PASSWORD or
+   * HIVE_JOB_CREDSTORE_PASSWORD to get the custom password for all the keystores configured in
+   * the provider path. This use of environment variables mirrors the Hadoop credential provider
+   * mechanism for obtaining keystore passwords. The alternative of communicating the password
+   * through a clear-text file, which would have to be readable by every consumer, is not
+   * supported.
+   *
+   * <li>If HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH is set in the hive configuration, this method
+   * overrides the MR job configuration property hadoop.security.credential.provider.path with its
+   * value. If it is not set, the value of hadoop.security.credential.provider.path is left unchanged.
+   * <li>The password for the credential provider is chosen as follows:
+   *
+   * (1) If the job credential provider path HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH is set, check
+   * whether HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR is set. If it is set, use it.
+   * (2) If no password is found by (1), use HADOOP_CREDSTORE_PASSWORD if it is set.
+   * (3) If neither is set, do not set any password in the MR task environment. In this
+   * case the hadoop credential provider should use the default password of "none" automatically.
+   *
+   * @param jobConf - job specific configuration
+   */
+  public static void updateJobCredentialProviders(Configuration jobConf) {
+    if (jobConf == null) {
+      return;
+    }
+
+    String jobKeyStoreLocation = jobConf.get(HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH.varname);
+    String oldKeyStoreLocation = jobConf.get(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG);
+    if (StringUtils.isNotBlank(jobKeyStoreLocation)) {
+      jobConf.set(Constants.HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG, jobKeyStoreLocation);
+      LOG.debug("Setting job conf credstore location to " + jobKeyStoreLocation
+          + " previous location was " + oldKeyStoreLocation);
+    }
+
+    String credStorePassword = getJobCredentialProviderPassword(jobConf);
+    if (credStorePassword != null) {
+      // if the execution engine is MR, set the map/reduce env with the credential store password
+      String execEngine = jobConf.get(ConfVars.HIVE_EXECUTION_ENGINE.varname);
+      if ("mr".equalsIgnoreCase(execEngine)) {
+        addKeyValuePair(jobConf, JobConf.MAPRED_MAP_TASK_ENV,
+            Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, credStorePassword);
+        addKeyValuePair(jobConf, JobConf.MAPRED_REDUCE_TASK_ENV,
+            Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, credStorePassword);
+        addKeyValuePair(jobConf, "yarn.app.mapreduce.am.admin.user.env",
+            Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, credStorePassword);
+      }
+    }
+  }
+
+  /*
+   * If HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH is set, check
+   * HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR before checking
+   * HADOOP_CREDENTIAL_PASSWORD_ENVVAR.
+   */
+  public static String getJobCredentialProviderPassword(Configuration conf) {
+    String jobKeyStoreLocation =
+        conf.get(HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH.varname);
+    String password = null;
+    if (StringUtils.isNotBlank(jobKeyStoreLocation)) {
+      password = System.getenv(Constants.HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR);
+      if (StringUtils.isNotBlank(password)) {
+        return password;
+      }
+    }
+    password = System.getenv(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR);
+    if (StringUtils.isNotBlank(password)) {
+      return password;
+    }
+    return null;
+  }
+
+  private static void addKeyValuePair(Configuration jobConf, String property, String keyName,
+      String newKeyValue) {
+    String existingValue = jobConf.get(property);
+    if (existingValue == null) {
+      jobConf.set(property, (keyName + "=" + newKeyValue));
+      return;
+    }
+
+    String propertyValue = HiveStringUtils.insertValue(keyName, newKeyValue, existingValue);
+    jobConf.set(property, propertyValue);
+  }
 }
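
A short illustration of the net effect of the new helper (illustrative only, not part of the patch; the jceks URL is hypothetical, and the resolved password comes from whatever HIVE_JOB_CREDSTORE_PASSWORD or HADOOP_CREDSTORE_PASSWORD is exported in HiveServer2's environment):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConfUtil;
    import org.apache.hadoop.mapred.JobConf;

    public class UpdateJobCredentialProvidersSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        job.set(HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH.varname,
            "jceks://hdfs/user/hive/job-creds.jceks");
        job.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "mr");

        HiveConfUtil.updateJobCredentialProviders(job);

        // The provider path now points at the job-specific keystore...
        System.out.println(job.get("hadoop.security.credential.provider.path"));
        // ...and, if one of the two env variables was set in HiveServer2's
        // environment, the MR task env now carries
        // HADOOP_CREDSTORE_PASSWORD=<password> for map and reduce tasks.
        System.out.println(job.get(JobConf.MAPRED_MAP_TASK_ENV));
      }
    }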

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/common/src/java/org/apache/hive/common/util/HiveStringUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hive/common/util/HiveStringUtils.java b/common/src/java/org/apache/hive/common/util/HiveStringUtils.java
index 507e369..9db312b 100644
--- a/common/src/java/org/apache/hive/common/util/HiveStringUtils.java
+++ b/common/src/java/org/apache/hive/common/util/HiveStringUtils.java
@@ -444,6 +444,7 @@ public class HiveStringUtils {
   final public static String[] emptyStringArray = {};
   final public static char COMMA = ',';
+  final public static char EQUALS = '=';
   final public static String COMMA_STR = ",";
   final public static char ESCAPE_CHAR = '\\';
@@ -543,6 +544,37 @@
   }
 
   /**
+   * In a given string of comma-separated key=value pairs, replace the value of the given key.
+   *
+   * @param key The key whose value needs to be replaced
+   * @param newValue The new value of the key
+   * @param strKvPairs Comma-separated key=value pairs, e.g. "k1=v1,k2=v2,k3=v3"
+   * @return Comma-separated string of key=value pairs with the new value for the given key
+   */
+  public static String insertValue(String key, String newValue,
+      String strKvPairs) {
+    String[] keyValuePairs = HiveStringUtils.split(strKvPairs);
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < keyValuePairs.length; i++) {
+      String[] pair = HiveStringUtils.split(keyValuePairs[i], ESCAPE_CHAR, EQUALS);
+      if (pair.length != 2) {
+        throw new RuntimeException("Error parsing the keyvalue pair " + keyValuePairs[i]);
+      }
+      sb.append(pair[0]);
+      sb.append(EQUALS);
+      if (pair[0].equals(key)) {
+        sb.append(newValue);
+      } else {
+        sb.append(pair[1]);
+      }
+      if (i < (keyValuePairs.length - 1)) {
+        sb.append(COMMA);
+      }
+    }
+    return sb.toString();
+  }
+
+  /**
    * Finds the first occurrence of the separator character ignoring the escaped
    * separators starting from the index. Note the substring between the index
    * and the position of the separator is passed.
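
A usage note on the new helper (illustrative): despite its name, insertValue only replaces the value of a key that is already present in the list; a missing key is not appended (the addKeyValuePair caller above only handles the case where the whole property is absent). For example:

    // "k2" is present, so its value is replaced; the other pairs pass through untouched.
    String updated = HiveStringUtils.insertValue("k2", "new", "k1=v1,k2=old,k3=v3");
    // updated -> "k1=v1,k2=new,k3=v3"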

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index cea9582..9b07e21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -413,6 +414,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
         TezSessionPoolManager.getInstance().closeIfNotDefault(session, true);
       }
 
+      HiveConfUtil.updateJobCredentialProviders(job);
      // Finally SUBMIT the JOB!
      rj = jc.submitJob(job);
      this.jobID = rj.getJobID();

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 784b9c9..d71a84c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
@@ -195,9 +196,22 @@ public class HiveSparkClientFactory {
       sparkConf.put(SPARK_WAIT_APP_COMPLETE, "false");
     }
 
+    // Set the credential provider password if one is found. If there is a job-specific
+    // credential provider, its location is set directly in the execute method of
+    // LocalHiveSparkClient and the submit method of RemoteHiveSparkClient when the job config is created.
+    String password = HiveConfUtil.getJobCredentialProviderPassword(hiveConf);
+    if (password != null) {
+      addCredentialProviderPassword(sparkConf, password);
+    }
     return sparkConf;
   }
 
+  private static void addCredentialProviderPassword(Map<String, String> sparkConf,
+      String jobCredstorePassword) {
+    sparkConf.put("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD", jobCredstorePassword);
+    sparkConf.put("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", jobCredstorePassword);
+  }
+
   static SparkConf generateSparkConf(Map<String, String> conf) {
     SparkConf sparkConf = new SparkConf(false);
     for (Map.Entry<String, String> entry : conf.entrySet()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index c75333d..f5d9c4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -113,6 +114,11 @@ public class LocalHiveSparkClient implements HiveSparkClient {
     FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
     fs.mkdirs(emptyScratchDir);
 
+    // Update the credential provider location; the password for the credential
+    // provider is already set in the sparkConf in HiveSparkClientFactory.
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+
     SparkCounters sparkCounters = new SparkCounters(sc);
     Map<String, List<String>> prefixes = sparkWork.getRequiredCounterPrefix();
     if (prefixes != null) {
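
For reference, spark.yarn.appMasterEnv.* and spark.executorEnv.* are Spark's standard properties for exporting environment variables to the YARN application master and the executors, respectively. A minimal sketch of the map the factory produces (illustrative only; the password value is hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    public class SparkEnvPropagationSketch {
      public static void main(String[] args) {
        // The keystore password travels as an env variable to the driver (YARN AM)
        // and executors, where Hadoop's credential provider reads it back from
        // HADOOP_CREDSTORE_PASSWORD.
        Map<String, String> sparkConf = new HashMap<>();
        sparkConf.put("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD", "hypothetical-password");
        sparkConf.put("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", "hypothetical-password");
        System.out.println(sparkConf);
      }
    }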

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index a9f70c4..a705dfc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -191,6 +192,9 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
     refreshLocalResources(sparkWork, hiveConf);
     final JobConf jobConf = new JobConf(hiveConf);
 
+    // update the credential provider location in the jobConf
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+
     // Create temporary scratch dir
     final Path emptyScratchDir = ctx.getMRTmpPath();
     FileSystem fs = emptyScratchDir.getFileSystem(jobConf);

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHiveCredentialProviders.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHiveCredentialProviders.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHiveCredentialProviders.java
new file mode 100644
index 0000000..a31898c
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHiveCredentialProviders.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.hive.conf.Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR;
+import static org.apache.hadoop.hive.conf.Constants.HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG;
+import static org.apache.hadoop.hive.conf.Constants.HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestHiveCredentialProviders {
+  private static final String HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL = "testhadoopCredStorePassword";
+  private static final String HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL = "testhiveJobCredPassword";
+  private static final String JOB_CREDSTORE_LOCATION = "jceks://hdfs/user/hive/creds.jceks";
+  private static final String HADOOP_CREDSTORE_LOCATION =
+      "localjceks://file/user/hive/localcreds.jceks";
+
+  private Configuration jobConf;
+
+  /*
+   * Dirty hack to set the environment variables using reflection. This method is for testing
+   * purposes only and should not be used elsewhere.
+   */
+  private final static void setEnv(Map<String, String> newenv) throws Exception {
+    Class[] classes = Collections.class.getDeclaredClasses();
+    Map<String, String> env = System.getenv();
+    for (Class cl : classes) {
+      if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+        Field field = cl.getDeclaredField("m");
+        field.setAccessible(true);
+        Object obj = field.get(env);
+        Map<String, String> map = (Map<String, String>) obj;
+        map.clear();
+        map.putAll(newenv);
+      }
+    }
+  }
+
+  @Before
+  public void resetConfig() {
+    jobConf = new JobConf();
+  }
+
+  /*
+   * Tests whether the credential provider is updated when HIVE_JOB_CREDSTORE_PASSWORD is set and
+   * hiveConf sets HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH.
+   *
+   * JobConf should contain the mapred env variable equal to ${HIVE_JOB_CREDSTORE_PASSWORD} and the
+   * hadoop.security.credential.provider.path property should be equal to the value of
+   * HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH.
+   */
+  @Test
+  public void testJobCredentialProvider() throws Exception {
+    setupConfigs(true, true, true, true);
+
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+    // make sure the credential provider path points to the job credstore location
+    Assert.assertEquals(JOB_CREDSTORE_LOCATION,
+        jobConf.get(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG));
+
+    // make sure the MAP task environment carries HIVE_JOB_CREDSTORE_PASSWORD
+    Assert.assertEquals(HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
+        jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
+    // make sure the REDUCE task environment carries HIVE_JOB_CREDSTORE_PASSWORD
+    Assert.assertEquals(HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
+        jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+  }
+
+  /*
+   * If the hive job credstore location is not set but a hadoop credential provider is set,
+   * jobConf should contain the hadoop credstore location and the password should come from
+   * HADOOP_CREDSTORE_PASSWORD.
+   */
+  @Test
+  public void testHadoopCredentialProvider() throws Exception {
+    setupConfigs(true, true, true, false);
+
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+    Assert.assertEquals(HADOOP_CREDSTORE_LOCATION,
+        jobConf.get(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG));
+
+    // make sure the MAP task environment carries HADOOP_CREDSTORE_PASSWORD
+    Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
+        jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
+    // make sure the REDUCE task environment carries HADOOP_CREDSTORE_PASSWORD
+    Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
+        jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+  }
+
+  /*
+   * If there is no credential provider configured for hadoop, jobConf should contain neither
+   * the credstore password nor the provider path, even if the HIVE_JOB_CREDSTORE_PASSWORD env
+   * is set.
+   */
+  @Test
+  public void testNoCredentialProviderWithPassword() throws Exception {
+    setupConfigs(false, false, true, false);
+
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+    Assert.assertTrue(StringUtils.isBlank(jobConf.get(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG)));
+
+    Assert.assertNull(getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV),
+        HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
+    Assert.assertNull(getValueFromJobConf(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV),
+        HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+  }
+
+  /*
+   * If the hive job credential provider is set but HIVE_JOB_CREDSTORE_PASSWORD is not set, use
+   * HADOOP_CREDSTORE_PASSWORD in the jobConf.
+   */
+  @Test
+  public void testJobCredentialProviderWithDefaultPassword() throws Exception {
+    setupConfigs(false, true, false, true);
+
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+    Assert.assertEquals(JOB_CREDSTORE_LOCATION,
+        jobConf.get(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG));
+
+    Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
+        jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
+    Assert.assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
+        jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+  }
+
+  /*
+   * When neither HADOOP_CREDSTORE_PASSWORD nor HIVE_JOB_CREDSTORE_PASSWORD
+   * is set, jobConf should contain only the credential provider path.
+   */
+  @Test
+  public void testCredentialProviderWithNoPasswords() throws Exception {
+    setupConfigs(true, false, false, true);
+
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+    Assert.assertEquals(JOB_CREDSTORE_LOCATION,
+        jobConf.get(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG));
+    Assert.assertNull(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV));
+    Assert.assertNull(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV));
+
+    resetConfig();
+    setupConfigs(true, false, false, false);
+
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+    Assert.assertEquals(HADOOP_CREDSTORE_LOCATION,
+        jobConf.get(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG));
+    Assert.assertNull(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV));
+    Assert.assertNull(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV));
+  }
+
+  /*
+   * Default behavior when neither the hive job credstore location nor
+   * HIVE_JOB_CREDSTORE_PASSWORD is set: if a hadoop credential provider is configured,
+   * the job config should use it; otherwise it should remain unset.
+   */
+  @Test
+  public void testJobCredentialProviderUnset() throws Exception {
+    setupConfigs(true, true, false, false);
+
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+    assertEquals(HADOOP_CREDSTORE_LOCATION, jobConf.get(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG));
+
+    assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
+        jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
+    assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, getValueFromJobConf(
+        jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV), HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+  }
+
+  /*
+   * Test the insecure base case where neither a hadoop nor a job-specific
+   * credential provider is set.
+   */
+  @Test
+  public void testNoCredentialProvider() throws Exception {
+    setupConfigs(false, false, false, false);
+
+    assertTrue(StringUtils.isBlank(jobConf.get(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG)));
+
+    assertNull(getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV),
+        HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+
+    assertNull(getValueFromJobConf(jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV),
+        HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+  }
+
+  /*
+   * Test that updateJobCredentialProviders does not corrupt existing values of the
+   * mapred env configs.
+   */
+  @Test
+  public void testExistingConfiguration() throws Exception {
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "k1=v1, k2=v2, HADOOP_CREDSTORE_PASSWORD=test");
+    setupConfigs(false, true, false, true);
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+
+    assertEquals("v1", getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k1"));
+    assertEquals("v2", getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k2"));
+
+    resetConfig();
+
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "k1=v1, HADOOP_CREDSTORE_PASSWORD=test, k2=v2");
+    setupConfigs(false, true, false, true);
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+
+    assertEquals("v1", getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k1"));
+    assertEquals("v2", getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k2"));
+
+    resetConfig();
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "HADOOP_CREDSTORE_PASSWORD=test, k1=v1, k2=v2");
+    setupConfigs(false, true, false, true);
+    HiveConfUtil.updateJobCredentialProviders(jobConf);
+
+    assertEquals("v1", getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k1"));
+    assertEquals("v2", getValueFromJobConf(jobConf.get(JobConf.MAPRED_MAP_TASK_ENV), "k2"));
+  }
+
+  /**
+   * Sets up the environment and configurations.
+   *
+   * @param setHadoopCredProvider set the hadoop credstore provider path
+   *          (hadoop.security.credential.provider.path) in the job config
+   * @param setHadoopCredstorePassword set the HADOOP_CREDSTORE_PASSWORD env variable
+   * @param setHiveCredPassword set the HIVE_JOB_CREDSTORE_PASSWORD env variable
+   * @param setHiveProviderPath set HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH
+   *          in the hive config
+   * @throws Exception
+   */
+  private void setupConfigs(boolean setHadoopCredProvider, boolean setHadoopCredstorePassword,
+      boolean setHiveCredPassword, boolean setHiveProviderPath) throws Exception {
+    Map<String, String> mockEnv = new HashMap<>();
+    // sets hadoop.security.credential.provider.path to simulate a default credential
+    // provider setup
+    if (setHadoopCredProvider) {
+      jobConf.set(HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG, HADOOP_CREDSTORE_LOCATION);
+    }
+    // sets the env variable HADOOP_CREDSTORE_PASSWORD to the value defined by
+    // HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL
+    if (setHadoopCredstorePassword) {
+      mockEnv.put(HADOOP_CREDENTIAL_PASSWORD_ENVVAR, HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL);
+    }
+    // sets the env variable HIVE_JOB_CREDSTORE_PASSWORD to the value defined by
+    // HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL
+    if (setHiveCredPassword) {
+      mockEnv.put(HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR, HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL);
+    }
+    TestHiveCredentialProviders.setEnv(mockEnv);
+    // set the hive provider path in the job conf if setHiveProviderPath is true;
+    // simulates the hive.server2.job.credential.provider.path property set in the
+    // hive-site.xml/core-site.xml of HS2
+    if (setHiveProviderPath) {
+      jobConf.set(HiveConf.ConfVars.HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH.varname,
+          JOB_CREDSTORE_LOCATION);
+    }
+    jobConf.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "mr");
+  }
+
+  /*
+   * Extract a value from a comma-separated string of key=value pairs.
+   */
+  private String getValueFromJobConf(String keyValuePairs, String key) {
+    if (keyValuePairs == null) {
+      return null;
+    }
+    String[] keyValues = keyValuePairs.split(",");
+    for (String kv : keyValues) {
+      String[] parts = kv.split("=");
+      if (key.equals(parts[0].trim())) {
+        return parts[1].trim();
+      }
+    }
+    return null;
+  }
+
+  /*
+   * Test that the environment variables can be set. If this test fails,
+   * all the other tests will also fail because the environment is not getting set up.
+   */
+  @Test
+  public void testEnv() throws Exception {
+    Map<String, String> mockEnv = new HashMap<>();
+    mockEnv.put(HADOOP_CREDENTIAL_PASSWORD_ENVVAR, HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL);
+    mockEnv.put(HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR, HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL);
+    TestHiveCredentialProviders.setEnv(mockEnv);
+    assertEquals(HADOOP_CREDSTORE_PASSWORD_ENVVAR_VAL, System.getenv(HADOOP_CREDENTIAL_PASSWORD_ENVVAR));
+    assertEquals(HIVE_JOB_CREDSTORE_PASSWORD_ENVVAR_VAL, System.getenv(HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR));
+  }
+}
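
One caveat on the setEnv helper used above (an observation, not part of the patch): it reflectively mutates the unmodifiable map backing System.getenv(), which is JVM-implementation specific. The probe below shows the assumption it relies on; on JVMs where it does not hold, or where reflective access to JDK internals is blocked, setEnv() would have no effect and testEnv() is there to surface that quickly.

    public class EnvHackProbe {
      public static void main(String[] args) {
        // True on typical HotSpot/OpenJDK JVMs of this era.
        boolean envIsHackable = "java.util.Collections$UnmodifiableMap"
            .equals(System.getenv().getClass().getName());
        System.out.println(envIsHackable);
      }
    }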

http://git-wip-us.apache.org/repos/asf/hive/blob/9caf2300/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 936fdaf..e2a30a7 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -53,6 +53,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.shims.Utils;
@@ -206,6 +207,7 @@ class SparkClientImpl implements SparkClient {
 
     if (conf.containsKey(SparkClientFactory.CONF_KEY_IN_PROCESS)) {
       // Mostly for testing things quickly. Do not do this in production.
+      // When invoked in-process, the driver inherits the environment variables of the parent.
       LOG.warn("!!!! Running remote driver in-process. !!!!");
       runnable = new Runnable() {
         @Override
@@ -440,7 +442,12 @@ class SparkClientImpl implements SparkClient {
       // Prevent hive configurations from being visible in Spark.
       pb.environment().remove("HIVE_HOME");
       pb.environment().remove("HIVE_CONF_DIR");
-
+      // Add the credential provider password to the child process's environment.
+      // For Spark, the credential provider location is set in the jobConf when the job is submitted.
+      String password = getSparkJobCredentialProviderPassword();
+      if (password != null) {
+        pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password);
+      }
       if (isTesting != null) {
         pb.environment().put("SPARK_TESTING", isTesting);
       }
@@ -485,6 +492,15 @@ class SparkClientImpl implements SparkClient {
     return thread;
   }
 
+  private String getSparkJobCredentialProviderPassword() {
+    if (conf.containsKey("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD")) {
+      return conf.get("spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD");
+    } else if (conf.containsKey("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD")) {
+      return conf.get("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD");
+    }
+    return null;
+  }
+
   private void redirect(String name, Redirector redirector) {
     Thread thread = new Thread(redirector);
     thread.setName(name);
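
For reference, ProcessBuilder's environment map is how the password reaches the spark-submit child process here; a minimal, self-contained sketch of that mechanism (illustrative only; the command assumes a POSIX system and the value is hypothetical):

    public class ChildEnvSketch {
      public static void main(String[] args) throws Exception {
        // Env entries added before start() become part of the child's environment,
        // where Hadoop's credential provider can read them via System.getenv().
        ProcessBuilder pb = new ProcessBuilder("printenv", "HADOOP_CREDSTORE_PASSWORD");
        pb.environment().put("HADOOP_CREDSTORE_PASSWORD", "hypothetical-password");
        pb.inheritIO();
        pb.start().waitFor(); // child prints "hypothetical-password"
      }
    }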
