Repository: sqoop
Updated Branches:
  refs/heads/trunk c6627c04c -> d2c062b20
SQOOP-2406: Add support for secure mode when importing Parquet files into Hive

(Abraham Elmahrek via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/d2c062b2
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/d2c062b2
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/d2c062b2

Branch: refs/heads/trunk
Commit: d2c062b202a5d44bf5b2c35f98734f9a01cc9b74
Parents: c6627c0
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Mon Jul 20 09:22:12 2015 -0700
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Mon Jul 20 09:22:12 2015 -0700

----------------------------------------------------------------------
 src/docs/user/hive-notes.txt                    |   7 ++
 .../sqoop/mapreduce/DataDrivenImportJob.java    |   3 +-
 .../org/apache/sqoop/mapreduce/ParquetJob.java  | 109 ++++++++++++++++++-
 3 files changed, 117 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2c062b2/src/docs/user/hive-notes.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/hive-notes.txt b/src/docs/user/hive-notes.txt
index a665a20..deee270 100644
--- a/src/docs/user/hive-notes.txt
+++ b/src/docs/user/hive-notes.txt
@@ -29,3 +29,10 @@ direct mapping (for example, +DATE+, +TIME+, and +TIMESTAMP+)
 will be coerced to +DOUBLE+. In these cases, Sqoop will emit a warning in its
 log messages informing you of the loss of precision.
 
+Parquet Support in Hive
+~~~~~~~~~~~~~~~~~~~~~~~
+
+In order to contact the Hive MetaStore from a MapReduce job, a delegation token will
+be fetched and passed. HIVE_CONF_DIR and HIVE_HOME must be set appropriately to add
+Hive to the runtime classpath. Otherwise, importing/exporting into Hive in Parquet
+format may not work.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2c062b2/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
index 388ce7d..260bc29 100644
--- a/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -98,7 +99,7 @@ public class DataDrivenImportJob extends ImportJobBase {
       AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
     } else if (options.getFileLayout()
         == SqoopOptions.FileLayout.ParquetFile) {
-      Configuration conf = job.getConfiguration();
+      JobConf conf = (JobConf)job.getConfiguration();
       // Kite SDK requires an Avro schema to represent the data structure of
       // target dataset. If the schema name equals to generated java class name,
       // the import will fail. So we use table name as schema name and add a

http://git-wip-us.apache.org/repos/asf/sqoop/blob/d2c062b2/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
index c775ef3..f310419 100644
--- a/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/ParquetJob.java
@@ -23,6 +23,11 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.kitesdk.data.CompressionType;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
@@ -32,6 +37,8 @@ import org.kitesdk.data.mapreduce.DatasetKeyOutputFormat;
 import org.kitesdk.data.spi.SchemaValidationUtil;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Map;
 
 /**
  * Helper class for setting up a Parquet MapReduce job.
@@ -40,6 +47,14 @@ public final class ParquetJob {
 
   public static final Log LOG = LogFactory.getLog(ParquetJob.class.getName());
 
+  public static final String HIVE_CONF_CLASS = "org.apache.hadoop.hive.conf.HiveConf";
+  public static final String HIVE_METASTORE_CLIENT_CLASS = "org.apache.hadoop.hive.metastore.HiveMetaStoreClient";
+  public static final String HIVE_METASTORE_SASL_ENABLED = "hive.metastore.sasl.enabled";
+  // Purposefully choosing the same token alias as the one Oozie chooses.
+  // Make sure we don't generate a new delegation token if oozie
+  // has already generated one.
+  public static final String HIVE_METASTORE_TOKEN_ALIAS = "HCat Token";
+
   private ParquetJob() {
   }
 
@@ -72,9 +87,21 @@ public final class ParquetJob {
    * {@link org.apache.sqoop.lib.SqoopRecord}. The output key is
    * {@link org.apache.avro.generic.GenericRecord}.
    */
-  public static void configureImportJob(Configuration conf, Schema schema,
+  public static void configureImportJob(JobConf conf, Schema schema,
       String uri, WriteMode writeMode) throws IOException {
     Dataset dataset;
+    Configuration hiveConf = getHiveConf(conf);
+
+    // Add hive delegation token only if we don't already have one.
+    if (uri.startsWith("dataset:hive") && isSecureMetastore(hiveConf)) {
+      // Copy hive configs to job config
+      addHiveConfigs(hiveConf, conf);
+
+      if (conf.getCredentials().getToken(new Text(HIVE_METASTORE_TOKEN_ALIAS)) == null) {
+        addHiveDelegationToken(conf);
+      }
+    }
+
     if (Datasets.exists(uri)) {
       if (WriteMode.DEFAULT.equals(writeMode)) {
         throw new IOException("Destination exists! " + uri);
@@ -113,4 +140,84 @@ public final class ParquetJob {
     return Datasets.create(uri, descriptor, GenericRecord.class);
   }
 
+  private static boolean isSecureMetastore(Configuration conf) {
+    return conf != null && conf.getBoolean(HIVE_METASTORE_SASL_ENABLED, false);
+  }
+
+  /**
+   * Dynamically create hive configuration object.
+   * @param conf
+   * @return
+   */
+  private static Configuration getHiveConf(Configuration conf) {
+    try {
+      Class HiveConfClass = Class.forName(HIVE_CONF_CLASS);
+      return ((Configuration)(HiveConfClass.getConstructor(Configuration.class, Class.class)
+          .newInstance(conf, Configuration.class)));
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Could not load " + HIVE_CONF_CLASS
+          + ". Make sure HIVE_CONF_DIR is set correctly.");
+    } catch (Exception ex) {
+      LOG.error("Could not instantiate HiveConf instance.", ex);
+    }
+    return null;
+  }
+
+  /**
+   * Add hive delegation token to credentials store.
+   * @param conf
+   */
+  private static void addHiveDelegationToken(JobConf conf) {
+    // Need to use reflection since there's no compile time dependency on the client libs.
+    Class<?> HiveConfClass;
+    Class<?> HiveMetaStoreClientClass;
+
+    try {
+      HiveMetaStoreClientClass = Class.forName(HIVE_METASTORE_CLIENT_CLASS);
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Could not load " + HIVE_METASTORE_CLIENT_CLASS
+          + " when adding hive delegation token. "
+          + "Make sure HIVE_CONF_DIR is set correctly.", ex);
+      throw new RuntimeException("Couldn't fetch delegation token.", ex);
+    }
+
+    try {
+      HiveConfClass = Class.forName(HIVE_CONF_CLASS);
+    } catch (ClassNotFoundException ex) {
+      LOG.error("Could not load " + HIVE_CONF_CLASS
+          + " when adding hive delegation token."
+          + " Make sure HIVE_CONF_DIR is set correctly.", ex);
+      throw new RuntimeException("Couldn't fetch delegation token.", ex);
+    }
+
+    try {
+      Object client = HiveMetaStoreClientClass.getConstructor(HiveConfClass).newInstance(
+          HiveConfClass.getConstructor(Configuration.class, Class.class).newInstance(conf, Configuration.class));
+      // getDelegationToken(String kerberosPrincipal)
+      Method getDelegationTokenMethod = HiveMetaStoreClientClass.getMethod(
+          "getDelegationToken", String.class);
+      Object tokenStringForm = getDelegationTokenMethod.invoke(
+          client, UserGroupInformation.getLoginUser().getShortUserName());
+
+      // Load token
+      Token<DelegationTokenIdentifier> metastoreToken = new Token<DelegationTokenIdentifier>();
+      metastoreToken.decodeFromUrlString(tokenStringForm.toString());
+      conf.getCredentials().addToken(new Text(HIVE_METASTORE_TOKEN_ALIAS), metastoreToken);
+
+      LOG.debug("Successfully fetched hive metastore delegation token. " + metastoreToken);
+    } catch (Exception ex) {
+      LOG.error("Couldn't fetch delegation token.", ex);
+      throw new RuntimeException("Couldn't fetch delegation token.", ex);
+    }
+  }
+
+  /**
+   * Add hive conf to configuration object without overriding already set properties.
+   * @param hiveConf
+   * @param conf
+   */
+  private static void addHiveConfigs(Configuration hiveConf, Configuration conf) {
+    for (Map.Entry<String, String> item : hiveConf) {
+      conf.setIfUnset(item.getKey(), item.getValue());
+    }
+  }
 }
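
----------------------------------------------------------------------

A note on the reflection in getHiveConf above: ParquetJob cannot reference HiveConf at compile time, so it loads the class by name and invokes the HiveConf(Configuration, Class) constructor, which layers hive-site.xml on top of the passed-in job configuration. The following is a minimal standalone sketch of that pattern, not the committed code; error handling is simplified, and the class name is the only compile-time contract assumed.

    import org.apache.hadoop.conf.Configuration;

    public class HiveConfLoaderSketch {
      static final String HIVE_CONF_CLASS = "org.apache.hadoop.hive.conf.HiveConf";

      // Load HiveConf reflectively so there is no compile-time dependency on
      // the Hive jars; return null when Hive is not on the classpath.
      static Configuration getHiveConf(Configuration conf) {
        try {
          Class<?> hiveConfClass = Class.forName(HIVE_CONF_CLASS);
          return (Configuration) hiveConfClass
              .getConstructor(Configuration.class, Class.class)
              .newInstance(conf, Configuration.class);
        } catch (ClassNotFoundException ex) {
          // Hive libs absent, typically because HIVE_CONF_DIR/HIVE_HOME are unset.
          return null;
        } catch (ReflectiveOperationException ex) {
          throw new RuntimeException("Could not instantiate HiveConf", ex);
        }
      }
    }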
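The credentials handling in addHiveDelegationToken follows a standard Hadoop pattern: obtain the token in its URL-safe string form, decode it into a Token, and register it in the job's Credentials under a well-known alias. The sketch below isolates that pattern using plain Hadoop APIs; fetchTokenStringFromMetastore is a hypothetical stand-in for the reflective HiveMetaStoreClient.getDelegationToken call the patch actually makes.

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
    import org.apache.hadoop.security.token.Token;

    public class HiveTokenSketch {
      // Same alias as Oozie uses, so an Oozie-provided token is reused as-is.
      static final String HIVE_METASTORE_TOKEN_ALIAS = "HCat Token";

      static void ensureMetastoreToken(JobConf conf) throws IOException {
        Text alias = new Text(HIVE_METASTORE_TOKEN_ALIAS);
        // Skip the metastore round trip if a token is already in the credentials.
        if (conf.getCredentials().getToken(alias) != null) {
          return;
        }
        // Hypothetical stand-in for HiveMetaStoreClient.getDelegationToken(...).
        String tokenStringForm = fetchTokenStringFromMetastore();

        // Decode the URL-safe string form and stash the token in the job's
        // credentials, where MapReduce tasks can pick it up at runtime.
        Token<DelegationTokenIdentifier> token =
            new Token<DelegationTokenIdentifier>();
        token.decodeFromUrlString(tokenStringForm);
        conf.getCredentials().addToken(alias, token);
      }

      private static String fetchTokenStringFromMetastore() {
        throw new UnsupportedOperationException("placeholder for the metastore RPC");
      }
    }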
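Finally, addHiveConfigs leans on two Configuration behaviors: iterating a Configuration yields Map.Entry<String, String> pairs, and setIfUnset writes a property only when it has no value yet, so job-level settings always win over hive-site.xml defaults. A self-contained illustration follows; the property values and hostnames are made up.

    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;

    public class MergeConfSketch {
      // Copy every Hive property into the job configuration, but only where
      // the job has not already set a value, preserving explicit overrides.
      static void addHiveConfigs(Configuration hiveConf, Configuration conf) {
        for (Map.Entry<String, String> item : hiveConf) {
          conf.setIfUnset(item.getKey(), item.getValue());
        }
      }

      public static void main(String[] args) {
        Configuration hiveConf = new Configuration(false);
        hiveConf.set("hive.metastore.uris", "thrift://metastore.example.com:9083");
        hiveConf.set("hive.metastore.sasl.enabled", "true");

        Configuration jobConf = new Configuration(false);
        jobConf.set("hive.metastore.uris", "thrift://override.example.com:9083");

        addHiveConfigs(hiveConf, jobConf);
        System.out.println(jobConf.get("hive.metastore.uris"));          // override kept
        System.out.println(jobConf.get("hive.metastore.sasl.enabled"));  // copied from Hive
      }
    }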
