Repository: hbase Updated Branches: refs/heads/branch-1 f9ce069e1 -> 8473fae1d
HBASE-11262 Avoid empty columns while doing bulk-load (Ashish Kumar) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8473fae1 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8473fae1 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8473fae1 Branch: refs/heads/branch-1 Commit: 8473fae1decd986479d9296c9eb86466df0e3116 Parents: f9ce069 Author: tedyu <[email protected]> Authored: Fri Feb 5 09:01:46 2016 -0800 Committer: tedyu <[email protected]> Committed: Fri Feb 5 09:01:46 2016 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/mapreduce/ImportTsv.java | 2 ++ .../hadoop/hbase/mapreduce/TsvImporterMapper.java | 7 ++++++- .../apache/hadoop/hbase/mapreduce/TestImportTsv.java | 13 +++++++++++++ 3 files changed, 21 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/8473fae1/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java index 6fd83e7..6ce1a28 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -92,6 +92,7 @@ public class ImportTsv extends Configured implements Tool { // If true, bad lines are logged to stderr. Default: false. 
public final static String LOG_BAD_LINES_CONF_KEY = "importtsv.log.bad.lines"; public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines"; + public final static String SKIP_EMPTY_COLUMNS = "importtsv.skip.empty.columns"; public final static String COLUMNS_CONF_KEY = "importtsv.columns"; public final static String SEPARATOR_CONF_KEY = "importtsv.separator"; public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator"; @@ -687,6 +688,7 @@ public class ImportTsv extends Configured implements Tool { " table. If table does not exist, it is created but deleted in the end.\n" + " -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" + " -D" + LOG_BAD_LINES_CONF_KEY + "=true - logs invalid lines to stderr\n" + + " -D" + SKIP_EMPTY_COLUMNS + "=false - If true then skip empty columns in bulk import\n" + " '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" + " -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" + " -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " + http://git-wip-us.apache.org/repos/asf/hbase/blob/8473fae1/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java index 0891df2..e618943 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java @@ -57,6 +57,8 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> /** Should skip bad lines */ private boolean skipBadLines; + /** Should skip empty columns*/ + private boolean skipEmptyColumns; private Counter badLineCount; private boolean 
logBadLines; @@ -128,6 +130,8 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> // configuration. ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0); + skipEmptyColumns = context.getConfiguration().getBoolean( + ImportTsv.SKIP_EMPTY_COLUMNS, false); skipBadLines = context.getConfiguration().getBoolean( ImportTsv.SKIP_LINES_CONF_KEY, true); badLineCount = context.getCounter("ImportTsv", "Bad Lines"); @@ -160,7 +164,8 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> for (int i = 0; i < parsed.getColumnCount(); i++) { if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex() || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex() - || i == parser.getCellTTLColumnIndex()) { + || i == parser.getCellTTLColumnIndex() || (skipEmptyColumns + && parsed.getColumnLength(i) == 0)) { continue; } populatePut(lineBytes, parsed, put, i); http://git-wip-us.apache.org/repos/asf/hbase/blob/8473fae1/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java index 40c4b4d..8920da4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java @@ -365,6 +365,19 @@ public class TestImportTsv implements Configurable { doMROnTableTest(data, 1, 4); util.deleteTable(table); } + + @Test + public void testSkipEmptyColumns() throws Exception { + Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); + args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B"); + args.put(ImportTsv.SEPARATOR_CONF_KEY, 
","); + args.put(ImportTsv.SKIP_EMPTY_COLUMNS, "true"); + // 2 rows of data as input. Both rows are valid and only 3 of the 4 columns are non-empty + String data = "KEY,1234,VALUE1,VALUE2\nKEY,1235,,VALUE2\n"; + doMROnTableTest(util, tn, FAMILY, data, args, 1, 3); + util.deleteTable(tn); + } private Tool doMROnTableTest(String data, int valueMultiplier,int expectedKVCount) throws Exception {
