Updated Branches: refs/heads/trunk 66af31d13 -> ddb81e185
SQOOP-1032: Add the --bulk-load-dir option to support the HBase doBulkLoad function (Alexandre Normand via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ddb81e18 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ddb81e18 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ddb81e18 Branch: refs/heads/trunk Commit: ddb81e185be72c7530498c379e6ad45e6d54a2d6 Parents: 66af31d Author: Jarek Jarcec Cecho <[email protected]> Authored: Thu Oct 10 16:57:37 2013 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Thu Oct 10 16:57:37 2013 -0700 ---------------------------------------------------------------------- src/docs/user/hbase-args.txt | 1 + src/docs/user/hbase.txt | 3 +- src/java/org/apache/sqoop/SqoopOptions.java | 17 +++ .../apache/sqoop/hbase/HBasePutProcessor.java | 6 + .../sqoop/hbase/ToStringPutTransformer.java | 9 +- .../org/apache/sqoop/manager/SqlManager.java | 14 +- .../sqoop/mapreduce/HBaseBulkImportJob.java | 146 +++++++++++++++++++ .../sqoop/mapreduce/HBaseBulkImportMapper.java | 98 +++++++++++++ .../apache/sqoop/mapreduce/ImportJobBase.java | 9 ++ .../org/apache/sqoop/tool/BaseSqoopTool.java | 17 +++ .../com/cloudera/sqoop/TestSqoopOptions.java | 29 ++++ 11 files changed, 344 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/docs/user/hbase-args.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/hbase-args.txt b/src/docs/user/hbase-args.txt index 8ba23eb..53040f5 100644 --- a/src/docs/user/hbase-args.txt +++ b/src/docs/user/hbase-args.txt @@ -33,5 +33,6 @@ Argument Description attributes +\--hbase-table <table-name>+ Specifies an HBase table to use as the \ target instead of HDFS ++\--hbase-bulkload+ Enables bulk loading 
-------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/docs/user/hbase.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/hbase.txt b/src/docs/user/hbase.txt index 34f9875..ab4aedc 100644 --- a/src/docs/user/hbase.txt +++ b/src/docs/user/hbase.txt @@ -58,4 +58,5 @@ mode), and then inserts the UTF-8 bytes of this string in the target cell. Sqoop will skip all rows containing null values in all columns except the row key column. - +To decrease the load on hbase, Sqoop can do bulk loading as opposed to +direct writes. To use bulk loading, enable it using +\--hbase-bulkload+. http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/SqoopOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index 01805f9..836f588 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -230,6 +230,9 @@ public class SqoopOptions implements Cloneable { // Column of the input to use as the row key. @StoredAsProperty("hbase.row.key.col") private String hbaseRowKeyCol; + // if true, bulk loading will be used. + @StoredAsProperty("hbase.bulk.load.enabled") private boolean hbaseBulkLoadEnabled; + // if true, create tables/col families. @StoredAsProperty("hbase.create.table") private boolean hbaseCreateTable; @@ -1924,6 +1927,20 @@ public class SqoopOptions implements Cloneable { } /** + * @return true if bulk load is enabled and false otherwise. + */ + public boolean isBulkLoadEnabled() { + return this.hbaseBulkLoadEnabled; + } + + /** + * Enables or disables bulk loading for an hbase import. 
+ */ + public void setHBaseBulkLoadEnabled(boolean hbaseBulkLoadEnabled) { + this.hbaseBulkLoadEnabled = hbaseBulkLoadEnabled; + } + + /** * Gets the target HBase table name, if any. */ public String getHBaseTable() { http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java index 9ceb5bd..b2431ac 100644 --- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java +++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java @@ -66,6 +66,12 @@ public class HBasePutProcessor implements Closeable, Configurable, public static final String TRANSFORMER_CLASS_KEY = "sqoop.hbase.insert.put.transformer.class"; + /** + * Configuration key to enable/disable hbase bulkLoad. + */ + public static final String BULK_LOAD_ENABLED_KEY = + "sqoop.hbase.bulk.load.enabled"; + /** Configuration key to specify whether to add the row key column into * HBase. Set to false by default. */ http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java index 5ccf311..b5cad1d 100644 --- a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java +++ b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java @@ -181,8 +181,13 @@ public class ToStringPutTransformer extends PutTransformer { // check addRowKey flag before including rowKey field. 
Object val = fieldEntry.getValue(); if (null != val) { - put.add(colFamilyBytes, getFieldNameBytes(colName), - Bytes.toBytes(toHBaseString(val))); + if ( val instanceof byte[]) { + put.add(colFamilyBytes, getFieldNameBytes(colName), + (byte[])val); + } else { + put.add(colFamilyBytes, getFieldNameBytes(colName), + Bytes.toBytes(toHBaseString(val))); + } } } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/manager/SqlManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java index 2a4992d..1ffa40f 100644 --- a/src/java/org/apache/sqoop/manager/SqlManager.java +++ b/src/java/org/apache/sqoop/manager/SqlManager.java @@ -41,7 +41,9 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.sqoop.mapreduce.JdbcCallExportJob; +import org.apache.sqoop.tool.BaseSqoopTool; import org.apache.sqoop.util.LoggingUtils; +import org.apache.sqoop.mapreduce.HBaseBulkImportJob; import org.apache.sqoop.util.SqlTypeMap; import com.cloudera.sqoop.SqoopOptions; @@ -587,7 +589,11 @@ public abstract class SqlManager throw new ImportException("HBase jars are not present in " + "classpath, cannot import to HBase!"); } - importer = new HBaseImportJob(opts, context); + if(!opts.isBulkLoadEnabled()){ + importer = new HBaseImportJob(opts, context); + } else { + importer = new HBaseBulkImportJob(opts, context); + } } else { // Import to HDFS. 
importer = new DataDrivenImportJob(opts, context.getInputFormat(), @@ -619,7 +625,11 @@ public abstract class SqlManager throw new ImportException("HBase jars are not present in classpath," + " cannot import to HBase!"); } - importer = new HBaseImportJob(opts, context); + if(!opts.isBulkLoadEnabled()){ + importer = new HBaseImportJob(opts, context); + } else { + importer = new HBaseBulkImportJob(opts, context); + } } else { // Import to HDFS. importer = new DataDrivenImportJob(opts, context.getInputFormat(), http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java new file mode 100644 index 0000000..b32cdd1 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportJob.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + +import com.cloudera.sqoop.manager.ImportJobContext; +import com.cloudera.sqoop.util.ImportException; +import com.cloudera.sqoop.SqoopOptions; +import com.google.common.base.Preconditions; + +/** + * Runs an HBase bulk import: the job writes HFiles through + * HFileOutputFormat and then loads them with LoadIncrementalHFiles. + */ +public class HBaseBulkImportJob extends HBaseImportJob { + + public static final Log LOG = LogFactory.getLog( + HBaseBulkImportJob.class.getName()); + + public HBaseBulkImportJob(final SqoopOptions opts, + final ImportJobContext importContext) { + super(opts, importContext); + } + + @Override + protected void configureMapper(Job job, String tableName, + String tableClassName) throws IOException { + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(Put.class); + job.setMapperClass(getMapperClass()); + } + + @Override + protected Class<? 
extends Mapper> getMapperClass() { + return HBaseBulkImportMapper.class; + } + + @Override + protected void jobSetup(Job job) throws IOException, ImportException { + super.jobSetup(job); + + // we shouldn't have gotten here if bulk load dir is not set + // so let's throw a ImportException + if(getContext().getDestination() == null){ + throw new ImportException("Can't run HBaseBulkImportJob without a " + + "valid destination directory."); + } + + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Preconditions.class); + FileOutputFormat.setOutputPath(job, getContext().getDestination()); + HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable()); + HFileOutputFormat.configureIncrementalLoad(job, hTable); + } + + /** + * Perform the loading of Hfiles. + */ + @Override + protected void completeImport(Job job) throws IOException, ImportException { + super.completeImport(job); + + FileSystem fileSystem = FileSystem.get(job.getConfiguration()); + + // Make the bulk load files source directory accessible to the world + // so that the hbase user can deal with it + Path bulkLoadDir = getContext().getDestination(); + setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir), + FsPermission.createImmutable((short) 00777)); + + HTable hTable = new HTable(job.getConfiguration(), options.getHBaseTable()); + + // Load generated HFiles into table + try { + LoadIncrementalHFiles loader = new LoadIncrementalHFiles( + job.getConfiguration()); + loader.doBulkLoad(bulkLoadDir, hTable); + } + catch (Exception e) { + String errorMessage = String.format("Unrecoverable error while " + + "performing the bulk load of files in [%s]", + bulkLoadDir.toString()); + throw new ImportException(errorMessage, e); + } + } + + @Override + protected void jobTeardown(Job job) throws IOException, ImportException { + super.jobTeardown(job); + // Delete the hfiles directory after we are finished. 
+ FileSystem fileSystem = FileSystem.get(job.getConfiguration()); + fileSystem.delete(getContext().getDestination(), true); + } + + /** + * Set the file permission of the path of the given fileStatus. If the path + * is a directory, apply permission recursively to all subdirectories and + * files. + * + * @param fs the filesystem + * @param fileStatus containing the path + * @param permission the permission + * @throws java.io.IOException + */ + private void setPermission(FileSystem fs, FileStatus fileStatus, + FsPermission permission) throws IOException { + if(fileStatus.isDir()) { + for(FileStatus file : fs.listStatus(fileStatus.getPath())){ + setPermission(fs, file, permission); + } + } + fs.setPermission(fileStatus.getPath(), permission); + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java new file mode 100644 index 0000000..9c9d6cd --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.mapreduce; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.sqoop.hbase.PutTransformer; +import org.apache.sqoop.hbase.ToStringPutTransformer; + +import com.cloudera.sqoop.lib.LargeObjectLoader; +import com.cloudera.sqoop.lib.SqoopRecord; +import com.cloudera.sqoop.mapreduce.AutoProgressMapper; +import static org.apache.sqoop.hbase.HBasePutProcessor.*; + +/** + * Imports records by transforming each one into Put commands and emitting + * them keyed by row, as (ImmutableBytesWritable, Put) map output pairs. + */ +public class HBaseBulkImportMapper + extends AutoProgressMapper + <LongWritable, SqoopRecord, ImmutableBytesWritable, Put> { + + private LargeObjectLoader lobLoader; + //An object that can transform a map of fieldName->object + // into a Put command. + private PutTransformer putTransformer; + private Configuration conf; + @Override + protected void setup(Context context) + throws IOException, InterruptedException { + this.conf = context.getConfiguration(); + Path largeFilePath = new Path(this.conf.get("sqoop.hbase.lob.extern.dir", + "/tmp/sqoop-hbase-" + context.getTaskAttemptID())); + this.lobLoader = new LargeObjectLoader(context.getConfiguration(), + largeFilePath); + + // Get the implementation of PutTransformer to use. + // By default, we call toString() on every non-null field. + Class<? extends PutTransformer> xformerClass = + (Class<? 
extends PutTransformer>) + this.conf.getClass(TRANSFORMER_CLASS_KEY, ToStringPutTransformer.class); + this.putTransformer = (PutTransformer) + ReflectionUtils.newInstance(xformerClass, this.conf); + if (null == putTransformer) { + throw new RuntimeException("Could not instantiate PutTransformer."); + } + this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null)); + this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null)); + } + @Override + public void map(LongWritable key, SqoopRecord val, Context context) + throws IOException, InterruptedException { + try { + // Loading of LOBs was delayed until we have a Context. + val.loadLargeObjects(lobLoader); + } catch (SQLException sqlE) { + throw new IOException(sqlE); + } + Map<String, Object> fields = val.getFieldMap(); + + List<Put> putList = putTransformer.getPutCommand(fields); + for(Put put: putList){ + context.write(new ImmutableBytesWritable(put.getRow()), put); + } + } + @Override + protected void cleanup(Context context) throws IOException { + if (null != lobLoader) { + lobLoader.close(); + } + } +} + http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java index 36959e1..8b1493d 100644 --- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java +++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java @@ -249,6 +249,8 @@ public class ImportJobBase extends JobBase { throw new ImportException("Import job failed!"); } + completeImport(job); + if (options.isValidationEnabled()) { validateImport(tableName, conf, job); } @@ -262,6 +264,13 @@ public class ImportJobBase extends JobBase { } } + /** + * Perform any operation that needs to be done post map/reduce job to + * complete the import. 
+ */ + protected void completeImport(Job job) throws IOException, ImportException { + } + protected void validateImport(String tableName, Configuration conf, Job job) throws ImportException { LOG.debug("Validating imported data."); http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/java/org/apache/sqoop/tool/BaseSqoopTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java index ebb1857..a1080d3 100644 --- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java +++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java @@ -26,6 +26,7 @@ import java.sql.SQLException; import java.util.Arrays; import java.util.Properties; +import com.cloudera.sqoop.util.ImportException; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; @@ -175,6 +176,8 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { public static final String HBASE_TABLE_ARG = "hbase-table"; public static final String HBASE_COL_FAM_ARG = "column-family"; public static final String HBASE_ROW_KEY_ARG = "hbase-row-key"; + public static final String HBASE_BULK_LOAD_ENABLED_ARG = + "hbase-bulkload"; public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table"; @@ -710,6 +713,10 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { .withLongOpt(HBASE_ROW_KEY_ARG) .create()); hbaseOpts.addOption(OptionBuilder + .withDescription("Enables HBase bulk loading") + .withLongOpt(HBASE_BULK_LOAD_ENABLED_ARG) + .create()); + hbaseOpts.addOption(OptionBuilder .withDescription("If specified, create missing HBase tables") .withLongOpt(HBASE_CREATE_TABLE_ARG) .create()); @@ -1076,6 +1083,8 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { out.setHBaseRowKeyColumn(in.getOptionValue(HBASE_ROW_KEY_ARG)); } + 
out.setHBaseBulkLoadEnabled(in.hasOption(HBASE_BULK_LOAD_ENABLED_ARG)); + if (in.hasOption(HBASE_CREATE_TABLE_ARG)) { out.setCreateHBaseTable(true); } @@ -1326,6 +1335,14 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { throw new InvalidOptionsException("Direct import is incompatible with " + "HBase. Please remove parameter --direct"); } + + if (options.isBulkLoadEnabled() && options.getHBaseTable() == null) { + String validationMessage = String.format("Can't run import with %s " + + "without %s", + BaseSqoopTool.HBASE_BULK_LOAD_ENABLED_ARG, + BaseSqoopTool.HBASE_TABLE_ARG); + throw new InvalidOptionsException(validationMessage); + } } /** http://git-wip-us.apache.org/repos/asf/sqoop/blob/ddb81e18/src/test/com/cloudera/sqoop/TestSqoopOptions.java ---------------------------------------------------------------------- diff --git a/src/test/com/cloudera/sqoop/TestSqoopOptions.java b/src/test/com/cloudera/sqoop/TestSqoopOptions.java index 03e2504..90bc08e 100644 --- a/src/test/com/cloudera/sqoop/TestSqoopOptions.java +++ b/src/test/com/cloudera/sqoop/TestSqoopOptions.java @@ -20,6 +20,7 @@ package com.cloudera.sqoop; import java.util.Properties; +import com.cloudera.sqoop.tool.BaseSqoopTool; import junit.framework.TestCase; import org.apache.commons.lang.ArrayUtils; @@ -433,4 +434,32 @@ public class TestSqoopOptions extends TestCase { } } + // test that hbase bulk load import with table name and target dir + // passes validation + public void testHBaseBulkLoad() throws Exception { + String [] extraArgs = { + longArgument(BaseSqoopTool.HBASE_BULK_LOAD_ENABLED_ARG), + longArgument(BaseSqoopTool.TARGET_DIR_ARG), "./test", + longArgument(BaseSqoopTool.HBASE_TABLE_ARG), "test_table", + longArgument(BaseSqoopTool.HBASE_COL_FAM_ARG), "d"}; + + validateImportOptions(extraArgs); + } + + // test that hbase bulk load import with a missing --hbase-table fails + public void testHBaseBulkLoadMissingHbaseTable() throws Exception { + String [] 
extraArgs = { + longArgument(BaseSqoopTool.HBASE_BULK_LOAD_ENABLED_ARG), + longArgument(BaseSqoopTool.TARGET_DIR_ARG), "./test"}; + try { + validateImportOptions(extraArgs); + fail("Expected InvalidOptionsException"); + } catch (SqoopOptions.InvalidOptionsException ioe) { + // Expected + } + } + + private static String longArgument(String argument) { + return String.format("--%s", argument); + } }
