Repository: sqoop Updated Branches: refs/heads/trunk 3153c3610 -> 69463f0b3
SQOOP-3267: Incremental import to HBase deletes only last version of column (Daniel Voros by Szabolcs Vasas) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/69463f0b Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/69463f0b Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/69463f0b Branch: refs/heads/trunk Commit: 69463f0b3ed3af28581202ef59079b9df7bc0bad Parents: 3153c36 Author: Szabolcs Vasas <va...@apache.org> Authored: Thu Feb 22 15:14:04 2018 +0100 Committer: Szabolcs Vasas <va...@apache.org> Committed: Thu Feb 22 15:14:04 2018 +0100 ---------------------------------------------------------------------- src/docs/man/hbase-args.txt | 5 ++ src/docs/user/hbase-args.txt | 33 +++++----- src/docs/user/hbase.txt | 5 ++ src/java/org/apache/sqoop/SqoopOptions.java | 30 +++++++++ .../apache/sqoop/hbase/HBasePutProcessor.java | 2 + .../sqoop/hbase/ToStringPutTransformer.java | 18 +++++- .../apache/sqoop/mapreduce/HBaseImportJob.java | 3 +- .../org/apache/sqoop/tool/BaseSqoopTool.java | 21 ++++++- src/test/org/apache/sqoop/TestSqoopOptions.java | 1 + .../org/apache/sqoop/hbase/HBaseImportTest.java | 65 ++++++++++++++------ .../org/apache/sqoop/hbase/HBaseTestCase.java | 10 ++- 11 files changed, 153 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/69463f0b/src/docs/man/hbase-args.txt ---------------------------------------------------------------------- diff --git a/src/docs/man/hbase-args.txt b/src/docs/man/hbase-args.txt index afd5c5b..af9c96a 100644 --- a/src/docs/man/hbase-args.txt +++ b/src/docs/man/hbase-args.txt @@ -36,4 +36,9 @@ HBase options --hbase-table (table-name):: Specifies an HBase table to use as the target instead of HDFS +--hbase-null-incremental-mode (mode):: + How to handle columns updated to null during incremental imports. +ignore+ is the default and + will result in retaining the previously imported value. +delete+ mode will delete all previous + versions of the column from HBase. + http://git-wip-us.apache.org/repos/asf/sqoop/blob/69463f0b/src/docs/user/hbase-args.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/hbase-args.txt b/src/docs/user/hbase-args.txt index 53040f5..4076214 100644 --- a/src/docs/user/hbase-args.txt +++ b/src/docs/user/hbase-args.txt @@ -20,19 +20,22 @@ .HBase arguments: [grid="all"] -`-----------------------------`------------------------------------------- -Argument Description --------------------------------------------------------------------------- -+\--column-family <family>+ Sets the target column family for the import -+\--hbase-create-table+ If specified, create missing HBase tables -+\--hbase-row-key <col>+ Specifies which input column to use as the\ - row key - In case, if input table contains composite - key, then <col> must be in the form of a - comma-separated list of composite key - attributes -+\--hbase-table <table-name>+ Specifies an HBase table to use as the \ - target instead of HDFS -+\--hbase-bulkload+ Enables bulk loading --------------------------------------------------------------------------- +`---------------------------------------`------------------------------------------- +Argument Description +------------------------------------------------------------------------------------ ++\--column-family <family>+ Sets the target column family for the import ++\--hbase-create-table+ If specified, create missing HBase tables ++\--hbase-row-key <col>+ Specifies which input column to use as the\ + row key + In case, if input table contains composite + key, then <col> must be in the form of a + comma-separated list of composite key + attributes ++\--hbase-table <table-name>+ Specifies an HBase table to use as the \ + target instead of HDFS ++\--hbase-bulkload+ Enables bulk loading ++\--hbase-null-incremental-mode <mode>+ How to handle columns updated to null. \ + Legal values for <mode> are +ignore+ \ + (default) and +delete+. +------------------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/sqoop/blob/69463f0b/src/docs/user/hbase.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/hbase.txt b/src/docs/user/hbase.txt index ab4aedc..817956d 100644 --- a/src/docs/user/hbase.txt +++ b/src/docs/user/hbase.txt @@ -58,5 +58,10 @@ mode), and then inserts the UTF-8 bytes of this string in the target cell. Sqoop will skip all rows containing null values in all columns except the row key column. +By default Sqoop will retain the previously imported value for columns +updated to null during incremental imports. This can be changed to +delete all previous versions of the column by using ++\--hbase-null-incremental-mode delete+. + To decrease the load on hbase, Sqoop can do bulk loading as opposed to direct writes. To use bulk loading, enable it using +\--hbase-bulkload+. http://git-wip-us.apache.org/repos/asf/sqoop/blob/69463f0b/src/java/org/apache/sqoop/SqoopOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index 73d0757..651cebd 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -105,6 +105,18 @@ public class SqoopOptions implements Cloneable { } /** + * How to handle null values when doing incremental import into HBase table: + * <ul> + * <li>Ignore: ignore update, retain previous value</li> + * <li>Delete: delete all previous values of column</li> + * </ul> + */ + public enum HBaseNullIncrementalMode { + Ignore, + Delete, + } + + /** * Update mode option specifies how updates are performed when * new rows are found with non-matching keys in database. * It supports two modes: @@ -322,6 +334,9 @@ public class SqoopOptions implements Cloneable { @StoredAsProperty("incremental.last.value") private String incrementalLastValue; + @StoredAsProperty("hbase.null.incremental.mode") + private HBaseNullIncrementalMode hbaseNullIncrementalMode; + // exclude these tables when importing all tables. @StoredAsProperty("import.all_tables.exclude") private String allTablesExclude; @@ -1085,6 +1100,7 @@ public class SqoopOptions implements Cloneable { this.dbOutColumns = null; this.incrementalMode = IncrementalMode.None; + this.hbaseNullIncrementalMode = HBaseNullIncrementalMode.Ignore; this.updateMode = UpdateMode.UpdateOnly; @@ -2301,6 +2317,20 @@ public class SqoopOptions implements Cloneable { } /** + * Get HBase null incremental mode to use. + */ + public HBaseNullIncrementalMode getHbaseNullIncrementalMode() { + return hbaseNullIncrementalMode; + } + + /** + * Set HBase null incremental mode to use. + */ + public void setHbaseNullIncrementalMode(HBaseNullIncrementalMode hbaseNullIncrementalMode) { + this.hbaseNullIncrementalMode = hbaseNullIncrementalMode; + } + + /** * Set the tables to be excluded when doing all table import. */ public void setAllTablesExclude(String exclude) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/69463f0b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java index 27d6006..df9836b 100644 --- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java +++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java @@ -63,6 +63,8 @@ public class HBasePutProcessor implements Closeable, Configurable, public static final String ROW_KEY_COLUMN_KEY = "sqoop.hbase.insert.row.key.column"; + public static final String NULL_INCREMENTAL_MODE = "hbase.null.incremental.mode"; + /** * Configuration key specifying the PutTransformer implementation to use. */ http://git-wip-us.apache.org/repos/asf/sqoop/blob/69463f0b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java index 0bd6169..8600382 100644 --- a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java +++ b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.StringUtils; +import org.apache.sqoop.SqoopOptions; import org.apache.sqoop.mapreduce.ImportJobBase; import java.io.IOException; @@ -57,6 +58,7 @@ public class ToStringPutTransformer extends PutTransformer { protected boolean addRowKey; private boolean isCompositeKey = false; private List<String> compositeKeyAttributes; + private SqoopOptions.HBaseNullIncrementalMode nullMode; /** * Used as delimiter to combine composite-key column names when passed as. @@ -170,6 +172,7 @@ public class ToStringPutTransformer extends PutTransformer { String colFamily, String rowKey) { byte[] colFamilyBytes = Bytes.toBytes(colFamily); List<Mutation> mutationList = new ArrayList<Mutation>(); + Put put = null; for (Map.Entry<String, Object> fieldEntry : record.entrySet()) { String colName = fieldEntry.getKey(); boolean rowKeyCol = false; @@ -187,7 +190,10 @@ public class ToStringPutTransformer extends PutTransformer { Object val = fieldEntry.getValue(); if (null != val) { // Put row-key in HBase - Put put = new Put(Bytes.toBytes(rowKey)); + if (put == null) { + put = new Put(Bytes.toBytes(rowKey)); + mutationList.add(put); + } if ( val instanceof byte[]) { put.addColumn(colFamilyBytes, getFieldNameBytes(colName), (byte[])val); @@ -197,9 +203,16 @@ public class ToStringPutTransformer extends PutTransformer { } mutationList.add(put); } else { + switch (nullMode) { + case Delete: Delete delete = new Delete(Bytes.toBytes(rowKey)); - delete.addColumn(colFamilyBytes, getFieldNameBytes(colName)); + delete.addColumns(colFamilyBytes, getFieldNameBytes(colName)); mutationList.add(delete); + break; + case Ignore: + // Do nothing + break; + } } } } @@ -218,6 +231,7 @@ public class ToStringPutTransformer extends PutTransformer { @Override public void init(Configuration conf) { + nullMode = conf.getEnum(HBasePutProcessor.NULL_INCREMENTAL_MODE, SqoopOptions.HBaseNullIncrementalMode.Ignore); setColumnFamily(conf.get(COL_FAMILY_KEY, null)); setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null)); http://git-wip-us.apache.org/repos/asf/sqoop/blob/69463f0b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java index 33da487..a09a45e 100644 --- a/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java @@ -93,9 +93,10 @@ public class HBaseImportJob extends DataDrivenImportJob { HBasePutProcessor.class, FieldMapProcessor.class); - // Set the HBase parameters (table, column family, row key): + // Set the HBase parameters (table, column family, row key, null mode): conf.set(HBasePutProcessor.TABLE_NAME_KEY, options.getHBaseTable()); conf.set(HBasePutProcessor.COL_FAMILY_KEY, options.getHBaseColFamily()); + conf.set(HBasePutProcessor.NULL_INCREMENTAL_MODE, options.getHbaseNullIncrementalMode().toString()); // What column of the input becomes the row key? String rowKeyCol = options.getHBaseRowKeyColumn(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/69463f0b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java index ce21918..b02e4fe 100644 --- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java +++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java @@ -199,6 +199,7 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool { public static final String HBASE_BULK_LOAD_ENABLED_ARG = "hbase-bulkload"; public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table"; + public static final String HBASE_NULL_INCREMENTAL_MODE_ARG = "hbase-null-incremental-mode"; //Accumulo arguments. public static final String ACCUMULO_TABLE_ARG = "accumulo-table"; @@ -853,6 +854,11 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool { .withDescription("If specified, create missing HBase tables") .withLongOpt(HBASE_CREATE_TABLE_ARG) .create()); + hbaseOpts.addOption(OptionBuilder.withArgName("nullmode") + .hasArg() + .withDescription("How to handle null values during incremental import into HBase.") + .withLongOpt(HBASE_NULL_INCREMENTAL_MODE_ARG) + .create()); return hbaseOpts; } @@ -1398,7 +1404,7 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool { } } - protected void applyHBaseOptions(CommandLine in, SqoopOptions out) { + protected void applyHBaseOptions(CommandLine in, SqoopOptions out) throws InvalidOptionsException { if (in.hasOption(HBASE_TABLE_ARG)) { out.setHBaseTable(in.getOptionValue(HBASE_TABLE_ARG)); } @@ -1416,6 +1422,19 @@ public abstract class BaseSqoopTool extends org.apache.sqoop.tool.SqoopTool { if (in.hasOption(HBASE_CREATE_TABLE_ARG)) { out.setCreateHBaseTable(true); } + + if (in.hasOption(HBASE_NULL_INCREMENTAL_MODE_ARG)) { + String nullMode = in.getOptionValue(HBASE_NULL_INCREMENTAL_MODE_ARG); + if ("ignore".equals(nullMode)) { + out.setHbaseNullIncrementalMode(SqoopOptions.HBaseNullIncrementalMode.Ignore); + } else if ("delete".equals(nullMode)) { + out.setHbaseNullIncrementalMode(SqoopOptions.HBaseNullIncrementalMode.Delete); + } else { + throw new InvalidOptionsException("Unknown HBase null incremental mode: " + + nullMode + ". Use 'ignore' or 'delete'." + + HELP_STR); + } + } } protected void applyValidationOptions(CommandLine in, SqoopOptions out) http://git-wip-us.apache.org/repos/asf/sqoop/blob/69463f0b/src/test/org/apache/sqoop/TestSqoopOptions.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/TestSqoopOptions.java b/src/test/org/apache/sqoop/TestSqoopOptions.java index 16901ca..bb7c20d 100644 --- a/src/test/org/apache/sqoop/TestSqoopOptions.java +++ b/src/test/org/apache/sqoop/TestSqoopOptions.java @@ -89,6 +89,7 @@ public class TestSqoopOptions { excludedFieldsFromClone.add("updateMode"); excludedFieldsFromClone.add("layout"); excludedFieldsFromClone.add("activeSqoopTool"); + excludedFieldsFromClone.add("hbaseNullIncrementalMode"); } @After http://git-wip-us.apache.org/repos/asf/sqoop/blob/69463f0b/src/test/org/apache/sqoop/hbase/HBaseImportTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/hbase/HBaseImportTest.java b/src/test/org/apache/sqoop/hbase/HBaseImportTest.java index 2e73cf3..58c21ce 100644 --- a/src/test/org/apache/sqoop/hbase/HBaseImportTest.java +++ b/src/test/org/apache/sqoop/hbase/HBaseImportTest.java @@ -72,24 +72,6 @@ public class HBaseImportTest extends HBaseTestCase { } @Test - public void testOverwriteNullColumnsSucceeds() throws IOException { - // Test that we can create a table and then import immediately - // back on top of it without problem and then update with null to validate - String [] argv = getArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null); - String [] types = { "INT", "INT", "INT", "DATETIME" }; - String [] vals = { "0", "1", "1", "'2017-03-20'" }; - createTableWithColTypes(types, vals); - runImport(argv); - verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(2), "1"); - // Run a second time. - argv = getIncrementalArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null, false, false, "DATA_COL3", "2017-03-24 01:01:01.0", null); - vals = new String[] { "0", "1", null, "'2017-03-25'" }; - updateTable(types, vals); - runImport(argv); - verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(2), null); - } - - @Test public void testAppendWithTimestampSucceeds() throws IOException { // Test that we can create a table and then import multiple rows // validate for append scenario with time stamp @@ -100,7 +82,7 @@ public class HBaseImportTest extends HBaseTestCase { runImport(argv); verifyHBaseCell("AppendTable", "0", "AppendColumnFamily", getColName(2), "1"); // Run a second time. - argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, false, "DATA_COL1", "2017-03-24 01:01:01.0", null); + argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, false, "DATA_COL1", "2017-03-24 01:01:01.0", null, "ignore"); vals = new String[] { "1", "2", "3", "'2017-06-15'" }; insertIntoTable(types, vals); runImport(argv); @@ -118,7 +100,7 @@ public class HBaseImportTest extends HBaseTestCase { runImport(argv); verifyHBaseCell("AppendTable", "0", "AppendColumnFamily", getColName(2), "1"); // Run a second time. - argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, true, "DATA_COL1", null, "DATA_COL3"); + argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, true, "DATA_COL1", null, "DATA_COL3", "ignore"); vals = new String[] { "1", "2", "3", "'2017-06-15'" }; insertIntoTable(types, vals); runImport(argv); @@ -126,6 +108,49 @@ public class HBaseImportTest extends HBaseTestCase { } @Test + public void testNullIncrementalModeIgnore() throws Exception { + // Latest value retained with 'ignore' mode + runInsertUpdateUpdateDeleteAndExpectValue("ignore", "2"); + } + + @Test + public void testNullIncrementalModeDelete() throws Exception { + // All previous values deleted with 'delete' mode + runInsertUpdateUpdateDeleteAndExpectValue("delete", null); + } + + /** + * Does the following + * - create HBase table + * - insert value "1" + * - update value to "2" + * - update value to null + * - asserts its value equals expectedValue + * + * @param nullMode hbase-null-incremental-mode to use ('ignore' or 'delete') + * @param expectedValue expected value in the end + * @throws Exception + */ + private void runInsertUpdateUpdateDeleteAndExpectValue(String nullMode, String expectedValue) throws Exception { + // Create table and import with initial values + String [] types = { "INT", "INT", "DATETIME" }; + createTableWithColTypes(types, new String[] { "0", "1", "'2017-03-20'" }); + runImport(getArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null)); + verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(1), "1"); + + // Run a second time after updating. + updateTable(types, new String[] { "0", "2", "'2017-03-25'" }); + runImport(getIncrementalArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null, false, false, getColName(2), "2017-03-24 01:01:01.0", null, nullMode)); + verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(1), "2"); + + // Run third time after deleting (setting to null) + updateTable(types, new String[] { "0", null, "'2017-03-28'" }); + runImport(getIncrementalArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null, false, false, getColName(2), "2017-03-26 01:01:01.0", null, nullMode)); + verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(1), expectedValue); + } + + + @Test public void testExitFailure() throws IOException { String [] types = { "INT", "INT", "INT" }; String [] vals = { "0", "42", "43" }; http://git-wip-us.apache.org/repos/asf/sqoop/blob/69463f0b/src/test/org/apache/sqoop/hbase/HBaseTestCase.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/hbase/HBaseTestCase.java b/src/test/org/apache/sqoop/hbase/HBaseTestCase.java index 98f8698..f96b658 100644 --- a/src/test/org/apache/sqoop/hbase/HBaseTestCase.java +++ b/src/test/org/apache/sqoop/hbase/HBaseTestCase.java @@ -138,7 +138,7 @@ public abstract class HBaseTestCase extends ImportJobTestCase { */ protected String [] getIncrementalArgv(boolean includeHadoopFlags, String hbaseTable, String hbaseColFam, boolean hbaseCreate, - String queryStr, boolean isAppend, boolean appendTimestamp, String checkColumn, String checkValue, String lastModifiedColumn) { + String queryStr, boolean isAppend, boolean appendTimestamp, String checkColumn, String checkValue, String lastModifiedColumn, String nullMode) { String[] argsStrArray = getArgv(includeHadoopFlags, hbaseTable, hbaseColFam, hbaseCreate, queryStr); List<String> args = new ArrayList<String>(Arrays.asList(argsStrArray)); @@ -161,6 +161,14 @@ public abstract class HBaseTestCase extends ImportJobTestCase { args.add("--last-value"); args.add(checkValue); } + + // Set --hbase-null-incremental-mode (default is 'ignore') + if (nullMode == null) { + nullMode = "ignore"; + } + args.add("--hbase-null-incremental-mode"); + args.add(nullMode); + return args.toArray(new String[0]); }