Author: stack
Date: Thu May 19 05:48:37 2011
New Revision: 1124542
URL: http://svn.apache.org/viewvc?rev=1124542&view=rev
Log:
HBASE-3880 Make mapper function in ImportTSV plug-able
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1124542&r1=1124541&r2=1124542&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu May 19 05:48:37 2011
@@ -219,6 +219,7 @@ Release 0.91.0 - Unreleased
HBASE-3797 StoreFile Level Compaction Locking
HBASE-1476 Multithreaded Compactions
HBASE-3877 Determine Proper Defaults for Compaction ThreadPools
+ HBASE-3880 Make mapper function in ImportTSV plug-able (Bill Graham)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1124542&r1=1124541&r2=1124542&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Thu May 19 05:48:37 2011
@@ -28,17 +28,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -59,12 +53,14 @@ import com.google.common.collect.Lists;
public class ImportTsv {
final static String NAME = "importtsv";
+ final static String MAPPER_CONF_KEY = "importtsv.mapper.class";
final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
final static String COLUMNS_CONF_KEY = "importtsv.columns";
final static String SEPARATOR_CONF_KEY = "importtsv.separator";
final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
final static String DEFAULT_SEPARATOR = "\t";
+ final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
static class TsvParser {
/**
@@ -76,7 +72,7 @@ public class ImportTsv {
private final byte separatorByte;
private int rowKeyColumnIndex;
-
+
public static String ROWKEY_COLUMN_SPEC="HBASE_ROW_KEY";
/**
@@ -93,7 +89,7 @@ public class ImportTsv {
// Configure columns
ArrayList<String> columnStrings = Lists.newArrayList(
Splitter.on(',').trimResults().split(columnsSpecification));
-
+
families = new byte[columnStrings.size()][];
qualifiers = new byte[columnStrings.size()][];
@@ -113,7 +109,7 @@ public class ImportTsv {
}
}
}
-
+
public int getRowKeyColumnIndex() {
return rowKeyColumnIndex;
}
@@ -123,7 +119,7 @@ public class ImportTsv {
public byte[] getQualifier(int idx) {
return qualifiers[idx];
}
-
+
public ParsedLine parse(byte[] lineBytes, int length)
throws BadTsvLineException {
// Enumerate separator offsets
@@ -146,16 +142,16 @@ public class ImportTsv {
}
return new ParsedLine(tabOffsets, lineBytes);
}
-
+
class ParsedLine {
private final ArrayList<Integer> tabOffsets;
private byte[] lineBytes;
-
+
ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes) {
this.tabOffsets = tabOffsets;
this.lineBytes = lineBytes;
}
-
+
public int getRowKeyOffset() {
return getColumnOffset(rowKeyColumnIndex);
}
@@ -167,7 +163,7 @@ public class ImportTsv {
return tabOffsets.get(idx - 1) + 1;
else
return 0;
- }
+ }
public int getColumnLength(int idx) {
return tabOffsets.get(idx) - getColumnOffset(idx);
}
@@ -178,7 +174,7 @@ public class ImportTsv {
return lineBytes;
}
}
-
+
public static class BadTsvLineException extends Exception {
public BadTsvLineException(String err) {
super(err);
@@ -186,103 +182,6 @@ public class ImportTsv {
private static final long serialVersionUID = 1L;
}
}
-
- /**
- * Write table content out to files in hdfs.
- */
- static class TsvImporter
- extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
- {
-
- /** Timestamp for all inserted rows */
- private long ts;
-
- /** Should skip bad lines */
- private boolean skipBadLines;
- private Counter badLineCount;
-
- private TsvParser parser;
-
- @Override
- protected void setup(Context context) {
- Configuration conf = context.getConfiguration();
-
- // If a custom separator has been used,
- // decode it back from Base64 encoding.
- String separator = conf.get(SEPARATOR_CONF_KEY);
- if (separator == null) {
- separator = DEFAULT_SEPARATOR;
- } else {
- separator = new String(Base64.decode(separator));
- }
-
- parser = new TsvParser(conf.get(COLUMNS_CONF_KEY),
- separator);
- if (parser.getRowKeyColumnIndex() == -1) {
- throw new RuntimeException("No row key column specified");
- }
- ts = conf.getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
-
- skipBadLines = context.getConfiguration().getBoolean(
- SKIP_LINES_CONF_KEY, true);
- badLineCount = context.getCounter("ImportTsv", "Bad Lines");
- }
-
- /**
- * Convert a line of TSV text into an HBase table row.
- */
- @Override
- public void map(LongWritable offset, Text value,
- Context context)
- throws IOException {
- byte[] lineBytes = value.getBytes();
-
- try {
- TsvParser.ParsedLine parsed = parser.parse(
- lineBytes, value.getLength());
- ImmutableBytesWritable rowKey =
- new ImmutableBytesWritable(lineBytes,
- parsed.getRowKeyOffset(),
- parsed.getRowKeyLength());
-
- Put put = new Put(rowKey.copyBytes());
- for (int i = 0; i < parsed.getColumnCount(); i++) {
- if (i == parser.getRowKeyColumnIndex()) continue;
- KeyValue kv = new KeyValue(
- lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
- parser.getFamily(i), 0, parser.getFamily(i).length,
- parser.getQualifier(i), 0, parser.getQualifier(i).length,
- ts,
- KeyValue.Type.Put,
- lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
- put.add(kv);
- }
- context.write(rowKey, put);
- } catch (BadTsvLineException badLine) {
- if (skipBadLines) {
- System.err.println(
- "Bad line at offset: " + offset.get() + ":\n" +
- badLine.getMessage());
- badLineCount.increment(1);
- return;
- } else {
- throw new IOException(badLine);
- }
- } catch (IllegalArgumentException e) {
- if (skipBadLines) {
- System.err.println(
- "Bad line at offset: " + offset.get() + ":\n" +
- e.getMessage());
- badLineCount.increment(1);
- return;
- } else {
- throw new IOException(e);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
/**
* Sets up the actual job.
@@ -293,7 +192,7 @@ public class ImportTsv {
* @throws IOException When setting up the job fails.
*/
public static Job createSubmittableJob(Configuration conf, String[] args)
- throws IOException {
+ throws IOException, ClassNotFoundException {
// Support non-XML supported characters
// by re-encoding the passed separator as a Base64 string.
@@ -303,13 +202,18 @@ public class ImportTsv {
Base64.encodeBytes(actualSeparator.getBytes())));
}
+ // See if a non-default Mapper was set
+ String mapperClassName = conf.get(MAPPER_CONF_KEY);
+ Class mapperClass = mapperClassName != null ?
+ Class.forName(mapperClassName) : DEFAULT_MAPPER;
+
String tableName = args[0];
Path inputDir = new Path(args[1]);
Job job = new Job(conf, NAME + "_" + tableName);
- job.setJarByClass(TsvImporter.class);
+ job.setJarByClass(mapperClass);
FileInputFormat.setInputPaths(job, inputDir);
job.setInputFormatClass(TextInputFormat.class);
- job.setMapperClass(TsvImporter.class);
+ job.setMapperClass(mapperClass);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
if (hfileOutPath != null) {
@@ -326,9 +230,9 @@ public class ImportTsv {
TableMapReduceUtil.initTableReducerJob(tableName, null, job);
job.setNumReduceTasks(0);
}
-
+
TableMapReduceUtil.addDependencyJars(job);
- TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
com.google.common.base.Function.class /* Guava used by TsvParser */);
return job;
}
@@ -358,7 +262,8 @@ public class ImportTsv {
"Other options that may be specified with -D include:\n" +
" -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid
line\n" +
" '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of
tabs\n" +
- " -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified
timestamp for the import\n";
+ " -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified
timestamp for the import\n" +
+ " -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use
instead of " + DEFAULT_MAPPER.getName() + "\n";
System.err.println(usage);
}
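
For illustration, a minimal sketch of wiring a custom mapper programmatically through the new importtsv.mapper.class key; the driver class, table name, input path, and com.example.MyMapper are hypothetical placeholders, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.ImportTsv;
    import org.apache.hadoop.mapreduce.Job;

    // Hypothetical driver class, for illustration only.
    public class ImportTsvLauncher {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Column spec, as required by the default TsvImporterMapper path.
        conf.set("importtsv.columns", "HBASE_ROW_KEY,FAM:A,FAM:B");
        // New in this commit: name any suitable Mapper on the job classpath.
        conf.set("importtsv.mapper.class", "com.example.MyMapper"); // hypothetical
        Job job = ImportTsv.createSubmittableJob(conf,
            new String[] { "MyTable", "/path/to/input" });
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
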
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java?rev=1124542&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java Thu May 19 05:48:37 2011
@@ -0,0 +1,148 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Write table content out to files in hdfs.
+ */
+public class TsvImporterMapper
+extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
+{
+
+ /** Timestamp for all inserted rows */
+ private long ts;
+
+ /** Column separator */
+ private String separator;
+
+ /** Should skip bad lines */
+ private boolean skipBadLines;
+ private Counter badLineCount;
+
+ private ImportTsv.TsvParser parser;
+
+ public long getTs() {
+ return ts;
+ }
+
+ public boolean getSkipBadLines() {
+ return skipBadLines;
+ }
+
+ public Counter getBadLineCount() {
+ return badLineCount;
+ }
+
+ public void incrementBadLineCount(int count) {
+ this.badLineCount.increment(count);
+ }
+
+ /**
+ * Handles initializing this class with objects specific to it (i.e., the parser).
+ * Common initialization that might be leveraged by a subclass is done in
+ * <code>doSetup</code>. Hence a subclass may choose to override this method
+ * and call <code>doSetup</code> as well before handling its own custom params.
+ *
+ * @param context
+ */
+ @Override
+ protected void setup(Context context) {
+ doSetup(context);
+
+ Configuration conf = context.getConfiguration();
+
+ parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
+ separator);
+ if (parser.getRowKeyColumnIndex() == -1) {
+ throw new RuntimeException("No row key column specified");
+ }
+ }
+
+ /**
+ * Handles common parameter initialization that a subclass might want to leverage.
+ * @param context
+ */
+ protected void doSetup(Context context) {
+ Configuration conf = context.getConfiguration();
+
+ // If a custom separator has been used,
+ // decode it back from Base64 encoding.
+ separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
+ if (separator == null) {
+ separator = ImportTsv.DEFAULT_SEPARATOR;
+ } else {
+ separator = new String(Base64.decode(separator));
+ }
+
+ ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+
+ skipBadLines = context.getConfiguration().getBoolean(
+ ImportTsv.SKIP_LINES_CONF_KEY, true);
+ badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+ }
+
+ /**
+ * Convert a line of TSV text into an HBase table row.
+ */
+ @Override
+ public void map(LongWritable offset, Text value,
+ Context context)
+ throws IOException {
+ byte[] lineBytes = value.getBytes();
+
+ try {
+ ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
+ lineBytes, value.getLength());
+ ImmutableBytesWritable rowKey =
+ new ImmutableBytesWritable(lineBytes,
+ parsed.getRowKeyOffset(),
+ parsed.getRowKeyLength());
+
+ Put put = new Put(rowKey.copyBytes());
+ for (int i = 0; i < parsed.getColumnCount(); i++) {
+ if (i == parser.getRowKeyColumnIndex()) continue;
+ KeyValue kv = new KeyValue(
+ lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+ parser.getFamily(i), 0, parser.getFamily(i).length,
+ parser.getQualifier(i), 0, parser.getQualifier(i).length,
+ ts,
+ KeyValue.Type.Put,
+ lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
+ put.add(kv);
+ }
+ context.write(rowKey, put);
+ } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
+ if (skipBadLines) {
+ System.err.println(
+ "Bad line at offset: " + offset.get() + ":\n" +
+ badLine.getMessage());
+ incrementBadLineCount(1);
+ return;
+ } else {
+ throw new IOException(badLine);
+ }
+ } catch (IllegalArgumentException e) {
+ if (skipBadLines) {
+ System.err.println(
+ "Bad line at offset: " + offset.get() + ":\n" +
+ e.getMessage());
+ incrementBadLineCount(1);
+ return;
+ } else {
+ throw new IOException(e);
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
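
The setup/doSetup split above is the intended extension seam. A minimal sketch of a subclass using it, assuming a hypothetical class name and conf key; note that skipping the parent's setup() means no TsvParser is built, which suits a subclass that overrides map() entirely (as the test mapper later in this commit does):

    import org.apache.hadoop.hbase.mapreduce.TsvImporterMapper;

    // Hypothetical subclass, for illustration only.
    public class MyTsvMapper extends TsvImporterMapper {
      private String customParam;

      @Override
      protected void setup(Context context) {
        doSetup(context); // common init: separator, ts, skipBadLines, counter
        // "my.custom.key" is a hypothetical configuration key.
        customParam = context.getConfiguration().get("my.custom.key", "");
      }
    }
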
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java?rev=1124542&r1=1124541&r2=1124542&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java Thu May 19 05:48:37 2011
@@ -107,11 +107,11 @@ public class TestImportTsv {
parsed.getColumnLength(i)));
}
if (!Iterables.elementsEqual(parsedCols, expected)) {
- fail("Expected: " + Joiner.on(",").join(expected) + "\n" +
+ fail("Expected: " + Joiner.on(",").join(expected) + "\n" +
"Got:" + Joiner.on(",").join(parsedCols));
}
}
-
+
private void assertBytesEquals(byte[] a, byte[] b) {
assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b));
}
@@ -153,7 +153,7 @@ public class TestImportTsv {
String TABLE_NAME = "TestTable";
String FAMILY = "FAM";
String INPUT_FILE = "InputFile.esv";
-
+
// Prepare the arguments required for the test.
String[] args = new String[] {
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
@@ -162,6 +162,29 @@ public class TestImportTsv {
INPUT_FILE
};
+ doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 1);
+ }
+
+ @Test
+ public void testMROnTableWithCustomMapper()
+ throws Exception {
+ String TABLE_NAME = "TestTable";
+ String FAMILY = "FAM";
+ String INPUT_FILE = "InputFile2.esv";
+
+ // Prepare the arguments required for the test.
+ String[] args = new String[] {
+ "-D" + ImportTsv.MAPPER_CONF_KEY +
"=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper",
+ TABLE_NAME,
+ INPUT_FILE
+ };
+
+ doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
+ }
+
+ private void doMROnTableTest(String inputFile, String family, String tableName,
+ String[] args, int valueMultiplier) throws Exception {
+
// Cluster
HBaseTestingUtility htu1 = new HBaseTestingUtility();
@@ -172,15 +195,15 @@ public class TestImportTsv {
args = opts.getRemainingArgs();
try {
-
+
FileSystem fs = FileSystem.get(conf);
- FSDataOutputStream op = fs.create(new Path(INPUT_FILE), true);
+ FSDataOutputStream op = fs.create(new Path(inputFile), true);
String line = "KEY\u001bVALUE1\u001bVALUE2\n";
op.write(line.getBytes(HConstants.UTF8_ENCODING));
op.close();
- final byte[] FAM = Bytes.toBytes(FAMILY);
- final byte[] TAB = Bytes.toBytes(TABLE_NAME);
+ final byte[] FAM = Bytes.toBytes(family);
+ final byte[] TAB = Bytes.toBytes(tableName);
final byte[] QA = Bytes.toBytes("A");
final byte[] QB = Bytes.toBytes("B");
@@ -210,9 +233,9 @@ public class TestImportTsv {
assertEquals(toU8Str(kvs.get(1).getRow()),
toU8Str(Bytes.toBytes("KEY")));
assertEquals(toU8Str(kvs.get(0).getValue()),
- toU8Str(Bytes.toBytes("VALUE1")));
+ toU8Str(Bytes.toBytes("VALUE" + valueMultiplier)));
assertEquals(toU8Str(kvs.get(1).getValue()),
- toU8Str(Bytes.toBytes("VALUE2")));
+ toU8Str(Bytes.toBytes("VALUE" + 2*valueMultiplier)));
// Only one result set is expected, so let it loop.
}
verified = true;
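
The valueMultiplier argument captures what each mapper under test does to the values in the input line KEY\u001bVALUE1\u001bVALUE2: the stock TsvImporterMapper passes them through unchanged (multiplier 1), while TsvImporterCustomTestMapper below triples each numeric suffix (multiplier 3). A sketch of the arithmetic behind the two assertions:

    // Hedged illustration of the assertion arithmetic only.
    int valueMultiplier = 3;                           // custom-mapper test
    String expectedA = "VALUE" + valueMultiplier;      // "VALUE3" from "VALUE1"
    String expectedB = "VALUE" + 2 * valueMultiplier;  // "VALUE6" from "VALUE2"
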
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java?rev=1124542&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java Thu May 19 05:48:37 2011
@@ -0,0 +1,61 @@
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.KeyValue;
+
+import java.io.IOException;
+
+/**
+ * Dummy mapper used for unit tests to verify that the mapper can be injected.
+ * This approach would be used if a custom transformation needed to be done after
+ * reading the input data before writing it to HFiles.
+ */
+public class TsvImporterCustomTestMapper extends TsvImporterMapper {
+
+ @Override
+ protected void setup(Context context) {
+ doSetup(context);
+ }
+
+ /**
+ * Convert a line of TSV text into an HBase table row after transforming the
+ * values by multiplying them by 3.
+ */
+ @Override
+ public void map(LongWritable offset, Text value, Context context)
+ throws IOException {
+ byte[] family = Bytes.toBytes("FAM");
+ final byte[][] qualifiers = { Bytes.toBytes("A"), Bytes.toBytes("B") };
+
+ // do some basic line parsing
+ byte[] lineBytes = value.getBytes();
+ String[] valueTokens = new String(lineBytes, "UTF-8").split("\u001b");
+
+ // create the rowKey and Put
+ ImmutableBytesWritable rowKey =
+ new ImmutableBytesWritable(Bytes.toBytes(valueTokens[0]));
+ Put put = new Put(rowKey.copyBytes());
+
+ //The value should look like this: VALUE1 or VALUE2. Let's multiply
+ //the integer by 3
+ for(int i = 1; i < valueTokens.length; i++) {
+ String prefix = valueTokens[i].substring(0, "VALUE".length());
+ String suffix = valueTokens[i].substring("VALUE".length());
+ String newValue = prefix + Integer.parseInt(suffix) * 3;
+
+ KeyValue kv = new KeyValue(rowKey.copyBytes(), family,
+ qualifiers[i-1], Bytes.toBytes(newValue));
+ put.add(kv);
+ }
+
+ try {
+ context.write(rowKey, put);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
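
Finally, a hedged sketch of selecting this mapper via -D options, mirroring how doMROnTableTest feeds arguments through GenericOptionsParser; the driver class, table name, and input path are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.ImportTsv;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.GenericOptionsParser;

    // Hypothetical driver class, for illustration only.
    public class CustomMapperLauncher {
      public static void main(String[] ignored) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String[] args = {
            "-Dimporttsv.mapper.class=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper",
            "TestTable",                // illustrative table name
            "/path/to/InputFile2.esv"   // illustrative input path
        };
        // GenericOptionsParser folds the -D option into conf, as in the test.
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
        // No importtsv.columns is needed here: this mapper overrides setup()
        // and never builds a TsvParser.
        Job job = ImportTsv.createSubmittableJob(conf, args);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }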