Author: daijy Date: Fri Oct 18 22:42:59 2013 New Revision: 1533658 URL: http://svn.apache.org/r1533658 Log: HIVE-5133: webhcat jobs that need to access metastore fails in secure mode (Eugene Koifman via Daniel Dai)
Added: hive/trunk/hcatalog/src/test/e2e/templeton/inpdir/hcatloadstore.pig hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/DelegationTokenCache.java Modified: hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java Added: hive/trunk/hcatalog/src/test/e2e/templeton/inpdir/hcatloadstore.pig URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/src/test/e2e/templeton/inpdir/hcatloadstore.pig?rev=1533658&view=auto ============================================================================== --- hive/trunk/hcatalog/src/test/e2e/templeton/inpdir/hcatloadstore.pig (added) +++ hive/trunk/hcatalog/src/test/e2e/templeton/inpdir/hcatloadstore.pig Fri Oct 18 22:42:59 2013 @@ -0,0 +1,21 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. 
The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +l = load '$INPDIR/nums.txt' as (i:int, j:int); +store l into 'hcattest_pig' using org.apache.hive.hcatalog.pig.HCatStorer(); +s = load 'hcattest_pig' using org.apache.hive.hcatalog.pig.HCatLoader(); +store s into '$OUTDIR/loadstore.out'; Modified: hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf?rev=1533658&r1=1533657&r2=1533658&view=diff ============================================================================== --- hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf (original) +++ hive/trunk/hcatalog/src/test/e2e/templeton/tests/jobsubmission.conf Fri Oct 18 22:42:59 2013 @@ -234,7 +234,7 @@ $cfg = { #a simple load store script with log enabled - 'num' => 9, + 'num' => 10, 'method' => 'POST', 'url' => ':TEMPLETON_URL:/templeton/v1/pig', 'post_options' => ['user.name=:UNAME:', 'arg=-p', 'arg=INPDIR=:INPDIR_HDFS:','arg=-p', 'arg=OUTDIR=:OUTDIR:', 'file=:INPDIR_HDFS:/loadstore.pig', @@ -249,7 +249,31 @@ $cfg = 'check_call_back' => 1, }, - #test 10 + { + #note: this test will fail unless Hive is installed in the default location Pig expects it in + #HIVE-5547 will address this limitation + 'num' => 11, + 'setup' => [ + { + 'method' => 'POST', + 'url' => ':TEMPLETON_URL:/templeton/v1/ddl', + 'status_code' => 200, + 'post_options' => 
['user.name=:UNAME:','exec=drop table if exists hcattest_pig; create table hcattest_pig(i int, j int) STORED AS textfile;'], + 'json_field_substr_match' => {'stderr' => 'OK'} + } + ], + 'method' => 'POST', + 'url' => ':TEMPLETON_URL:/templeton/v1/pig', + 'post_options' => ['user.name=:UNAME:', 'arg=-useHCatalog', 'arg=-p', 'arg=INPDIR=:INPDIR_HDFS:', 'arg=-p', 'arg= OUTDIR=:OUTDIR:', 'file=:INPDIR_HDFS:/hcatloadstore.pig'], + + 'json_field_substr_match' => { 'id' => '\d+'}, + 'status_code' => 200, + 'check_job_created' => 1, + 'check_job_complete' => 'SUCCESS', + 'check_job_exit_value' => 0, + 'check_call_back' => 1, + }, + #test 11 #TODO jython test @@ -420,7 +444,7 @@ $cfg = { #test add jar - 'num' => 9, + 'num' => 11, 'ignore23' => 'Log collector does not work with Hadoop 2', 'method' => 'POST', 'url' => ':TEMPLETON_URL:/templeton/v1/hive', @@ -435,7 +459,7 @@ $cfg = }, { #test add jar when the jar is not shipped - 'num' => 10, + 'num' => 12, 'method' => 'POST', 'url' => ':TEMPLETON_URL:/templeton/v1/hive', 'post_options' => ['user.name=:UNAME:','execute=add jar piggybank.jar',], @@ -449,7 +473,7 @@ $cfg = }, { #enable logs - 'num' => 11, + 'num' => 13, 'ignore23' => 'Log collector does not work with Hadoop 2', 'method' => 'POST', 'url' => ':TEMPLETON_URL:/templeton/v1/hive', Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java?rev=1533658&r1=1533657&r2=1533658&view=diff ============================================================================== --- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java (original) +++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/CompleteDelegator.java Fri Oct 18 22:42:59 2013 @@ -20,11 +20,15 @@ package org.apache.hive.hcatalog.templet import 
java.io.IOException; import java.net.URL; -import java.net.MalformedURLException; import java.util.Date; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hive.hcatalog.templeton.tool.DelegationTokenCache; import org.apache.hive.hcatalog.templeton.tool.JobState; import org.apache.hive.hcatalog.templeton.tool.TempletonUtils; @@ -39,11 +43,12 @@ import org.apache.hive.hcatalog.templeto * this at the same time. That should never happen. * * We use a Hadoop config var to notify this class on the completion - * of a job. Hadoop will call use multiple times in the event of + * of a job. Hadoop will call us multiple times in the event of * failure. Even if the failure is that the client callback failed. * * See LauncherDelegator for the HADOOP_END_RETRY* vars that are set. */ +@InterfaceAudience.Private public class CompleteDelegator extends TempletonDelegator { private static final Log LOG = LogFactory.getLog(CompleteDelegator.class); @@ -51,28 +56,36 @@ public class CompleteDelegator extends T super(appConf); } - public CompleteBean run(String id) + public CompleteBean run(String id, String jobStatus) throws CallbackFailedException, IOException { if (id == null) acceptWithError("No jobid given"); JobState state = null; + /* we don't want to cancel the delegation token if we think the callback is going to + to be retried, for example, because the job is not complete yet */ + boolean cancelMetastoreToken = false; try { state = new JobState(id, Main.getAppConfigInstance()); if (state.getCompleteStatus() == null) - failed("Job not yet complete. jobId=" + id, null); + failed("Job not yet complete. 
jobId=" + id + " Status from JT=" + jobStatus, null); Long notified = state.getNotifiedTime(); - if (notified != null) + if (notified != null) { + cancelMetastoreToken = true; return acceptWithError("Callback already run for jobId=" + id + " at " + new Date(notified)); + } String callback = state.getCallback(); - if (callback == null) + if (callback == null) { + cancelMetastoreToken = true; return new CompleteBean("No callback registered"); - + } + try { doCallback(state.getId(), callback); + cancelMetastoreToken = true; } catch (Exception e) { failed("Callback failed " + callback + " for " + id, e); } @@ -80,8 +93,26 @@ public class CompleteDelegator extends T state.setNotifiedTime(System.currentTimeMillis()); return new CompleteBean("Callback sent"); } finally { - if (state != null) - state.close(); + state.close(); + HiveMetaStoreClient client = null; + try { + if(cancelMetastoreToken) { + String metastoreTokenStrForm = + DelegationTokenCache.getStringFormTokenCache().getDelegationToken(id); + if(metastoreTokenStrForm != null) { + client = HCatUtil.getHiveClient(new HiveConf()); + client.cancelDelegationToken(metastoreTokenStrForm); + LOG.debug("Cancelled token for jobId=" + id + " status from JT=" + jobStatus); + DelegationTokenCache.getStringFormTokenCache().removeDelegationToken(id); + } + } + } + catch(Exception ex) { + LOG.warn("Failed to cancel metastore delegation token for jobId=" + id, ex); + } + finally { + HCatUtil.closeHiveClientQuietly(client); + } } } @@ -90,8 +121,7 @@ public class CompleteDelegator extends T * finished. If the url has the string $jobId in it, it will be * replaced with the completed jobid. 
*/ - public static void doCallback(String jobid, String url) - throws MalformedURLException, IOException { + public static void doCallback(String jobid, String url) throws IOException { if (url.contains("$jobId")) url = url.replace("$jobId", jobid); TempletonUtils.fetchUrl(new URL(url)); Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java?rev=1533658&r1=1533657&r2=1533658&view=diff ============================================================================== --- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java (original) +++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/HiveDelegator.java Fri Oct 18 22:42:59 2013 @@ -65,6 +65,8 @@ public class HiveDelegator extends Launc args.addAll(makeBasicArgs(execute, srcFile, otherFiles, statusdir, completedUrl, enablelog)); args.add("--"); TempletonUtils.addCmdForWindows(args); + addHiveMetaStoreTokenArg(); + args.add(appConf.hivePath()); args.add("--service"); @@ -111,9 +113,10 @@ public class HiveDelegator extends Launc ArrayList<String> args = new ArrayList<String>(); ArrayList<String> allFiles = new ArrayList<String>(); - if (TempletonUtils.isset(srcFile)) + if (TempletonUtils.isset(srcFile)) { allFiles.add(TempletonUtils.hadoopFsFilename(srcFile, appConf, runAs)); + } if (TempletonUtils.isset(otherFiles)) { String[] ofs = TempletonUtils.hadoopFsListAsArray(otherFiles, appConf, runAs); Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java?rev=1533658&r1=1533657&r2=1533658&view=diff ============================================================================== --- 
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java (original) +++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/JarDelegator.java Fri Oct 18 22:42:59 2013 @@ -42,14 +42,15 @@ public class JarDelegator extends Launch public EnqueueBean run(String user, Map<String, Object> userArgs, String jar, String mainClass, String libjars, String files, List<String> jarArgs, List<String> defines, - String statusdir, String callback, String completedUrl, + String statusdir, String callback, + boolean usehcatalog, String completedUrl, boolean enablelog, JobType jobType) throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { runAs = user; List<String> args = makeArgs(jar, mainClass, libjars, files, jarArgs, defines, - statusdir, completedUrl, enablelog, jobType); + statusdir, usehcatalog, completedUrl, enablelog, jobType); return enqueueController(user, userArgs, callback, args); } @@ -57,23 +58,30 @@ public class JarDelegator extends Launch private List<String> makeArgs(String jar, String mainClass, String libjars, String files, List<String> jarArgs, List<String> defines, - String statusdir, String completedUrl, + String statusdir, boolean usehcatalog, String completedUrl, boolean enablelog, JobType jobType) throws BadParam, IOException, InterruptedException { ArrayList<String> args = new ArrayList<String>(); try { - ArrayList<String> allFiles = new ArrayList(); + ArrayList<String> allFiles = new ArrayList<String>(); allFiles.add(TempletonUtils.hadoopFsFilename(jar, appConf, runAs)); args.addAll(makeLauncherArgs(appConf, statusdir, completedUrl, allFiles, enablelog, jobType)); args.add("--"); TempletonUtils.addCmdForWindows(args); + + //check if the rest command specified explicitly to use hcatalog + if(usehcatalog){ + addHiveMetaStoreTokenArg(); + } + args.add(appConf.clusterHadoop()); args.add("jar"); 
args.add(TempletonUtils.hadoopFsPath(jar, appConf, runAs).getName()); - if (TempletonUtils.isset(mainClass)) + if (TempletonUtils.isset(mainClass)) { args.add(mainClass); + } if (TempletonUtils.isset(libjars)) { String libjarsListAsString = TempletonUtils.hadoopFsListAsString(libjars, appConf, runAs); Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java?rev=1533658&r1=1533657&r2=1533658&view=diff ============================================================================== --- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java (original) +++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/LauncherDelegator.java Fri Oct 18 22:42:59 2013 @@ -24,10 +24,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.commons.exec.ExecuteException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; @@ -44,7 +44,8 @@ import org.apache.hive.hcatalog.templeto public class LauncherDelegator extends TempletonDelegator { private static final Log LOG = LogFactory.getLog(LauncherDelegator.class); protected String runAs = null; - static public enum JobType {JAR, STREAMING, PIG, HIVE}; + static public enum JobType {JAR, STREAMING, PIG, HIVE} + private boolean secureMeatastoreAccess = false; public LauncherDelegator(AppConfig appConf) { super(appConf); @@ -70,7 +71,7 @@ public class LauncherDelegator extends T */ public EnqueueBean enqueueController(String user, Map<String, Object> userArgs, String callback, 
List<String> args) - throws NotAuthorizedException, BusyException, ExecuteException, + throws NotAuthorizedException, BusyException, IOException, QueueException { try { UserGroupInformation ugi = UgiFactory.getUgi(user); @@ -82,9 +83,10 @@ public class LauncherDelegator extends T long elapsed = ((System.nanoTime() - startTime) / ((int) 1e6)); LOG.debug("queued job " + id + " in " + elapsed + " ms"); - if (id == null) + if (id == null) { throw new QueueException("Unable to get job id"); - + } + registerJob(id, user, callback, userArgs); return new EnqueueBean(id); @@ -95,16 +97,14 @@ public class LauncherDelegator extends T private String queueAsUser(UserGroupInformation ugi, final List<String> args) throws IOException, InterruptedException { - String id = ugi.doAs(new PrivilegedExceptionAction<String>() { + return ugi.doAs(new PrivilegedExceptionAction<String>() { public String run() throws Exception { String[] array = new String[args.size()]; - TempletonControllerJob ctrl = new TempletonControllerJob(); + TempletonControllerJob ctrl = new TempletonControllerJob(secureMeatastoreAccess); ToolRunner.run(ctrl, args.toArray(array)); return ctrl.getSubmittedId(); } }); - - return id; } public List<String> makeLauncherArgs(AppConfig appConf, String statusdir, @@ -182,8 +182,9 @@ public class LauncherDelegator extends T */ public static String makeOverrideClasspath(AppConfig appConf) { String[] overrides = appConf.overrideJars(); - if (overrides == null) + if (overrides == null) { return null; + } ArrayList<String> cp = new ArrayList<String>(); for (String fname : overrides) { @@ -204,5 +205,18 @@ public class LauncherDelegator extends T args.add(name + "=" + val); } } - + /** + * This is called by subclasses when they determined that the sumbmitted job requires + * metastore access (e.g. Pig job that uses HCatalog). This then determines if + * secure access is required and causes TempletonControllerJob to set up a delegation token. 
+ * @see TempletonControllerJob + */ + void addHiveMetaStoreTokenArg() { + //in order for this to work hive-site.xml must be on the classpath + HiveConf hiveConf = new HiveConf(); + if(!hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL)) { + return; + } + secureMeatastoreAccess = true; + } } Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java?rev=1533658&r1=1533657&r2=1533658&view=diff ============================================================================== --- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java (original) +++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java Fri Oct 18 22:42:59 2013 @@ -27,6 +27,8 @@ import java.util.List; import java.util.Map; import org.apache.commons.exec.ExecuteException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob; import org.apache.hive.hcatalog.templeton.tool.TempletonUtils; @@ -36,6 +38,7 @@ import org.apache.hive.hcatalog.templeto * This is the backend of the pig web service. 
*/ public class PigDelegator extends LauncherDelegator { + private static final Log LOG = LogFactory.getLog(PigDelegator.class); public PigDelegator(AppConfig appConf) { super(appConf); } @@ -43,27 +46,43 @@ public class PigDelegator extends Launch public EnqueueBean run(String user, Map<String, Object> userArgs, String execute, String srcFile, List<String> pigArgs, String otherFiles, - String statusdir, String callback, String completedUrl, boolean enablelog) + String statusdir, String callback, + boolean usehcatalog, String completedUrl, boolean enablelog) throws NotAuthorizedException, BadParam, BusyException, QueueException, ExecuteException, IOException, InterruptedException { runAs = user; List<String> args = makeArgs(execute, srcFile, pigArgs, - otherFiles, statusdir, completedUrl, enablelog); + otherFiles, statusdir, usehcatalog, completedUrl, enablelog); return enqueueController(user, userArgs, callback, args); } + /** + * @param execute pig query string to be executed + * @param srcFile pig query file to be executed + * @param pigArgs pig command line arguments + * @param otherFiles files to be copied to the map reduce cluster + * @param statusdir status dir location + * @param usehcatalog whether the command uses hcatalog/needs to connect + * to hive metastore server + * @param completedUrl call back url + * @return + * @throws BadParam + * @throws IOException + * @throws InterruptedException + */ private List<String> makeArgs(String execute, String srcFile, List<String> pigArgs, String otherFiles, - String statusdir, String completedUrl, boolean enablelog) + String statusdir, boolean usehcatalog, + String completedUrl, boolean enablelog) throws BadParam, IOException, InterruptedException { ArrayList<String> args = new ArrayList<String>(); try { ArrayList<String> allFiles = new ArrayList<String>(); - if (TempletonUtils.isset(srcFile)) - allFiles.add(TempletonUtils.hadoopFsFilename - (srcFile, appConf, runAs)); + if (TempletonUtils.isset(srcFile)) { + 
allFiles.add(TempletonUtils.hadoopFsFilename(srcFile, appConf, runAs)); + } if (TempletonUtils.isset(otherFiles)) { String[] ofs = TempletonUtils.hadoopFsListAsArray(otherFiles, appConf, runAs); allFiles.addAll(Arrays.asList(ofs)); @@ -85,6 +104,12 @@ public class PigDelegator extends Launch for (String pigArg : pigArgs) { args.add(TempletonUtils.quoteForWindows(pigArg)); } + //check if the REST command specified explicitly to use hcatalog + // or if it says that implicitly using the pig -useHCatalog arg + if(usehcatalog || hasPigArgUseHcat(pigArgs)){ + addHiveMetaStoreTokenArg(); + } + if (TempletonUtils.isset(execute)) { args.add("-execute"); args.add(TempletonUtils.quoteForWindows(execute)); @@ -101,4 +126,12 @@ public class PigDelegator extends Launch return args; } + + /** + * Check if the pig arguments has -useHCatalog set + * see http://hive.apache.org/docs/hcat_r0.5.0/loadstore.pdf + */ + private boolean hasPigArgUseHcat(List<String> pigArgs) { + return pigArgs.contains("-useHCatalog"); + } } Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java?rev=1533658&r1=1533657&r2=1533658&view=diff ============================================================================== --- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java (original) +++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java Fri Oct 18 22:42:59 2013 @@ -33,10 +33,10 @@ import org.apache.hadoop.hive.conf.HiveC import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.io.Text; -import org.apache.thrift.TException; import org.apache.hadoop.security.Credentials; import 
org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.thrift.TException; /** * Helper class to run jobs using Kerberos security. Always safe to @@ -44,8 +44,8 @@ import org.apache.hadoop.security.token. */ public class SecureProxySupport { private Path tokenPath; - private final String HCAT_SERVICE = "hcat"; - private boolean isEnabled; + public static final String HCAT_SERVICE = "hcat"; + private final boolean isEnabled; private String user; public SecureProxySupport() { Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java?rev=1533658&r1=1533657&r2=1533658&view=diff ============================================================================== --- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java (original) +++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java Fri Oct 18 22:42:59 2013 @@ -186,8 +186,9 @@ public class Server { verifyDdlParam(db, ":db"); HcatDelegator d = new HcatDelegator(appConf, execService); - if (!TempletonUtils.isset(tablePattern)) + if (!TempletonUtils.isset(tablePattern)) { tablePattern = "*"; + } return d.listTables(getDoAsUser(), db, tablePattern); } @@ -252,10 +253,12 @@ public class Server { verifyDdlParam(table, ":table"); HcatDelegator d = new HcatDelegator(appConf, execService); - if ("extended".equals(format)) + if ("extended".equals(format)) { return d.descExtendedTable(getDoAsUser(), db, table); - else + } + else { return d.descTable(getDoAsUser(), db, table, false); + } } /** @@ -455,8 +458,9 @@ public class Server { verifyUser(); HcatDelegator d = new HcatDelegator(appConf, execService); - if (!TempletonUtils.isset(dbPattern)) + if (!TempletonUtils.isset(dbPattern)) { dbPattern = "*"; + } return 
d.listDatabases(getDoAsUser(), dbPattern); } @@ -508,8 +512,9 @@ public class Server { BadParam, ExecuteException, IOException { verifyUser(); verifyDdlParam(db, ":db"); - if (TempletonUtils.isset(option)) + if (TempletonUtils.isset(option)) { verifyDdlParam(option, "option"); + } HcatDelegator d = new HcatDelegator(appConf, execService); return d.dropDatabase(getDoAsUser(), db, ifExists, option, group, permissions); @@ -579,6 +584,7 @@ public class Server { /** * Run a MapReduce Streaming job. + * @param callback URL which WebHCat will call when the hive job finishes */ @POST @Path("mapreduce/streaming") @@ -628,6 +634,11 @@ public class Server { /** * Run a MapReduce Jar job. + * Params correspond to the REST api params + * @param usehcatalog if {@code true}, means the Jar uses HCat and thus needs to access + * metastore, which requires additional steps for WebHCat to perform in a secure cluster. + * @param callback URL which WebHCat will call when the hive job finishes + * @see org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob */ @POST @Path("mapreduce/jar") @@ -640,6 +651,7 @@ public class Server { @FormParam("define") List<String> defines, @FormParam("statusdir") String statusdir, @FormParam("callback") String callback, + @FormParam("usehcatalog") boolean usehcatalog, @FormParam("enablelog") boolean enablelog) throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { @@ -665,11 +677,18 @@ public class Server { return d.run(getDoAsUser(), userArgs, jar, mainClass, libjars, files, args, defines, - statusdir, callback, getCompletedUrl(), enablelog, JobType.JAR); + statusdir, callback, usehcatalog, getCompletedUrl(), enablelog, JobType.JAR); } /** * Run a Pig job. + * Params correspond to the REST api params. If '-useHCatalog' is in the {@code pigArgs, usehcatalog}, + * is interpreted as true. 
+ * @param usehcatalog if {@code true}, means the Pig script uses HCat and thus needs to access + * metastore, which requires additional steps for WebHCat to perform in a secure cluster. + * This does nothing to ensure that Pig is installed on target node in the cluster. + * @param callback URL which WebHCat will call when the hive job finishes + * @see org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob */ @POST @Path("pig") @@ -680,12 +699,14 @@ public class Server { @FormParam("files") String otherFiles, @FormParam("statusdir") String statusdir, @FormParam("callback") String callback, + @FormParam("usehcatalog") boolean usehcatalog, @FormParam("enablelog") boolean enablelog) throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { verifyUser(); - if (execute == null && srcFile == null) + if (execute == null && srcFile == null) { throw new BadParam("Either execute or file parameter required"); + } //add all function arguments to a map Map<String, Object> userArgs = new HashMap<String, Object>(); @@ -704,7 +725,7 @@ public class Server { return d.run(getDoAsUser(), userArgs, execute, srcFile, pigArgs, otherFiles, - statusdir, callback, getCompletedUrl(), enablelog); + statusdir, callback, usehcatalog, getCompletedUrl(), enablelog); } /** @@ -719,7 +740,7 @@ public class Server { * used in "add jar" statement in hive script * @param defines shortcut for command line arguments "--define" * @param statusdir where the stderr/stdout of templeton controller job goes - * @param callback callback url when the hive job finishes + * @param callback URL which WebHCat will call when the hive job finishes * @param enablelog whether to collect mapreduce log into statusdir/logs */ @POST @@ -736,8 +757,9 @@ public class Server { throws NotAuthorizedException, BusyException, BadParam, QueueException, ExecuteException, IOException, InterruptedException { verifyUser(); - if (execute == null && srcFile == 
null) + if (execute == null && srcFile == null) { throw new BadParam("Either execute or file parameter required"); + } //add all function arguments to a map Map<String, Object> userArgs = new HashMap<String, Object>(); @@ -874,10 +896,12 @@ public class Server { @GET @Path("internal/complete/{jobid}") @Produces({MediaType.APPLICATION_JSON}) - public CompleteBean completeJob(@PathParam("jobid") String jobid) + public CompleteBean completeJob(@PathParam("jobid") String jobid, + @QueryParam("status") String jobStatus) throws CallbackFailedException, IOException { + LOG.debug("Received callback " + theUriInfo.getRequestUri()); CompleteDelegator d = new CompleteDelegator(appConf); - return d.run(jobid); + return d.run(jobid, jobStatus); } /** @@ -887,8 +911,9 @@ public class Server { String requestingUser = getRequestingUser(); if (requestingUser == null) { String msg = "No user found."; - if (!UserGroupInformation.isSecurityEnabled()) + if (!UserGroupInformation.isSecurityEnabled()) { msg += " Missing " + PseudoAuthenticator.USER_NAME + " parameter."; + } throw new NotAuthorizedException(msg); } if(doAs != null && !doAs.equals(requestingUser)) { @@ -897,9 +922,10 @@ public class Server { ProxyUserSupport.validate(requestingUser, getRequestingHost(requestingUser, request), doAs); } } + /** * All 'tasks' spawned by WebHCat should be run as this user. W/o doAs query parameter - * this is just the user making the request (or + * this is just the user making the request (or * {@link org.apache.hadoop.security.authentication.client.PseudoAuthenticator#USER_NAME} * query param). 
* @return value of doAs query parameter or {@link #getRequestingUser()} @@ -912,8 +938,9 @@ public class Server { */ public void verifyParam(String param, String name) throws BadParam { - if (param == null) + if (param == null) { throw new BadParam("Missing " + name + " parameter"); + } } /** @@ -921,8 +948,9 @@ public class Server { */ public void verifyParam(List<String> param, String name) throws BadParam { - if (param == null || param.isEmpty()) + if (param == null || param.isEmpty()) { throw new BadParam("Missing " + name + " parameter"); + } } public static final Pattern DDL_ID = Pattern.compile("[a-zA-Z]\\w*"); @@ -937,8 +965,9 @@ public class Server { throws BadParam { verifyParam(param, name); Matcher m = DDL_ID.matcher(param); - if (!m.matches()) + if (!m.matches()) { throw new BadParam("Invalid DDL identifier " + name); + } } /** * Get the user name from the security context, i.e. the user making the HTTP request. @@ -946,10 +975,12 @@ public class Server { * value of user.name query param, in kerberos mode it's the kinit'ed user. */ private String getRequestingUser() { - if (theSecurityContext == null) + if (theSecurityContext == null) { return null; - if (theSecurityContext.getUserPrincipal() == null) + } + if (theSecurityContext.getUserPrincipal() == null) { return null; + } //map hue/foo....@something.com->hue since user group checks // and config files are in terms of short name return UserGroupInformation.createRemoteUser( @@ -960,16 +991,18 @@ public class Server { * The callback url on this server when a task is completed. 
*/ public String getCompletedUrl() { - if (theUriInfo == null) + if (theUriInfo == null) { return null; - if (theUriInfo.getBaseUri() == null) + } + if (theUriInfo.getBaseUri() == null) { return null; + } return theUriInfo.getBaseUri() + VERSION - + "/internal/complete/$jobId"; + + "/internal/complete/$jobId?status=$jobStatus"; } /** - * Returns canonical host name from which the request is made; used for doAs validation + * Returns canonical host name from which the request is made; used for doAs validation */ private static String getRequestingHost(String requestingUser, HttpServletRequest request) { final String unkHost = "???"; @@ -998,7 +1031,7 @@ public class Server { } private void checkEnableLogPrerequisite(boolean enablelog, String statusdir) throws BadParam { - if (enablelog == true && !TempletonUtils.isset(statusdir)) + if (enablelog && !TempletonUtils.isset(statusdir)) throw new BadParam("enablelog is only applicable when statusdir is set"); } } Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java?rev=1533658&r1=1533657&r2=1533658&view=diff ============================================================================== --- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java (original) +++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java Fri Oct 18 22:42:59 2013 @@ -58,7 +58,7 @@ public class StreamingDelegator extends return d.run(user, userArgs, appConf.streamingJar(), null, null, files, args, defines, - statusdir, callback, completedUrl, enableLog, jobType); + statusdir, callback, false, completedUrl, enableLog, jobType); } private List<String> makeArgs(List<String> inputs, Added: 
hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/DelegationTokenCache.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/DelegationTokenCache.java?rev=1533658&view=auto ============================================================================== --- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/DelegationTokenCache.java (added) +++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/DelegationTokenCache.java Fri Oct 18 22:42:59 2013 @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hive.hcatalog.templeton.tool; + +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hadoop.hive.common.classification.InterfaceStability; + +import java.util.concurrent.ConcurrentHashMap; + +/* + * Cache of delegation tokens. When {@link TempletonControllerJob} submits a job that requires + * metastore access and this access should be secure, TCJ will add a delegation token to the + * submitted job. 
When the job completes we need to cancel the token since by default the token + * lives for 7 days and over time can cause OOM (if not cancelled). Cancelling from + * TempletonControllerJob.LaunchMapper mapper (via custom OutputCommitter for example) requires + * the jar containing HiveMetastoreClient (and any dependent jars) to be available on the node + * running LaunchMapper. Specifying transitive closure of the necessary jars is + * a configuration/maintenance headache for each release. Caching the token means cancellation is + * done from WebHCat server and thus has Hive jars on the classpath. + * + * It is possible that WebHCat crashes and loses this in-memory state, but this would be an + * exceptional condition and since tokens will automatically be cancelled after 7 days, + * the fact that this info is not persisted is OK. (Persisting it also complicates things + * because that needs to be done securely) + * @see TempletonControllerJob + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class DelegationTokenCache<JobId, TokenObject> { + private ConcurrentHashMap<JobId, TokenObject> tokenCache = + new ConcurrentHashMap<JobId, TokenObject>(); + private static final DelegationTokenCache<String, String> stringFormTokenCache = + new DelegationTokenCache<String, String>(); + + /* + * Returns the singleton instance of jobId->delegation-token-in-string-form cache + */ + public static DelegationTokenCache<String, String> getStringFormTokenCache() { + return stringFormTokenCache; + } + TokenObject storeDelegationToken(JobId jobId, TokenObject token) { + return tokenCache.put(jobId, token); + } + public TokenObject getDelegationToken(JobId jobId) { + return tokenCache.get(jobId); + } + public void removeDelegationToken(JobId jobId) { + tokenCache.remove(jobId); + } +} Modified: hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java?rev=1533658&r1=1533657&r2=1533658&view=diff ============================================================================== --- hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java (original) +++ hive/trunk/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java Fri Oct 18 22:42:59 2013 @@ -25,6 +25,7 @@ import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintWriter; import java.net.URISyntaxException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -41,6 +42,8 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; @@ -55,9 +58,11 @@ import org.apache.hadoop.security.UserGr import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; import org.apache.hive.hcatalog.templeton.BadParam; import org.apache.hive.hcatalog.templeton.LauncherDelegator; +import org.apache.hive.hcatalog.templeton.SecureProxySupport; +import org.apache.hive.hcatalog.templeton.UgiFactory; +import org.apache.thrift.TException; /** * A Map Reduce job that will start another job. @@ -70,6 +75,13 @@ import org.apache.hive.hcatalog.templeto * - run a keep alive thread so the job doesn't end. * - Optionally, store the stdout, stderr, and exit value of the child * in hdfs files. + * + * A note on security. 
When jobs are submitted through WebHCat that use HCatalog, it means that + * metastore access is required. Hive queries, of course, need metastore access. This in turn + * requires delegation token to be obtained for metastore in a <em>secure cluster</em>. Since we + * can't usually parse the job to find out if it is using metastore, we require 'usehcatalog' + * parameter supplied in the REST call. WebHcat takes care of cancelling the token when the job + * is complete. */ public class TempletonControllerJob extends Configured implements Tool { public static final String COPY_NAME = "templeton.copy"; @@ -89,12 +101,19 @@ public class TempletonControllerJob exte public static final String TOKEN_FILE_ARG_PLACEHOLDER = "__WEBHCAT_TOKEN_FILE_LOCATION__"; - private static TrivialExecService execService = TrivialExecService.getInstance(); private static final Log LOG = LogFactory.getLog(TempletonControllerJob.class); + private final boolean secureMetastoreAccess; - + /** + * @param secureMetastoreAccess - if true, a delegation token will be created + * and added to the job + */ + public TempletonControllerJob(boolean secureMetastoreAccess) { + super(); + this.secureMetastoreAccess = secureMetastoreAccess; + } public static class LaunchMapper extends Mapper<NullWritable, NullWritable, Text, Text> { protected Process startJob(Context context, String user, @@ -194,8 +213,9 @@ public class TempletonControllerJob exte proc.waitFor(); keepAlive.sendReport = false; pool.shutdown(); - if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) + if (!pool.awaitTermination(WATCHER_TIMEOUT_SECS, TimeUnit.SECONDS)) { pool.shutdownNow(); + } writeExitValue(conf, proc.exitValue(), statusdir); JobState state = new JobState(context.getJobID().toString(), conf); @@ -210,11 +230,13 @@ public class TempletonControllerJob exte logRetriever.run(); } - if (proc.exitValue() != 0) + if (proc.exitValue() != 0) { System.err.println("templeton: job failed with exit code " + 
proc.exitValue()); - else + } + else { System.err.println("templeton: job completed with exit code 0"); + } } private void executeWatcher(ExecutorService pool, Configuration conf, @@ -248,10 +270,10 @@ public class TempletonControllerJob exte } private static class Watcher implements Runnable { - private InputStream in; + private final InputStream in; private OutputStream out; - private JobID jobid; - private Configuration conf; + private final JobID jobid; + private final Configuration conf; public Watcher(Configuration conf, JobID jobid, InputStream in, String statusdir, String name) @@ -341,21 +363,26 @@ public class TempletonControllerJob exte private JobID submittedJobId; public String getSubmittedId() { - if (submittedJobId == null) + if (submittedJobId == null) { return null; - else + } + else { return submittedJobId.toString(); + } } /** * Enqueue the job and print out the job id for later collection. + * @see org.apache.hive.hcatalog.templeton.CompleteDelegator */ @Override public int run(String[] args) - throws IOException, InterruptedException, ClassNotFoundException { + throws IOException, InterruptedException, ClassNotFoundException, TException { Configuration conf = getConf(); + conf.set(JAR_ARGS_NAME, TempletonUtils.encodeArray(args)); - conf.set("user.name", UserGroupInformation.getCurrentUser().getShortUserName()); + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + conf.set("user.name", user); Job job = new Job(conf); job.setJarByClass(TempletonControllerJob.class); job.setJobName("TempletonControllerJob"); @@ -363,8 +390,7 @@ public class TempletonControllerJob exte job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setInputFormatClass(SingleInputFormat.class); - NullOutputFormat<NullWritable, NullWritable> of - = new NullOutputFormat<NullWritable, NullWritable>(); + NullOutputFormat<NullWritable, NullWritable> of = new NullOutputFormat<NullWritable, NullWritable>(); 
job.setOutputFormatClass(of.getClass()); job.setNumReduceTasks(0); @@ -372,18 +398,51 @@ public class TempletonControllerJob exte Token<DelegationTokenIdentifier> mrdt = jc.getDelegationToken(new Text("mr token")); job.getCredentials().addToken(new Text("mr token"), mrdt); + + String metastoreTokenStrForm = addHMSToken(job, user); + job.submit(); submittedJobId = job.getJobID(); + if(metastoreTokenStrForm != null) { + //so that it can be cancelled later from CompleteDelegator + DelegationTokenCache.getStringFormTokenCache().storeDelegationToken( + submittedJobId.toString(), metastoreTokenStrForm); + LOG.debug("Added metastore delegation token for jobId=" + submittedJobId.toString() + " " + + "user=" + user); + } return 0; } - - - public static void main(String[] args) throws Exception { - int ret = ToolRunner.run(new TempletonControllerJob(), args); - if (ret != 0) - System.err.println("TempletonControllerJob failed!"); - System.exit(ret); + private String addHMSToken(Job job, String user) throws IOException, InterruptedException, + TException { + if(!secureMetastoreAccess) { + return null; + } + Token<org.apache.hadoop.hive.thrift.DelegationTokenIdentifier> hiveToken = + new Token<org.apache.hadoop.hive.thrift.DelegationTokenIdentifier>(); + String metastoreTokenStrForm = buildHcatDelegationToken(user); + hiveToken.decodeFromUrlString(metastoreTokenStrForm); + job.getCredentials().addToken(new + Text(SecureProxySupport.HCAT_SERVICE), hiveToken); + return metastoreTokenStrForm; + } + private String buildHcatDelegationToken(String user) throws IOException, InterruptedException, + TException { + final HiveConf c = new HiveConf(); + LOG.debug("Creating hive metastore delegation token for user " + user); + final UserGroupInformation ugi = UgiFactory.getUgi(user); + UserGroupInformation real = ugi.getRealUser(); + return real.doAs(new PrivilegedExceptionAction<String>() { + public String run() throws IOException, TException, InterruptedException { + final 
HiveMetaStoreClient client = new HiveMetaStoreClient(c); + return ugi.doAs(new PrivilegedExceptionAction<String>() { + public String run() throws IOException, TException, InterruptedException { + String u = ugi.getUserName(); + return client.getDelegationToken(u); + } + }); + } + }); } }