mapreduce: support for running on secure clusters. This adds the appropriate hooks to grab authentication credentials at job submission time and add them to the job's Credentials object as a Hadoop "Token". Each task then retrieves the Token and imports it into the Kudu client it creates before using that client.
It's not possible to test this since we don't have support for running Kerberized Yarn clusters in the MiniCluster environment. I tested manually on a secure cluster using ImportTsv, ITBLL, and RowCounter jobs. Change-Id: Ieed43b9c8646aaee549078a26850e7e7bdecd802 Reviewed-on: http://gerrit.cloudera.org:8080/6237 Tested-by: Kudu Jenkins Reviewed-by: Jean-Daniel Cryans <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e9dfbe1e Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e9dfbe1e Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e9dfbe1e Branch: refs/heads/master Commit: e9dfbe1e5dd38d4f77ab79537e7e5c6d30e56a1d Parents: 2f94fb6 Author: Todd Lipcon <[email protected]> Authored: Wed Mar 1 22:38:39 2017 -0800 Committer: Todd Lipcon <[email protected]> Committed: Fri Mar 3 05:40:45 2017 +0000 ---------------------------------------------------------------------- .../tools/IntegrationTestBigLinkedList.java | 2 + .../org/apache/kudu/client/AsyncKuduClient.java | 9 ++ .../java/org/apache/kudu/client/KuduClient.java | 8 ++ .../kudu/mapreduce/CommandLineParser.java | 5 +- .../kudu/mapreduce/KuduTableInputFormat.java | 1 + .../kudu/mapreduce/KuduTableMapReduceUtil.java | 104 +++++++++++++++++++ .../kudu/mapreduce/KuduTableOutputFormat.java | 1 + 7 files changed, 128 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/e9dfbe1e/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java ---------------------------------------------------------------------- diff --git a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java index 19b2ba1..6171026 100644 --- 
a/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java +++ b/java/kudu-client-tools/src/main/java/org/apache/kudu/mapreduce/tools/IntegrationTestBigLinkedList.java @@ -774,7 +774,9 @@ public class IntegrationTestBigLinkedList extends Configured implements Tool { // Lack of YARN-445 means we can't auto-jstack on timeout, so disabling the timeout gives // us a chance to do it manually. job.getConfiguration().setInt("mapreduce.task.timeout", 0); + KuduTableMapReduceUtil.addDependencyJars(job); + KuduTableMapReduceUtil.addCredentialsToJob(client, job); boolean success = job.waitForCompletion(true); http://git-wip-us.apache.org/repos/asf/kudu/blob/e9dfbe1e/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java index acda901..6639284 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java @@ -48,6 +48,7 @@ import javax.annotation.concurrent.GuardedBy; import javax.security.auth.Subject; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.net.HostAndPort; @@ -579,6 +580,14 @@ public class AsyncKuduClient implements AutoCloseable { } /** + * @return the list of master addresses, stringified using commas to separate + * them + */ + public String getMasterAddressesAsString() { + return Joiner.on(",").join(masterAddresses); + } + + /** * Check if statistics collection is enabled for this client. 
* @return true if it is enabled, else false */ http://git-wip-us.apache.org/repos/asf/kudu/blob/e9dfbe1e/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java index 371d196..b23e5c2 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java @@ -298,6 +298,14 @@ public class KuduClient implements AutoCloseable { return asyncClient.getDefaultAdminOperationTimeoutMs(); } + /** + * @return the list of master addresses, stringified using commas to separate + * them + */ + public String getMasterAddressesAsString() { + return asyncClient.getMasterAddressesAsString(); + } + // Helper method to handle joining and transforming the Exception we receive. static <R> R joinAndHandleException(Deferred<R> deferred) throws KuduException { try { http://git-wip-us.apache.org/repos/asf/kudu/blob/e9dfbe1e/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java ---------------------------------------------------------------------- diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java index 830cb7b..5b701ed 100644 --- a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java +++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/CommandLineParser.java @@ -18,7 +18,6 @@ package org.apache.kudu.mapreduce; import org.apache.hadoop.conf.Configuration; - import org.apache.kudu.annotations.InterfaceAudience; import org.apache.kudu.annotations.InterfaceStability; import org.apache.kudu.client.AsyncKuduClient; @@ -118,11 +117,13 @@ public class CommandLineParser { * 
@return a kudu client */ public KuduClient getClient() { - return new KuduClient.KuduClientBuilder(getMasterAddresses()) + KuduClient c = new KuduClient.KuduClientBuilder(getMasterAddresses()) .defaultOperationTimeoutMs(getOperationTimeoutMs()) .defaultAdminOperationTimeoutMs(getAdminOperationTimeoutMs()) .defaultSocketReadTimeoutMs(getSocketReadTimeoutMs()) .build(); + KuduTableMapReduceUtil.importCredentialsFromCurrentSubject(c); + return c; } /** http://git-wip-us.apache.org/repos/asf/kudu/blob/e9dfbe1e/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java ---------------------------------------------------------------------- diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java index 91f4825..8f98170 100644 --- a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java +++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java @@ -211,6 +211,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult> this.client = new KuduClient.KuduClientBuilder(masterAddresses) .defaultOperationTimeoutMs(operationTimeoutMs) .build(); + KuduTableMapReduceUtil.importCredentialsFromCurrentSubject(client); this.nameServer = conf.get(NAME_SERVER_KEY); this.cacheBlocks = conf.getBoolean(SCAN_CACHE_BLOCKS, false); http://git-wip-us.apache.org/repos/asf/kudu/blob/e9dfbe1e/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java ---------------------------------------------------------------------- diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java index 6488163..570b464 100644 --- a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java 
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableMapReduceUtil.java @@ -19,6 +19,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.URL; import java.net.URLDecoder; +import java.security.AccessController; import java.util.ArrayList; import java.util.Enumeration; import java.util.HashMap; @@ -29,6 +30,8 @@ import java.util.Set; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; +import javax.security.auth.Subject; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.net.util.Base64; @@ -36,18 +39,26 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; import org.apache.kudu.annotations.InterfaceAudience; import org.apache.kudu.annotations.InterfaceStability; import org.apache.kudu.client.AsyncKuduClient; import org.apache.kudu.client.ColumnRangePredicate; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; import org.apache.kudu.client.KuduPredicate; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.Operation; +import com.google.common.base.Preconditions; + /** * Utility class to setup MR jobs that use Kudu as an input and/or output. */ @@ -59,6 +70,14 @@ public class KuduTableMapReduceUtil { private static final Log LOG = LogFactory.getLog(KuduTableMapReduceUtil.class); /** + * "Secret key alias" used in Job Credentials to store the Kudu authentication + * credentials. This acts as a key in a Hadoop Credentials object. 
+ */ + private static final Text AUTHN_CREDENTIALS_ALIAS = new Text("kudu.authn.credentials"); + + private static final Text KUDU_TOKEN_KIND = new Text("kudu-authn-data"); + + /** * Doesn't need instantiation */ private KuduTableMapReduceUtil() { } @@ -96,6 +115,19 @@ public class KuduTableMapReduceUtil { } /** + * Add credentials to the job so that tasks run as the user that submitted + * the job. + */ + protected void addCredentialsToJob(String masterAddresses, long operationTimeoutMs) + throws KuduException { + try (KuduClient client = new KuduClient.KuduClientBuilder(masterAddresses) + .defaultOperationTimeoutMs(operationTimeoutMs) + .build()) { + KuduTableMapReduceUtil.addCredentialsToJob(client, job); + } + } + + /** * Configures the job using the passed parameters. * @throws IOException If addDependencies is enabled and a problem is encountered reading * files on the filesystem @@ -137,6 +169,7 @@ public class KuduTableMapReduceUtil { if (addDependencies) { addDependencyJars(job); } + addCredentialsToJob(masterAddresses, operationTimeoutMs); } } @@ -202,6 +235,8 @@ public class KuduTableMapReduceUtil { if (addDependencies) { addDependencyJars(job); } + + addCredentialsToJob(masterAddresses, operationTimeoutMs); } } @@ -363,6 +398,75 @@ public class KuduTableMapReduceUtil { } /** + * Export the credentials from a {@link KuduClient} and store them in the given MapReduce + * {@link Job} so that {@link KuduClient}s created from within tasks of that job can + * authenticate to Kudu. + * + * This must be used before submitting a job when running against a Kudu cluster + * configured to require authentication. If using {@link TableInputFormatConfigurator}, + * {@link TableOutputFormatConfigurator} or another such utility class, this is called + * automatically and does not need to be called. 
+ * + * @param client the client whose credentials to export + * @param job the job to configure + * @throws KuduException if credentials cannot be exported + */ + public static void addCredentialsToJob(KuduClient client, Job job) + throws KuduException { + Preconditions.checkNotNull(client); + Preconditions.checkNotNull(job); + + byte[] authnCreds = client.exportAuthenticationCredentials(); + Text service = new Text(client.getMasterAddressesAsString()); + job.getCredentials().addToken(AUTHN_CREDENTIALS_ALIAS, + new Token<TokenIdentifier>(null, authnCreds, KUDU_TOKEN_KIND, service)); + } + + /** + * Import credentials from the current thread's JAAS {@link Subject} into the provided + * {@link KuduClient}. + * + * This must be called for any clients created within a MapReduce job in order to + * adopt the credentials added by {@link #addCredentialsToJob(KuduClient, Job)}. + * When using {@link KuduTableInputFormat} or {@link KuduTableOutputFormat}, the + * implementation automatically handles creating the client and importing necessary + * credentials. As such, this is only necessary in jobs that explicitly create a + * {@link KuduClient}. + * + * If no appropriate credentials are found, does nothing. + */ + public static void importCredentialsFromCurrentSubject(KuduClient client) { + Subject subj = Subject.getSubject(AccessController.getContext()); + if (subj == null) { + return; + } + Text service = new Text(client.getMasterAddressesAsString()); + // Find the Hadoop credentials stored within the JAAS subject. + Set<Credentials> credSet = subj.getPrivateCredentials(Credentials.class); + if (credSet == null) { + return; + } + for (Credentials creds : credSet) { + for (Token<?> tok : creds.getAllTokens()) { + if (!tok.getKind().equals(KUDU_TOKEN_KIND)) { + continue; + } + // Only import credentials relevant to the service corresponding to + // 'client'. This is necessary if we want to support a job which + // reads from one cluster and writes to another. 
+ if (!tok.getService().equals(service)) { + LOG.debug("Not importing credentials for service " + service + + "(expecting service " + service + ")"); + continue; + } + LOG.debug("Importing credentials for service " + service); + client.importAuthenticationCredentials(tok.getPassword()); + return; + } + } + } + + /** * Add the Kudu dependency jars as well as jars for any of the configured * job classes to the job configuration, so that JobClient will ship them * to the cluster and add them to the DistributedCache. http://git-wip-us.apache.org/repos/asf/kudu/blob/e9dfbe1e/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java ---------------------------------------------------------------------- diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java index bb64a1e..1a93a36 100644 --- a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java +++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableOutputFormat.java @@ -119,6 +119,7 @@ public class KuduTableOutputFormat extends OutputFormat<NullWritable,Operation> this.client = new KuduClient.KuduClientBuilder(masterAddress) .defaultOperationTimeoutMs(operationTimeoutMs) .build(); + KuduTableMapReduceUtil.importCredentialsFromCurrentSubject(client); try { this.table = client.openTable(tableName); } catch (Exception ex) {
