Repository: incubator-gobblin Updated Branches: refs/heads/master 571746c2b -> 0afdc45c3
[GOBBLIN-332] Fetching Hive tokens in TokenUtils Closes #2184 from autumnust/hivefortokenutils Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0afdc45c Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0afdc45c Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0afdc45c Branch: refs/heads/master Commit: 0afdc45c39a80944d9d6bcb1ec9767749ae6dfef Parents: 571746c Author: Lei Sun <[email protected]> Authored: Wed Dec 6 08:59:40 2017 -0800 Committer: Issac Buenrostro <[email protected]> Committed: Wed Dec 6 08:59:40 2017 -0800 ---------------------------------------------------------------------- .../gobblin/azkaban/AzkabanJobLauncher.java | 7 +- gobblin-utility/build.gradle | 1 + .../apache/gobblin/util/hadoop/TokenUtils.java | 159 +++++++++++++++++-- 3 files changed, 152 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0afdc45c/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java index 5c9fc1d..89c7646 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java @@ -17,6 +17,7 @@ package org.apache.gobblin.azkaban; +import com.google.common.base.Optional; import java.io.File; import java.io.IOException; import java.net.URI; @@ -35,6 +36,7 @@ import java.util.concurrent.TimeoutException; import 
org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator; import org.apache.gobblin.util.ConfigUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.Credentials; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.joda.time.DateTime; @@ -171,7 +173,10 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch // see javadoc for more information LOG.info(String.format("Job type %s does not provide Hadoop tokens. Negotiating Hadoop tokens.", props.getProperty(JOB_TYPE))); - File tokenFile = TokenUtils.getHadoopTokens(new State(props)); + + File tokenFile = File.createTempFile("mr-azkaban", ".token"); + TokenUtils.getHadoopTokens(new State(props), Optional.of(tokenFile), new Credentials()); + System.setProperty(HADOOP_TOKEN_FILE_LOCATION, tokenFile.getAbsolutePath()); System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFile.getAbsolutePath()); this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFile.getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0afdc45c/gobblin-utility/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-utility/build.gradle b/gobblin-utility/build.gradle index cd79bde..9d7505b 100644 --- a/gobblin-utility/build.gradle +++ b/gobblin-utility/build.gradle @@ -30,6 +30,7 @@ dependencies { compile externalDependency.guava compile externalDependency.slf4j compile externalDependency.avro + compile externalDependency.hiveMetastore compile externalDependency.jodaTime compile externalDependency.jacksonCore compile externalDependency.jasypt http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0afdc45c/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java index 5cb9723..fb45e14 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/hadoop/TokenUtils.java @@ -17,6 +17,9 @@ package org.apache.gobblin.util.hadoop; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; @@ -25,12 +28,15 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.security.PrivilegedExceptionAction; import java.util.Arrays; +import java.util.Collections; import java.util.List; - import java.util.regex.Pattern; +import org.apache.gobblin.configuration.State; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; @@ -46,16 +52,12 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.Logger; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; - -import org.apache.gobblin.configuration.State; +import org.apache.thrift.TException; /** @@ -83,14 +85,65 @@ public class TokenUtils { private static final String KERBEROS_REALM = "kerberos.realm"; /** + * the 
key that will be used to set proper signature for each of the hcat token when multiple hcat + * tokens are required to be fetched. + */ + private static final String HIVE_TOKEN_SIGNATURE_KEY = "hive.metastore.token.signature"; + /** + * User can specify the hcat location that they used specifically. It may contain additional hcat locations, + * comma-separated. + */ + private static final String USER_DEFINED_HIVE_LOCATIONS = "user.defined.hcatLocation"; + + /** + * Get Hadoop tokens (tokens for job history server, job tracker, hive and HDFS) using Kerberos keytab, + * on behalf of a proxy user, embed tokens into a {@link UserGroupInformation} as returned result, persist in-memory + * credentials if tokenFile specified + * + * Note that when a super-user is fetching tokens for other users, + * {@link #fetchHcatToken(String, HiveConf, String, IMetaStoreClient)} (getDelegationToken) explicitly + * contains a string parameter indicating proxy user, while other hadoop services require impersonation first. + * + * @param state A {@link State} object that should contain properties. + * @param tokenFile If present, the file will store materialized credentials. + * @param ugi The {@link UserGroupInformation} that is used to impersonate into the proxy user by a "doAs block". + * @param targetUser The user to be impersonated as, for fetching hadoop tokens. + * @return A {@link UserGroupInformation} containing negotiated credentials. + */ + public static UserGroupInformation getHadoopAndHiveTokensForProxyUser(final State state, Optional<File> tokenFile, + UserGroupInformation ugi, IMetaStoreClient client, String targetUser) throws IOException, InterruptedException { + final Credentials cred = new Credentials(); + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + getHadoopTokens(state, Optional.absent(), cred); + return null; + } + }); + + ugi.getCredentials().addAll(cred); + // Will add hive tokens into ugi in this method. 
+ getHiveToken(state, client, cred, targetUser, ugi); + + if (tokenFile.isPresent()){ + persistTokens(cred, tokenFile.get()); + } + // at this point, tokens in ugi can be more than that in Credential object, + // since hive token is not put in Credential object. + return ugi; + } + + /** * Get Hadoop tokens (tokens for job history server, job tracker and HDFS) using Kerberos keytab. * * @param state A {@link State} object that should contain property {@link #USER_TO_PROXY}, * {@link #KEYTAB_USER} and {@link #KEYTAB_LOCATION}. To obtain tokens for * other namenodes, use property {@link #OTHER_NAMENODES} with comma separated HDFS URIs. - * @return A {@link File} containing the negotiated credentials. + * @param tokenFile If present, the file will store materialized credentials. + * @param cred An in-memory representation of credentials. */ - public static File getHadoopTokens(final State state) throws IOException, InterruptedException { + public static void getHadoopTokens(final State state, Optional<File> tokenFile, Credentials cred) + throws IOException, InterruptedException { Preconditions.checkArgument(state.contains(KEYTAB_USER), "Missing required property " + KEYTAB_USER); Preconditions.checkArgument(state.contains(KEYTAB_LOCATION), "Missing required property " + KEYTAB_LOCATION); @@ -103,17 +156,15 @@ public class TokenUtils { final Optional<String> userToProxy = Strings.isNullOrEmpty(state.getProp(USER_TO_PROXY)) ? 
Optional.<String>absent() : Optional.fromNullable(state.getProp(USER_TO_PROXY)); final Configuration conf = new Configuration(); - final Credentials cred = new Credentials(); LOG.info("Getting tokens for " + userToProxy); getJhToken(conf, cred); getFsAndJtTokens(state, conf, userToProxy, cred); - File tokenFile = File.createTempFile("mr-azkaban", ".token"); - persistTokens(cred, tokenFile); - - return tokenFile; + if (tokenFile.isPresent()) { + persistTokens(cred, tokenFile.get()); + } } /** @@ -132,6 +183,86 @@ public class TokenUtils { } } + /** + * + * @param userToProxy The user that hiveClient is impersonating as to fetch the delegation tokens. + * @param ugi The {@link UserGroupInformation} that to be added with negotiated credentials. + */ + public static void getHiveToken(final State state, IMetaStoreClient hiveClient, Credentials cred, + final String userToProxy, UserGroupInformation ugi) { + try { + // Fetch and save the default hcat token. + LOG.info("Fetching default Hive MetaStore token from hive"); + HiveConf hiveConf = new HiveConf(); + + Token<DelegationTokenIdentifier> hcatToken = fetchHcatToken(userToProxy, hiveConf, null, hiveClient); + cred.addToken(hcatToken.getService(), hcatToken); + ugi.addToken(hcatToken); + + // Fetch extra Hcat location user specified. + final List<String> extraHcatLocations = + state.contains(USER_DEFINED_HIVE_LOCATIONS) ? state.getPropAsList(USER_DEFINED_HIVE_LOCATIONS) + : Collections.EMPTY_LIST; + if (!extraHcatLocations.isEmpty()) { + LOG.info("Need to fetch extra metaStore tokens from hive."); + + // start to process the user inputs. 
+ for (final String thriftUrl : extraHcatLocations) { + LOG.info("Fetching metaStore token from : " + thriftUrl); + + hiveConf = new HiveConf(); + hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUrl); + hcatToken = fetchHcatToken(userToProxy, hiveConf, thriftUrl, hiveClient); + cred.addToken(hcatToken.getService(), hcatToken); + ugi.addToken(hcatToken); + + LOG.info("Successfully fetched token for:" + thriftUrl); + } + } + } catch (final Throwable t) { + final String message = "Failed to get hive metastore token." + t.getMessage() + t.getCause(); + LOG.error(message, t); + throw new RuntimeException(message); + } + } + + /** + * Function to fetch hcat token as per the specified hive configuration and then store the token + * into the specified credential store. + * + * @param userToProxy String value indicating the name of the user the token will be fetched for. + * @param hiveConf the configuration based off which the hive client will be initialized. + */ + private static Token<DelegationTokenIdentifier> fetchHcatToken(final String userToProxy, final HiveConf hiveConf, + final String tokenSignatureOverwrite, final IMetaStoreClient hiveClient) + throws IOException, TException, InterruptedException { + + LOG.info(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname + ": " + hiveConf.get( + HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname)); + + LOG.info(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname + ": " + hiveConf.get( + HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname)); + + final Token<DelegationTokenIdentifier> hcatToken = new Token<>(); + + hcatToken.decodeFromUrlString( + hiveClient.getDelegationToken(userToProxy, UserGroupInformation.getLoginUser().getShortUserName())); + + // overwrite the value of the service property of the token if the signature + // override is specified. 
+ // If the service field is set, do not overwrite that + if (hcatToken.getService().getLength() <= 0 && tokenSignatureOverwrite != null + && tokenSignatureOverwrite.trim().length() > 0) { + hcatToken.setService(new Text(tokenSignatureOverwrite.trim().toLowerCase())); + + LOG.info(HIVE_TOKEN_SIGNATURE_KEY + ":" + tokenSignatureOverwrite); + } + + LOG.info("Created hive metastore token for user:" + userToProxy + " with kind[" + hcatToken.getKind() + "]" + + " and service[" + hcatToken.getService() + "]"); + return hcatToken; + } + private static void getJhToken(Configuration conf, Credentials cred) throws IOException { YarnRPC rpc = YarnRPC.create(conf); final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
