Author: stack
Date: Fri Mar 30 21:44:08 2012
New Revision: 1307629
URL: http://svn.apache.org/viewvc?rev=1307629&view=rev
Log:
HBASE-5564 Bulkload is discarding duplicate records
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1307629&r1=1307628&r2=1307629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Fri Mar 30 21:44:08 2012
@@ -77,16 +77,7 @@ public class ImportTsv {
private int rowKeyColumnIndex;
- private int maxColumnCount;
-
- // Default value must be negative
- public static final int DEFAULT_TIMESTAMP_COLUMN_INDEX = -1;
-
- private int timestampKeyColumnIndex = DEFAULT_TIMESTAMP_COLUMN_INDEX;
-
- public static String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";
-
- public static String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";
+ public static String ROWKEY_COLUMN_SPEC="HBASE_ROW_KEY";
/**
* @param columnsSpecification the list of columns to parser out, comma
separated.
@@ -103,20 +94,15 @@ public class ImportTsv {
ArrayList<String> columnStrings = Lists.newArrayList(
Splitter.on(',').trimResults().split(columnsSpecification));
- maxColumnCount = columnStrings.size();
- families = new byte[maxColumnCount][];
- qualifiers = new byte[maxColumnCount][];
+ families = new byte[columnStrings.size()][];
+ qualifiers = new byte[columnStrings.size()][];
+
for (int i = 0; i < columnStrings.size(); i++) {
String str = columnStrings.get(i);
if (ROWKEY_COLUMN_SPEC.equals(str)) {
rowKeyColumnIndex = i;
continue;
}
- if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
- timestampKeyColumnIndex = i;
- continue;
- }
-
String[] parts = str.split(":", 2);
if (parts.length == 1) {
families[i] = str.getBytes();
@@ -131,15 +117,6 @@ public class ImportTsv {
public int getRowKeyColumnIndex() {
return rowKeyColumnIndex;
}
-
- public boolean hasTimestamp() {
- return timestampKeyColumnIndex != DEFAULT_TIMESTAMP_COLUMN_INDEX;
- }
-
- public int getTimestampKeyColumnIndex() {
- return timestampKeyColumnIndex;
- }
-
public byte[] getFamily(int idx) {
return families[idx];
}
@@ -150,7 +127,7 @@ public class ImportTsv {
public ParsedLine parse(byte[] lineBytes, int length)
throws BadTsvLineException {
// Enumerate separator offsets
- ArrayList<Integer> tabOffsets = new ArrayList<Integer>(maxColumnCount);
+ ArrayList<Integer> tabOffsets = new ArrayList<Integer>(families.length);
for (int i = 0; i < length; i++) {
if (lineBytes[i] == separatorByte) {
tabOffsets.add(i);
@@ -162,12 +139,10 @@ public class ImportTsv {
tabOffsets.add(length);
- if (tabOffsets.size() > maxColumnCount) {
+ if (tabOffsets.size() > families.length) {
throw new BadTsvLineException("Excessive columns");
} else if (tabOffsets.size() <= getRowKeyColumnIndex()) {
throw new BadTsvLineException("No row key");
- } else if (hasTimestamp() && tabOffsets.size() <= getTimestampKeyColumnIndex()) {
- throw new BadTsvLineException("No timestamp");
}
return new ParsedLine(tabOffsets, lineBytes);
}
@@ -187,22 +162,6 @@ public class ImportTsv {
public int getRowKeyLength() {
return getColumnLength(rowKeyColumnIndex);
}
-
- public long getTimestamp(long ts) throws BadTsvLineException {
- // Return ts if HBASE_TS_KEY is not configured in column spec
- if (!hasTimestamp()) {
- return ts;
- }
-
- try {
- return Long.parseLong(Bytes.toString(lineBytes, getColumnOffset(timestampKeyColumnIndex),
- getColumnLength(timestampKeyColumnIndex)));
- } catch (NumberFormatException nfe) {
- // treat this record as bad record
- throw new BadTsvLineException("Invalid timestamp");
- }
- }
-
public int getColumnOffset(int idx) {
if (idx > 0)
return tabOffsets.get(idx - 1) + 1;
@@ -289,7 +248,7 @@ public class ImportTsv {
if (errorMsg != null && errorMsg.length() > 0) {
System.err.println("ERROR: " + errorMsg);
}
- String usage =
+ String usage =
"Usage: " + NAME + " -Dimporttsv.columns=a,b,c <tablename> <inputdir>\n"
+
"\n" +
"Imports the given input directory of TSV data into the specified
table.\n" +
@@ -300,11 +259,7 @@ public class ImportTsv {
"column name HBASE_ROW_KEY is used to designate that this column should
be used\n" +
"as the row key for each imported record. You must specify exactly one
column\n" +
"to be the row key, and you must specify a column name for every column
that exists in the\n" +
- "input data. Another special column HBASE_TS_KEY designates that this
column should be\n" +
- "used as timestamp for each record. Unlike HBASE_ROW_KEY, HBASE_TS_KEY
is optional.\n" +
- "You must specify atmost one column as timestamp key for each imported
record.\n" +
- "Record with invalid timestamps (blank, non-numeric) will be treated as
bad record.\n" +
- "Note: if you use this option, then 'importtsv.timestamp' option will be
ignored.\n" +
+ "input data.\n" +
"\n" +
"By default importtsv will load data directly into HBase. To instead
generate\n" +
"HFiles of data to prepare for a bulk data load, pass the option:\n" +
@@ -355,28 +310,12 @@ public class ImportTsv {
System.exit(-1);
}
- // Make sure we have at most one column as the timestamp key
- int tskeysFound=0;
- for (String col : columns) {
- if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC)) tskeysFound++;
- }
- if (tskeysFound > 1) {
- usage("Must specify at most one column as " + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
- System.exit(-1);
- }
-
- // Make sure one or more columns are specified excluding rowkey and
timestamp key
- if (columns.length - (rowkeysFound + tskeysFound) < 1) {
- usage("One or more columns in addition to the row key and timestamp(optional) are required");
+ // Make sure one or more columns are specified
+ if (columns.length < 2) {
+ usage("One or more columns in addition to the row key are required");
System.exit(-1);
}
- // If timestamp option is not specified, use current system time.
- long timstamp = conf.getLong(TIMESTAMP_CONF_KEY,
System.currentTimeMillis());
-
- // Set it back to replace invalid timestamp (non-numeric) with current
system time
- conf.setLong(TIMESTAMP_CONF_KEY, timstamp);
-
Job job = createSubmittableJob(conf, otherArgs);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java?rev=1307629&r1=1307628&r2=1307629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java Fri Mar 30 21:44:08 2012
@@ -82,7 +82,8 @@ extends Mapper<LongWritable, Text, Immut
Configuration conf = context.getConfiguration();
- parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
+ parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
+ separator);
if (parser.getRowKeyColumnIndex() == -1) {
throw new RuntimeException("No row key column specified");
}
@@ -104,10 +105,10 @@ extends Mapper<LongWritable, Text, Immut
separator = new String(Base64.decode(separator));
}
- // Should never get 0 as we are setting this to a valid value in job
configuration.
- ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
+ ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY,
System.currentTimeMillis());
- skipBadLines =
context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
+ skipBadLines = context.getConfiguration().getBoolean(
+ ImportTsv.SKIP_LINES_CONF_KEY, true);
badLineCount = context.getCounter("ImportTsv", "Bad Lines");
}
@@ -115,22 +116,22 @@ extends Mapper<LongWritable, Text, Immut
* Convert a line of TSV text into an HBase table row.
*/
@Override
- public void map(LongWritable offset, Text value, Context context) throws IOException {
-
+ public void map(LongWritable offset, Text value,
+ Context context)
+ throws IOException {
byte[] lineBytes = value.getBytes();
try {
- ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes,
value.getLength());
- ImmutableBytesWritable rowKey = new ImmutableBytesWritable(lineBytes,
- parsed.getRowKeyOffset(), parsed.getRowKeyLength());
- // Retrieve timestamp if exists
- ts = parsed.getTimestamp(ts);
+ ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
+ lineBytes, value.getLength());
+ ImmutableBytesWritable rowKey =
+ new ImmutableBytesWritable(lineBytes,
+ parsed.getRowKeyOffset(),
+ parsed.getRowKeyLength());
Put put = new Put(rowKey.copyBytes());
for (int i = 0; i < parsed.getColumnCount(); i++) {
- if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()) {
- continue;
- }
+ if (i == parser.getRowKeyColumnIndex()) continue;
KeyValue kv = new KeyValue(
lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
parser.getFamily(i), 0, parser.getFamily(i).length,
@@ -143,7 +144,9 @@ extends Mapper<LongWritable, Text, Immut
context.write(rowKey, put);
} catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
if (skipBadLines) {
- System.err.println("Bad line at offset: " + offset.get() + ":\n" +
badLine.getMessage());
+ System.err.println(
+ "Bad line at offset: " + offset.get() + ":\n" +
+ badLine.getMessage());
incrementBadLineCount(1);
return;
} else {
@@ -151,7 +154,9 @@ extends Mapper<LongWritable, Text, Immut
}
} catch (IllegalArgumentException e) {
if (skipBadLines) {
- System.err.println("Bad line at offset: " + offset.get() + ":\n" +
e.getMessage());
+ System.err.println(
+ "Bad line at offset: " + offset.get() + ":\n" +
+ e.getMessage());
incrementBadLineCount(1);
return;
} else {
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java?rev=1307629&r1=1307628&r2=1307629&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java Fri Mar 30 21:44:08 2012
@@ -61,7 +61,6 @@ public class TestImportTsv {
assertNull(parser.getFamily(0));
assertNull(parser.getQualifier(0));
assertEquals(0, parser.getRowKeyColumnIndex());
- assertFalse(parser.hasTimestamp());
parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
assertNull(parser.getFamily(0));
@@ -69,7 +68,6 @@ public class TestImportTsv {
assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
assertEquals(0, parser.getRowKeyColumnIndex());
- assertFalse(parser.hasTimestamp());
parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
assertNull(parser.getFamily(0));
@@ -79,18 +77,6 @@ public class TestImportTsv {
assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
assertEquals(0, parser.getRowKeyColumnIndex());
- assertFalse(parser.hasTimestamp());
-
- parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2",
"\t");
- assertNull(parser.getFamily(0));
- assertNull(parser.getQualifier(0));
- assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
- assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
- assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
- assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
- assertEquals(0, parser.getRowKeyColumnIndex());
- assertTrue(parser.hasTimestamp());
- assertEquals(2, parser.getTimestampKeyColumnIndex());
}
@Test
@@ -103,31 +89,12 @@ public class TestImportTsv {
assertNull(parser.getFamily(2));
assertNull(parser.getQualifier(2));
assertEquals(2, parser.getRowKeyColumnIndex());
- assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX,
parser.getTimestampKeyColumnIndex());
-
+
byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
ParsedLine parsed = parser.parse(line, line.length);
checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
}
- @Test
- public void testTsvParserWithTimestamp() throws BadTsvLineException {
- TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,",
"\t");
- assertNull(parser.getFamily(0));
- assertNull(parser.getQualifier(0));
- assertNull(parser.getFamily(1));
- assertNull(parser.getQualifier(1));
- assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2));
- assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2));
- assertEquals(0, parser.getRowKeyColumnIndex());
- assertEquals(1, parser.getTimestampKeyColumnIndex());
-
- byte[] line = Bytes.toBytes("rowkey\t1234\tval_a");
- ParsedLine parsed = parser.parse(line, line.length);
- assertEquals(1234l, parsed.getTimestamp(-1));
- checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
- }
-
private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
ArrayList<String> parsedCols = new ArrayList<String>();
for (int i = 0; i < parsed.getColumnCount(); i++) {
@@ -153,46 +120,28 @@ public class TestImportTsv {
public void testTsvParserBadTsvLineExcessiveColumns() throws
BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
byte[] line = Bytes.toBytes("val_a\tval_b\tval_c");
- parser.parse(line, line.length);
+ ParsedLine parsed = parser.parse(line, line.length);
}
@Test(expected=BadTsvLineException.class)
public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
byte[] line = Bytes.toBytes("");
- parser.parse(line, line.length);
+ ParsedLine parsed = parser.parse(line, line.length);
}
@Test(expected=BadTsvLineException.class)
public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException {
TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
byte[] line = Bytes.toBytes("key_only");
- parser.parse(line, line.length);
+ ParsedLine parsed = parser.parse(line, line.length);
}
@Test(expected=BadTsvLineException.class)
public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException {
TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t");
byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
- parser.parse(line, line.length);
- }
-
- @Test(expected=BadTsvLineException.class)
- public void testTsvParserInvalidTimestamp() throws BadTsvLineException {
- TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,",
"\t");
- assertEquals(1, parser.getTimestampKeyColumnIndex());
- byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a");
ParsedLine parsed = parser.parse(line, line.length);
- assertEquals(-1, parsed.getTimestamp(-1));
- checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
- }
-
- @Test(expected=BadTsvLineException.class)
- public void testTsvParserNoTimestampValue() throws BadTsvLineException {
- TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
- assertEquals(2, parser.getTimestampKeyColumnIndex());
- byte[] line = Bytes.toBytes("rowkey\tval_a");
- parser.parse(line, line.length);
}
@Test
@@ -210,22 +159,7 @@ public class TestImportTsv {
INPUT_FILE
};
- doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1);
- }
-
- @Test
- public void testMROnTableWithTimestamp() throws Exception {
- String TABLE_NAME = "TestTable";
- String FAMILY = "FAM";
- String INPUT_FILE = "InputFile1.csv";
-
- // Prepare the arguments required for the test.
- String[] args = new String[] {
- "-D" + ImportTsv.COLUMNS_CONF_KEY +
"=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
- "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", TABLE_NAME, INPUT_FILE };
-
- String data = "KEY,1234,VALUE1,VALUE2\n";
- doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, data, args, 1);
+ doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 1);
}
@Test
@@ -237,21 +171,21 @@ public class TestImportTsv {
// Prepare the arguments required for the test.
String[] args = new String[] {
- "-D" + ImportTsv.MAPPER_CONF_KEY + " =
org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper",
+ "-D" + ImportTsv.MAPPER_CONF_KEY +
"=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper",
TABLE_NAME,
INPUT_FILE
};
- doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
+ doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
}
- private void doMROnTableTest(String inputFile, String family, String
tableName, String data,
- String[] args, int valueMultiplier) throws Exception {
+ private void doMROnTableTest(String inputFile, String family, String
tableName,
+ String[] args, int valueMultiplier) throws
Exception {
// Cluster
HBaseTestingUtility htu1 = new HBaseTestingUtility();
- htu1.startMiniCluster();
+ MiniHBaseCluster cluster = htu1.startMiniCluster();
htu1.startMiniMapReduceCluster();
GenericOptionsParser opts = new
GenericOptionsParser(htu1.getConfiguration(), args);
@@ -262,14 +196,14 @@ public class TestImportTsv {
FileSystem fs = FileSystem.get(conf);
FSDataOutputStream op = fs.create(new Path(inputFile), true);
- if (data == null) {
- data = "KEY\u001bVALUE1\u001bVALUE2\n";
- }
- op.write(Bytes.toBytes(data));
+ String line = "KEY\u001bVALUE1\u001bVALUE2\n";
+ op.write(line.getBytes(HConstants.UTF8_ENCODING));
op.close();
final byte[] FAM = Bytes.toBytes(family);
final byte[] TAB = Bytes.toBytes(tableName);
+ final byte[] QA = Bytes.toBytes("A");
+ final byte[] QB = Bytes.toBytes("B");
HTableDescriptor desc = new HTableDescriptor(TAB);
desc.addFamily(new HColumnDescriptor(FAM));
@@ -278,7 +212,7 @@ public class TestImportTsv {
Job job = ImportTsv.createSubmittableJob(conf, args);
job.waitForCompletion(false);
assertTrue(job.isSuccessful());
-
+
HTable table = new HTable(new Configuration(conf), TAB);
boolean verified = false;
long pause = conf.getLong("hbase.client.pause", 5 * 1000);
@@ -320,9 +254,9 @@ public class TestImportTsv {
htu1.shutdownMiniCluster();
}
}
-
+
public static String toU8Str(byte[] bytes) throws
UnsupportedEncodingException {
- return Bytes.toString(bytes);
+ return new String(bytes, HConstants.UTF8_ENCODING);
}
@org.junit.Rule