HIVE-10857 : Accumulo storage handler fails throwing java.lang.IllegalArgumentException (Josh Elser via Sushanth Sowmyan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bb157fb8 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bb157fb8 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bb157fb8 Branch: refs/heads/llap Commit: bb157fb8778d60ce6b375012b5699a196c49b2a4 Parents: 735ba0d Author: Sushanth Sowmyan <[email protected]> Authored: Tue Jun 9 12:37:16 2015 -0700 Committer: Sushanth Sowmyan <[email protected]> Committed: Tue Jun 9 12:38:06 2015 -0700 ---------------------------------------------------------------------- .../accumulo/AccumuloConnectionParameters.java | 111 +++++++- .../hive/accumulo/AccumuloStorageHandler.java | 50 ++++ .../hive/accumulo/HiveAccumuloHelper.java | 280 +++++++++++++++++++ .../mr/HiveAccumuloTableInputFormat.java | 74 ++++- .../mr/HiveAccumuloTableOutputFormat.java | 63 ++++- .../TestAccumuloConnectionParameters.java | 19 ++ .../hive/accumulo/TestHiveAccumuloHelper.java | 75 +++++ .../mr/TestHiveAccumuloTableInputFormat.java | 8 +- .../mr/TestHiveAccumuloTableOutputFormat.java | 2 +- 9 files changed, 659 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/bb157fb8/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java index 2b11f84..f34e820 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloConnectionParameters.java @@ -16,14 +16,20 @@ */ package org.apache.hadoop.hive.accumulo; +import java.io.File; +import java.lang.reflect.Constructor; +import 
java.lang.reflect.InvocationTargetException; + import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.JavaUtils; import com.google.common.base.Preconditions; @@ -31,12 +37,18 @@ import com.google.common.base.Preconditions; * */ public class AccumuloConnectionParameters { + private static final String KERBEROS_TOKEN_CLASS = "org.apache.accumulo.core.client.security.tokens.KerberosToken"; + public static final String USER_NAME = "accumulo.user.name"; public static final String USER_PASS = "accumulo.user.pass"; public static final String ZOOKEEPERS = "accumulo.zookeepers"; public static final String INSTANCE_NAME = "accumulo.instance.name"; public static final String TABLE_NAME = "accumulo.table.name"; + // SASL/Kerberos properties + public static final String SASL_ENABLED = "accumulo.sasl.enabled"; + public static final String USER_KEYTAB = "accumulo.user.keytab"; + public static final String USE_MOCK_INSTANCE = "accumulo.mock.instance"; protected Configuration conf; @@ -84,6 +96,16 @@ public class AccumuloConnectionParameters { return conf.getBoolean(USE_MOCK_INSTANCE, false); } + public boolean useSasl() { + Preconditions.checkNotNull(conf); + return conf.getBoolean(SASL_ENABLED, false); + } + + public String getAccumuloKeytab() { + Preconditions.checkNotNull(conf); + return conf.get(USER_KEYTAB); + } + public Instance getInstance() { String instanceName = getAccumuloInstanceName(); @@ -112,16 +134,97 @@ public class AccumuloConnectionParameters { } public 
Connector getConnector(Instance inst) throws AccumuloException, AccumuloSecurityException { - String username = getAccumuloUserName(), password = getAccumuloPassword(); + String username = getAccumuloUserName(); // Fail with a good message if (null == username) { throw new IllegalArgumentException("Accumulo user name must be provided in hiveconf using " + USER_NAME); } - if (null == password) { - throw new IllegalArgumentException("Accumulo password must be provided in hiveconf using " + USER_PASS); + + if (useSasl()) { + return inst.getConnector(username, getKerberosToken()); + } else { + // Not using SASL/Kerberos -- use the password + String password = getAccumuloPassword(); + + if (null == password) { + throw new IllegalArgumentException("Accumulo password must be provided in hiveconf using " + USER_PASS); + } + + return inst.getConnector(username, new PasswordToken(password)); + } + } + + public AuthenticationToken getKerberosToken() { + if (!useSasl()) { + throw new IllegalArgumentException("Cannot construct KerberosToken when SASL is disabled"); + } + + final String keytab = getAccumuloKeytab(), username = getAccumuloUserName(); + + if (null != keytab) { + // Use the keytab if one was provided + return getKerberosToken(username, keytab); + } else { + // Otherwise, expect the user is already logged in + return getKerberosToken(username); + } + } + + /** + * Instantiate a KerberosToken in a backwards compatible manner. + * @param username Kerberos principal + */ + AuthenticationToken getKerberosToken(String username) { + // Get the Class + Class<? extends AuthenticationToken> krbTokenClz = getKerberosTokenClass(); + + try { + // Invoke the `new KerberosToken(String)` constructor + // Expects that the user is already logged-in + Constructor<? 
extends AuthenticationToken> constructor = krbTokenClz.getConstructor(String.class); + return constructor.newInstance(username); + } catch (NoSuchMethodException | SecurityException | InstantiationException | + IllegalArgumentException | InvocationTargetException | IllegalAccessException e) { + throw new IllegalArgumentException("Failed to instantiate KerberosToken.", e); + } + } + + /** + * Instantiate a KerberosToken in a backwards compatible manner. + * @param username Kerberos principal + * @param keytab Keytab on local filesystem + */ + AuthenticationToken getKerberosToken(String username, String keytab) { + Class<? extends AuthenticationToken> krbTokenClz = getKerberosTokenClass(); + + File keytabFile = new File(keytab); + if (!keytabFile.isFile() || !keytabFile.canRead()) { + throw new IllegalArgumentException("Keytab must be a readable file: " + keytab); } - return inst.getConnector(username, new PasswordToken(password)); + try { + // Invoke the `new KerberosToken(String, File, boolean)` constructor + // Tries to log in as the provided user with the given keytab, overriding an already logged-in user if present + Constructor<? extends AuthenticationToken> constructor = krbTokenClz.getConstructor(String.class, File.class, boolean.class); + return constructor.newInstance(username, keytabFile, true); + } catch (NoSuchMethodException | SecurityException | InstantiationException | + IllegalArgumentException | InvocationTargetException | IllegalAccessException e) { + throw new IllegalArgumentException("Failed to instantiate KerberosToken.", e); + } + } + + /** + * Attempt to instantiate the KerberosToken class + */ + Class<? 
extends AuthenticationToken> getKerberosTokenClass() { + try { + // Instantiate the class + Class<?> clz = JavaUtils.loadClass(KERBEROS_TOKEN_CLASS); + // Cast it to an AuthenticationToken since Connector will need that + return clz.asSubclass(AuthenticationToken.class); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException("Could not load KerberosToken class. >=Accumulo 1.7.0 required", e); + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/bb157fb8/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java index 64eb18b..41a65ce 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/AccumuloStorageHandler.java @@ -27,6 +27,11 @@ import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.mapred.AccumuloInputFormat; +import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat; +import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; +import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.fate.Fate; import org.apache.accumulo.start.Main; import org.apache.accumulo.trace.instrument.Tracer; @@ -53,6 +58,9 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; 
+import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; @@ -69,6 +77,7 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv protected AccumuloPredicateHandler predicateHandler = AccumuloPredicateHandler.getInstance(); protected AccumuloConnectionParameters connectionParams; protected Configuration conf; + protected HiveAccumuloHelper helper = new HiveAccumuloHelper(); /** * Push down table properties into the JobConf. @@ -314,6 +323,7 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv // do nothing } + @SuppressWarnings("deprecation") @Override public DecomposedPredicate decomposePredicate(JobConf conf, Deserializer deserializer, ExprNodeDesc desc) { @@ -331,6 +341,7 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv } } + @SuppressWarnings("deprecation") @Override public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { try { @@ -354,5 +365,44 @@ public class AccumuloStorageHandler extends DefaultStorageHandler implements Hiv } catch (IOException e) { log.error("Could not add necessary dependencies for " + serDeParams.getRowIdFactory().getClass(), e); } + + // When Kerberos is enabled, we have to add the Accumulo delegation token to the + // Job so that it gets passed down to the YARN/Tez task. + if (connectionParams.useSasl()) { + try { + // Obtain a delegation token from Accumulo + Connector conn = connectionParams.getConnector(); + AuthenticationToken token = helper.getDelegationToken(conn); + + // Make sure the Accumulo token is set in the Configuration (only a stub of the Accumulo + // AuthentiationToken is serialized, not the entire token). 
configureJobConf may be + // called multiple times with the same JobConf which results in an error from Accumulo + // MapReduce API. Catch the error, log a debug message and just keep going + try { + InputConfigurator.setConnectorInfo(AccumuloInputFormat.class, jobConf, + connectionParams.getAccumuloUserName(), token); + } catch (IllegalStateException e) { + // The implementation balks when this method is invoked multiple times + log.debug("Ignoring IllegalArgumentException about re-setting connector information"); + } + try { + OutputConfigurator.setConnectorInfo(AccumuloOutputFormat.class, jobConf, + connectionParams.getAccumuloUserName(), token); + } catch (IllegalStateException e) { + // The implementation balks when this method is invoked multiple times + log.debug("Ignoring IllegalArgumentException about re-setting connector information"); + } + + // Convert the Accumulo token in a Hadoop token + Token<? extends TokenIdentifier> accumuloToken = helper.getHadoopToken(token); + + log.info("Adding Hadoop Token for Accumulo to Job's Credentials"); + + // Add the Hadoop token to the JobConf + helper.mergeTokenIntoJobConf(jobConf, accumuloToken); + } catch (Exception e) { + throw new RuntimeException("Failed to obtain DelegationToken for " + connectionParams.getAccumuloUserName(), e); + } + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/bb157fb8/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java new file mode 100644 index 0000000..dfc5d03 --- /dev/null +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/HiveAccumuloHelper.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.accumulo; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Collection; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.admin.SecurityOperations; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class to hold common methods across the InputFormat, OutputFormat and StorageHandler. 
+ */ +public class HiveAccumuloHelper { + private static final Logger log = LoggerFactory.getLogger(HiveAccumuloHelper.class); + // Constant from Accumulo's DelegationTokenImpl + public static final Text ACCUMULO_SERVICE = new Text("ACCUMULO_AUTH_TOKEN"); + + // Constants for DelegationToken reflection to continue to support 1.6 + private static final String DELEGATION_TOKEN_CONFIG_CLASS_NAME = + "org.apache.accumulo.core.client.admin.DelegationTokenConfig"; + private static final String DELEGATION_TOKEN_IMPL_CLASS_NAME = + "org.apache.accumulo.core.client.impl.DelegationTokenImpl"; + private static final String GET_DELEGATION_TOKEN_METHOD_NAME = "getDelegationToken"; + private static final String GET_IDENTIFIER_METHOD_NAME = "getIdentifier"; + private static final String GET_PASSWORD_METHOD_NAME = "getPassword"; + private static final String GET_SERVICE_NAME_METHOD_NAME = "getServiceName"; + + // Constants for ClientConfiguration and setZooKeeperInstance reflection + // to continue to support 1.5 + private static final String CLIENT_CONFIGURATION_CLASS_NAME = + "org.apache.accumulo.core.client.ClientConfiguration"; + private static final String LOAD_DEFAULT_METHOD_NAME = "loadDefault"; + private static final String SET_PROPERTY_METHOD_NAME = "setProperty"; + private static final String INSTANCE_ZOOKEEPER_HOST = "instance.zookeeper.host"; + private static final String INSTANCE_NAME = "instance.name"; + private static final String INSTANCE_RPC_SASL_ENABLED = "instance.rpc.sasl.enabled"; + private static final String SET_ZOOKEEPER_INSTANCE_METHOD_NAME = "setZooKeeperInstance"; + + // Constants for unwrapping the DelegationTokenStub into a DelegationTokenImpl + private static final String CONFIGURATOR_BASE_CLASS_NAME = + "org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase"; + private static final String UNWRAP_AUTHENTICATION_TOKEN_METHOD_NAME = "unwrapAuthenticationToken"; + + /** + * Extract the appropriate Token for Accumulo from the provided {@code 
user} and add it to the + * {@link JobConf}'s credentials. + * + * @param user + * User containing tokens + * @param jobConf + * The configuration for the job + * @throws IOException + * If the correct token is not found or the Token fails to be merged with the + * configuration + */ + public void addTokenFromUserToJobConf(UserGroupInformation user, JobConf jobConf) + throws IOException { + checkNotNull(user, "Provided UGI was null"); + checkNotNull(jobConf, "JobConf was null"); + + // Accumulo token already in Configuration, but the Token isn't in the Job credentials like the + // AccumuloInputFormat expects + Token<?> accumuloToken = null; + Collection<Token<? extends TokenIdentifier>> tokens = user.getTokens(); + for (Token<?> token : tokens) { + if (ACCUMULO_SERVICE.equals(token.getKind())) { + accumuloToken = token; + break; + } + } + + // If we didn't find the Token, we can't proceed. Log the tokens for debugging. + if (null == accumuloToken) { + log.error("Could not find accumulo token in user: " + tokens); + throw new IOException("Could not find Accumulo Token in user's tokens"); + } + + // Add the Hadoop token back to the Job, the configuration still has the necessary + // Accumulo token information. + mergeTokenIntoJobConf(jobConf, accumuloToken); + } + + /** + * Merge the provided <code>Token</code> into the JobConf. + * + * @param jobConf + * JobConf to merge token into + * @param accumuloToken + * The Token + * @throws IOException + * If the merging fails + */ + public void mergeTokenIntoJobConf(JobConf jobConf, Token<?> accumuloToken) throws IOException { + JobConf accumuloJobConf = new JobConf(jobConf); + accumuloJobConf.getCredentials().addToken(accumuloToken.getService(), accumuloToken); + + // Merge them together. + ShimLoader.getHadoopShims().mergeCredentials(jobConf, accumuloJobConf); + } + + /** + * Obtain a DelegationToken from Accumulo in a backwards compatible manner. 
+ * + * @param conn + * The Accumulo connector + * @return The DelegationToken instance + * @throws IOException + * If the token cannot be obtained + */ + public AuthenticationToken getDelegationToken(Connector conn) throws IOException { + try { + Class<?> clz = JavaUtils.loadClass(DELEGATION_TOKEN_CONFIG_CLASS_NAME); + // DelegationTokenConfig delegationTokenConfig = new DelegationTokenConfig(); + Object delegationTokenConfig = clz.newInstance(); + + SecurityOperations secOps = conn.securityOperations(); + + Method getDelegationTokenMethod = secOps.getClass().getMethod( + GET_DELEGATION_TOKEN_METHOD_NAME, clz); + + // secOps.getDelegationToken(delegationTokenConfig) + return (AuthenticationToken) getDelegationTokenMethod.invoke(secOps, delegationTokenConfig); + } catch (Exception e) { + throw new IOException("Failed to obtain DelegationToken from Accumulo", e); + } + } + + public Token<? extends TokenIdentifier> getHadoopToken(AuthenticationToken delegationToken) + throws IOException { + try { + // DelegationTokenImpl class + Class<?> delegationTokenClass = JavaUtils.loadClass(DELEGATION_TOKEN_IMPL_CLASS_NAME); + // Methods on DelegationToken + Method getIdentifierMethod = delegationTokenClass.getMethod(GET_IDENTIFIER_METHOD_NAME); + Method getPasswordMethod = delegationTokenClass.getMethod(GET_PASSWORD_METHOD_NAME); + Method getServiceNameMethod = delegationTokenClass.getMethod(GET_SERVICE_NAME_METHOD_NAME); + + // Treat the TokenIdentifier implementation as the abstract class to avoid dependency issues + // AuthenticationTokenIdentifier identifier = delegationToken.getIdentifier(); + TokenIdentifier identifier = (TokenIdentifier) getIdentifierMethod.invoke(delegationToken); + + // new Token<AuthenticationTokenIdentifier>(identifier.getBytes(), + // delegationToken.getPassword(), identifier.getKind(), delegationToken.getServiceName()); + return new Token<TokenIdentifier>(identifier.getBytes(), (byte[]) + getPasswordMethod.invoke(delegationToken), 
identifier.getKind(), + (Text) getServiceNameMethod.invoke(delegationToken)); + } catch (Exception e) { + throw new IOException("Failed to create Hadoop token from Accumulo DelegationToken", e); + } + } + + /** + * Construct a <code>ClientConfiguration</code> instance in a backwards-compatible way. Allows us + * to support Accumulo 1.5 + * + * @param zookeepers + * ZooKeeper hosts + * @param instanceName + * Instance name + * @param useSasl + * Is SASL enabled + * @return A ClientConfiguration instance + * @throws IOException + * If the instance fails to be created + */ + public Object getClientConfiguration(String zookeepers, String instanceName, boolean useSasl) + throws IOException { + try { + // Construct a new instance of ClientConfiguration + Class<?> clientConfigClass = JavaUtils.loadClass(CLIENT_CONFIGURATION_CLASS_NAME); + Method loadDefaultMethod = clientConfigClass.getMethod(LOAD_DEFAULT_METHOD_NAME); + Object clientConfig = loadDefaultMethod.invoke(null); + + // Set instance and zookeeper hosts + Method setPropertyMethod = clientConfigClass.getMethod(SET_PROPERTY_METHOD_NAME, + String.class, Object.class); + setPropertyMethod.invoke(clientConfig, INSTANCE_ZOOKEEPER_HOST, zookeepers); + setPropertyMethod.invoke(clientConfig, INSTANCE_NAME, instanceName); + + if (useSasl) { + // Defaults to not using SASL, set true if SASL is being used + setPropertyMethod.invoke(clientConfig, INSTANCE_RPC_SASL_ENABLED, true); + } + + return clientConfig; + } catch (Exception e) { + String msg = "Failed to instantiate and invoke methods on ClientConfiguration"; + log.error(msg, e); + throw new IOException(msg, e); + } + } + + /** + * Wrapper around <code>setZooKeeperInstance(Configuration, ClientConfiguration)</code> which only + * exists in 1.6.0 and newer. Support backwards compat. 
+ * + * @param jobConf + * The JobConf + * @param inputOrOutputFormatClass + * The InputFormat or OutputFormat class + * @param zookeepers + * ZooKeeper hosts + * @param instanceName + * Accumulo instance name + * @param useSasl + * Is SASL enabled + * @throws IOException + * When invocation of the method fails + */ + public void setZooKeeperInstance(JobConf jobConf, Class<?> inputOrOutputFormatClass, String + zookeepers, String instanceName, boolean useSasl) throws IOException { + try { + Class<?> clientConfigClass = JavaUtils.loadClass(CLIENT_CONFIGURATION_CLASS_NAME); + + // get the ClientConfiguration + Object clientConfig = getClientConfiguration(zookeepers, instanceName, useSasl); + + // AccumuloOutputFormat.setZooKeeperInstance(JobConf, ClientConfiguration) or + // AccumuloInputFormat.setZooKeeperInstance(JobConf, ClientConfiguration) + Method setZooKeeperMethod = inputOrOutputFormatClass.getMethod( + SET_ZOOKEEPER_INSTANCE_METHOD_NAME, JobConf.class, clientConfigClass); + setZooKeeperMethod.invoke(null, jobConf, clientConfig); + } catch (Exception e) { + throw new IOException("Failed to invoke setZooKeeperInstance method", e); + } + } + + /** + * Wrapper around <code>ConfiguratorBase.unwrapAuthenticationToken</code> which only exists in + * 1.7.0 and new. Uses reflection to not break compat. 
+ * + * @param jobConf + * JobConf object + * @param token + * The DelegationTokenStub instance + * @return A DelegationTokenImpl created from the Token in the Job's credentials + * @throws IOException + * If the token fails to be unwrapped + */ + public AuthenticationToken unwrapAuthenticationToken(JobConf jobConf, AuthenticationToken token) + throws IOException { + try { + Class<?> configuratorBaseClass = JavaUtils.loadClass(CONFIGURATOR_BASE_CLASS_NAME); + Method unwrapAuthenticationTokenMethod = configuratorBaseClass.getMethod( + UNWRAP_AUTHENTICATION_TOKEN_METHOD_NAME, JobConf.class, AuthenticationToken.class); + // ConfiguratorBase.unwrapAuthenticationToken(conf, token); + return (AuthenticationToken) unwrapAuthenticationTokenMethod.invoke(null, jobConf, token); + } catch (Exception e) { + throw new IOException("Failed to unwrap AuthenticationToken", e); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/bb157fb8/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java index 08d396e..083678f 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableInputFormat.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.mapred.AccumuloInputFormat; import org.apache.accumulo.core.client.mapred.AccumuloRowInputFormat; import org.apache.accumulo.core.client.mapred.RangeInputSplit; +import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase; import org.apache.accumulo.core.client.mock.MockInstance; import 
org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; @@ -49,6 +50,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters; import org.apache.hadoop.hive.accumulo.AccumuloHiveRow; +import org.apache.hadoop.hive.accumulo.HiveAccumuloHelper; import org.apache.hadoop.hive.accumulo.columns.ColumnMapper; import org.apache.hadoop.hive.accumulo.columns.ColumnMapping; import org.apache.hadoop.hive.accumulo.columns.HiveAccumuloColumnMapping; @@ -70,6 +72,9 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +92,7 @@ public class HiveAccumuloTableInputFormat implements // Visible for testing protected AccumuloRowInputFormat accumuloInputFormat = new AccumuloRowInputFormat(); protected AccumuloPredicateHandler predicateHandler = AccumuloPredicateHandler.getInstance(); + protected HiveAccumuloHelper helper = new HiveAccumuloHelper(); @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { @@ -103,7 +109,22 @@ public class HiveAccumuloTableInputFormat implements Path[] tablePaths = FileInputFormat.getInputPaths(context); try { - final Connector connector = accumuloParams.getConnector(instance); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + final Connector connector; + + // Need to get a Connector so we look up the user's authorizations if not otherwise specified + if (accumuloParams.useSasl() && !ugi.hasKerberosCredentials()) { + // 
In a YARN/Tez job, don't have the Kerberos credentials anymore, use the delegation token + AuthenticationToken token = ConfiguratorBase.getAuthenticationToken( + AccumuloInputFormat.class, jobConf); + // Convert the stub from the configuration back into a normal Token + // More reflection to support 1.6 + token = helper.unwrapAuthenticationToken(jobConf, token); + connector = instance.getConnector(accumuloParams.getAccumuloUserName(), token); + } else { + // Still in the local JVM, use the username+password or Kerberos credentials + connector = accumuloParams.getConnector(instance); + } final List<ColumnMapping> columnMappings = columnMapper.getColumnMappings(); final List<IteratorSetting> iterators = predicateHandler.getIterators(jobConf, columnMapper); final Collection<Range> ranges = predicateHandler.getRanges(jobConf, columnMapper); @@ -254,18 +275,50 @@ public class HiveAccumuloTableInputFormat implements protected void configure(JobConf conf, Instance instance, Connector connector, AccumuloConnectionParameters accumuloParams, ColumnMapper columnMapper, List<IteratorSetting> iterators, Collection<Range> ranges) throws AccumuloSecurityException, - AccumuloException, SerDeException { + AccumuloException, SerDeException, IOException { // Handle implementation of Instance and invoke appropriate InputFormat method if (instance instanceof MockInstance) { setMockInstance(conf, instance.getInstanceName()); } else { - setZooKeeperInstance(conf, instance.getInstanceName(), instance.getZooKeepers()); + setZooKeeperInstance(conf, instance.getInstanceName(), instance.getZooKeepers(), + accumuloParams.useSasl()); } // Set the username/passwd for the Accumulo connection - setConnectorInfo(conf, accumuloParams.getAccumuloUserName(), - new PasswordToken(accumuloParams.getAccumuloPassword())); + if (accumuloParams.useSasl()) { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + // If we have Kerberos credentials, we should obtain the delegation token + if 
(ugi.hasKerberosCredentials()) { + Connector conn = accumuloParams.getConnector(); + AuthenticationToken token = helper.getDelegationToken(conn); + + // Send the DelegationToken down to the Configuration for Accumulo to use + setConnectorInfo(conf, accumuloParams.getAccumuloUserName(), token); + + // Convert the Accumulo token in a Hadoop token + Token<? extends TokenIdentifier> accumuloToken = helper.getHadoopToken(token); + + log.info("Adding Hadoop Token for Accumulo to Job's Credentials"); + + // Add the Hadoop token to the JobConf + helper.mergeTokenIntoJobConf(conf, accumuloToken); + + if (!ugi.addToken(accumuloToken)) { + throw new IOException("Failed to add Accumulo Token to UGI"); + } + } + + try { + helper.addTokenFromUserToJobConf(ugi, conf); + } catch (IOException e) { + throw new IOException("Current user did not contain necessary delegation Tokens " + ugi, e); + } + } else { + setConnectorInfo(conf, accumuloParams.getAccumuloUserName(), + new PasswordToken(accumuloParams.getAccumuloPassword())); + } // Read from the given Accumulo table setInputTableName(conf, accumuloParams.getAccumuloTableName()); @@ -312,11 +365,18 @@ public class HiveAccumuloTableInputFormat implements } @SuppressWarnings("deprecation") - protected void setZooKeeperInstance(JobConf conf, String instanceName, String zkHosts) { + protected void setZooKeeperInstance(JobConf conf, String instanceName, String zkHosts, + boolean isSasl) throws IOException { // To support builds against 1.5, we can't use the new 1.6 setZooKeeperInstance which // takes a ClientConfiguration class that only exists in 1.6 try { - AccumuloInputFormat.setZooKeeperInstance(conf, instanceName, zkHosts); + if (isSasl) { + // Reflection to support Accumulo 1.5. 
Remove when Accumulo 1.5 support is dropped + // 1.6 works with the deprecated 1.5 method, but must use reflection for 1.7-only SASL support + helper.setZooKeeperInstance(conf, AccumuloInputFormat.class, zkHosts, instanceName, isSasl); + } else { + AccumuloInputFormat.setZooKeeperInstance(conf, instanceName, zkHosts); + } } catch (IllegalStateException ise) { // AccumuloInputFormat complains if you re-set an already set value. We just don't care. log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at " http://git-wip-us.apache.org/repos/asf/hive/blob/bb157fb8/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java index ce6da89..0189c07 100644 --- a/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java +++ b/accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/HiveAccumuloTableOutputFormat.java @@ -18,17 +18,23 @@ package org.apache.hadoop.hive.accumulo.mr; import java.io.IOException; +import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Mutation; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.accumulo.AccumuloConnectionParameters; +import org.apache.hadoop.hive.accumulo.HiveAccumuloHelper; import org.apache.hadoop.hive.accumulo.serde.AccumuloSerDeParameters; import 
org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.Progressable; import com.google.common.base.Preconditions; @@ -38,6 +44,8 @@ import com.google.common.base.Preconditions; */ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat { + protected final HiveAccumuloHelper helper = new HiveAccumuloHelper(); + @Override public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { configureAccumuloOutputFormat(job); @@ -63,16 +71,48 @@ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat { // Set the necessary Accumulo information try { - // Username/passwd for Accumulo - setAccumuloConnectorInfo(job, cnxnParams.getAccumuloUserName(), - new PasswordToken(cnxnParams.getAccumuloPassword())); - if (cnxnParams.useMockInstance()) { setAccumuloMockInstance(job, cnxnParams.getAccumuloInstanceName()); } else { // Accumulo instance name with ZK quorum setAccumuloZooKeeperInstance(job, cnxnParams.getAccumuloInstanceName(), - cnxnParams.getZooKeepers()); + cnxnParams.getZooKeepers(), cnxnParams.useSasl()); + } + + // Extract the delegation Token from the UGI and add it to the job + // The AccumuloOutputFormat will look for it there. 
+ if (cnxnParams.useSasl()) { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + if (!ugi.hasKerberosCredentials()) { + helper.addTokenFromUserToJobConf(ugi, job); + } else { + // Still in the local JVM, can use Kerberos credentials + try { + Connector connector = cnxnParams.getConnector(); + AuthenticationToken token = helper.getDelegationToken(connector); + + // Send the DelegationToken down to the Configuration for Accumulo to use + setConnectorInfo(job, cnxnParams.getAccumuloUserName(), token); + + // Convert the Accumulo token in a Hadoop token + Token<? extends TokenIdentifier> accumuloToken = helper.getHadoopToken(token); + + log.info("Adding Hadoop Token for Accumulo to Job's Credentials"); + + // Add the Hadoop token to the JobConf + helper.mergeTokenIntoJobConf(job, accumuloToken); + + // Make sure the UGI contains the token too for good measure + if (!ugi.addToken(accumuloToken)) { + throw new IOException("Failed to add Accumulo Token to UGI"); + } + } catch (AccumuloException | AccumuloSecurityException e) { + throw new IOException("Failed to acquire Accumulo DelegationToken", e); + } + } + } else { + setAccumuloConnectorInfo(job, cnxnParams.getAccumuloUserName(), + new PasswordToken(cnxnParams.getAccumuloPassword())); } // Set the table where we're writing this data @@ -96,9 +136,18 @@ public class HiveAccumuloTableOutputFormat extends AccumuloOutputFormat { } @SuppressWarnings("deprecation") - protected void setAccumuloZooKeeperInstance(JobConf conf, String instanceName, String zookeepers) { + protected void setAccumuloZooKeeperInstance(JobConf conf, String instanceName, String zookeepers, + boolean isSasl) throws IOException { try { - AccumuloOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers); + if (isSasl) { + // Reflection to support Accumulo 1.5. 
Remove when Accumulo 1.5 support is dropped + // 1.6 works with the deprecated 1.5 method, but must use reflection for 1.7-only + // SASL support + helper.setZooKeeperInstance(conf, AccumuloOutputFormat.class, zookeepers, instanceName, + isSasl); + } else { + AccumuloOutputFormat.setZooKeeperInstance(conf, instanceName, zookeepers); + } } catch (IllegalStateException ise) { // AccumuloOutputFormat complains if you re-set an already set value. We just don't care. log.debug("Ignoring exception setting ZooKeeper instance of " + instanceName + " at " http://git-wip-us.apache.org/repos/asf/hive/blob/bb157fb8/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloConnectionParameters.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloConnectionParameters.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloConnectionParameters.java index 8b4c9ff..23be5f1 100644 --- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloConnectionParameters.java +++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestAccumuloConnectionParameters.java @@ -16,6 +16,8 @@ */ package org.apache.hadoop.hive.accumulo; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Instance; @@ -97,4 +99,21 @@ public class TestAccumuloConnectionParameters { // with null password cnxnParams.getConnector(instance); } + + public void testSasl() { + Configuration conf = new Configuration(false); + + // Default is false + AccumuloConnectionParameters cnxnParams = new AccumuloConnectionParameters(conf); + assertFalse(cnxnParams.useSasl()); + + conf.set(AccumuloConnectionParameters.SASL_ENABLED, "true"); + + cnxnParams = new 
AccumuloConnectionParameters(conf); + + assertTrue(cnxnParams.useSasl()); + + conf.set(AccumuloConnectionParameters.SASL_ENABLED, "false"); + assertFalse(cnxnParams.useSasl()); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/bb157fb8/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java new file mode 100644 index 0000000..88544f0 --- /dev/null +++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/TestHiveAccumuloHelper.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.accumulo; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestHiveAccumuloHelper { + + private HiveAccumuloHelper helper; + + @Before + public void setup() { + helper = new HiveAccumuloHelper(); + } + + @Test + public void testTokenMerge() throws Exception { + final Text service = new Text("service"); + Token<?> token = Mockito.mock(Token.class); + JobConf jobConf = new JobConf(); + + Mockito.when(token.getService()).thenReturn(service); + + helper.mergeTokenIntoJobConf(jobConf, token); + + Collection<Token<?>> tokens = jobConf.getCredentials().getAllTokens(); + assertEquals(1, tokens.size()); + assertEquals(service, tokens.iterator().next().getService()); + } + + @Test + public void testTokenToConfFromUser() throws Exception { + UserGroupInformation ugi = Mockito.mock(UserGroupInformation.class); + JobConf jobConf = new JobConf(); + ArrayList<Token<?>> tokens = new ArrayList<>(); + Text service = new Text("service"); + Token<?> token = Mockito.mock(Token.class); + tokens.add(token); + + Mockito.when(ugi.getTokens()).thenReturn(tokens); + Mockito.when(token.getKind()).thenReturn(HiveAccumuloHelper.ACCUMULO_SERVICE); + Mockito.when(token.getService()).thenReturn(service); + + helper.addTokenFromUserToJobConf(ugi, jobConf); + + Collection<Token<?>> credTokens = jobConf.getCredentials().getAllTokens(); + assertEquals(1, credTokens.size()); + assertEquals(service, credTokens.iterator().next().getService()); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/bb157fb8/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java 
---------------------------------------------------------------------- diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java index e8beeb6..ee5aecf 100644 --- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java +++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableInputFormat.java @@ -526,7 +526,7 @@ public class TestHiveAccumuloTableInputFormat { ranges); // Verify that the correct methods are invoked on AccumuloInputFormat - Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers); + Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false); Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS)); Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE); Mockito.verify(mockInputFormat).setScanAuthorizations(conf, @@ -568,7 +568,7 @@ public class TestHiveAccumuloTableInputFormat { ranges); // Verify that the correct methods are invoked on AccumuloInputFormat - Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers); + Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false); Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS)); Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE); Mockito.verify(mockInputFormat).setScanAuthorizations(conf, new Authorizations("foo,bar")); @@ -622,7 +622,7 @@ public class TestHiveAccumuloTableInputFormat { ranges); // Verify that the correct methods are invoked on AccumuloInputFormat - Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers); + Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false); 
Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS)); Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE); Mockito.verify(mockInputFormat).setScanAuthorizations(conf, @@ -678,7 +678,7 @@ public class TestHiveAccumuloTableInputFormat { ranges); // Verify that the correct methods are invoked on AccumuloInputFormat - Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers); + Mockito.verify(mockInputFormat).setZooKeeperInstance(conf, instanceName, zookeepers, false); Mockito.verify(mockInputFormat).setConnectorInfo(conf, USER, new PasswordToken(PASS)); Mockito.verify(mockInputFormat).setInputTableName(conf, TEST_TABLE); Mockito.verify(mockInputFormat).setScanAuthorizations(conf, http://git-wip-us.apache.org/repos/asf/hive/blob/bb157fb8/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java ---------------------------------------------------------------------- diff --git a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java index 093245d..5d3f15b 100644 --- a/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java +++ b/accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/mr/TestHiveAccumuloTableOutputFormat.java @@ -98,7 +98,7 @@ public class TestHiveAccumuloTableOutputFormat { outputFormat.configureAccumuloOutputFormat(conf); Mockito.verify(outputFormat).setAccumuloConnectorInfo(conf, user, new PasswordToken(password)); - Mockito.verify(outputFormat).setAccumuloZooKeeperInstance(conf, instanceName, zookeepers); + Mockito.verify(outputFormat).setAccumuloZooKeeperInstance(conf, instanceName, zookeepers, false); Mockito.verify(outputFormat).setDefaultAccumuloTableName(conf, outputTable); }
