Author: stack
Date: Fri Mar 30 21:44:08 2012
New Revision: 1307629

URL: http://svn.apache.org/viewvc?rev=1307629&view=rev
Log:
HBASE-5564 Bulkload is discarding duplicate records

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1307629&r1=1307628&r2=1307629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Fri Mar 30 21:44:08 2012
@@ -77,16 +77,7 @@ public class ImportTsv {
 
     private int rowKeyColumnIndex;
 
-    private int maxColumnCount;
-
-    // Default value must be negative
-    public static final int DEFAULT_TIMESTAMP_COLUMN_INDEX = -1;
-
-    private int timestampKeyColumnIndex = DEFAULT_TIMESTAMP_COLUMN_INDEX;
-
-    public static String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";
-
-    public static String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";
+    public static String ROWKEY_COLUMN_SPEC="HBASE_ROW_KEY";
 
     /**
      * @param columnsSpecification the list of columns to parser out, comma 
separated.
@@ -103,20 +94,15 @@ public class ImportTsv {
       ArrayList<String> columnStrings = Lists.newArrayList(
         Splitter.on(',').trimResults().split(columnsSpecification));
 
-      maxColumnCount = columnStrings.size();
-      families = new byte[maxColumnCount][];
-      qualifiers = new byte[maxColumnCount][];
+      families = new byte[columnStrings.size()][];
+      qualifiers = new byte[columnStrings.size()][];
+
       for (int i = 0; i < columnStrings.size(); i++) {
         String str = columnStrings.get(i);
         if (ROWKEY_COLUMN_SPEC.equals(str)) {
           rowKeyColumnIndex = i;
           continue;
         }
-        if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
-          timestampKeyColumnIndex = i;
-          continue;
-        }
-
         String[] parts = str.split(":", 2);
         if (parts.length == 1) {
           families[i] = str.getBytes();
@@ -131,15 +117,6 @@ public class ImportTsv {
     public int getRowKeyColumnIndex() {
       return rowKeyColumnIndex;
     }
-
-    public boolean hasTimestamp() {
-      return timestampKeyColumnIndex != DEFAULT_TIMESTAMP_COLUMN_INDEX;
-    }
-
-    public int getTimestampKeyColumnIndex() {
-      return timestampKeyColumnIndex;
-    }
-
     public byte[] getFamily(int idx) {
       return families[idx];
     }
@@ -150,7 +127,7 @@ public class ImportTsv {
     public ParsedLine parse(byte[] lineBytes, int length)
     throws BadTsvLineException {
       // Enumerate separator offsets
-      ArrayList<Integer> tabOffsets = new ArrayList<Integer>(maxColumnCount);
+      ArrayList<Integer> tabOffsets = new ArrayList<Integer>(families.length);
       for (int i = 0; i < length; i++) {
         if (lineBytes[i] == separatorByte) {
           tabOffsets.add(i);
@@ -162,12 +139,10 @@ public class ImportTsv {
 
       tabOffsets.add(length);
 
-      if (tabOffsets.size() > maxColumnCount) {
+      if (tabOffsets.size() > families.length) {
         throw new BadTsvLineException("Excessive columns");
       } else if (tabOffsets.size() <= getRowKeyColumnIndex()) {
         throw new BadTsvLineException("No row key");
-      } else if (hasTimestamp() && tabOffsets.size() <= 
getTimestampKeyColumnIndex()) {
-        throw new BadTsvLineException("No timestamp");
       }
       return new ParsedLine(tabOffsets, lineBytes);
     }
@@ -187,22 +162,6 @@ public class ImportTsv {
       public int getRowKeyLength() {
         return getColumnLength(rowKeyColumnIndex);
       }
-
-      public long getTimestamp(long ts) throws BadTsvLineException {
-        // Return ts if HBASE_TS_KEY is not configured in column spec
-        if (!hasTimestamp()) {
-          return ts;
-        }
-
-        try {
-          return Long.parseLong(Bytes.toString(lineBytes, 
getColumnOffset(timestampKeyColumnIndex),
-              getColumnLength(timestampKeyColumnIndex)));
-        } catch (NumberFormatException nfe) {
-          // treat this record as bad record
-          throw new BadTsvLineException("Invalid timestamp");
-        }
-      }
-
       public int getColumnOffset(int idx) {
         if (idx > 0)
           return tabOffsets.get(idx - 1) + 1;
@@ -289,7 +248,7 @@ public class ImportTsv {
     if (errorMsg != null && errorMsg.length() > 0) {
       System.err.println("ERROR: " + errorMsg);
     }
-    String usage =
+    String usage = 
       "Usage: " + NAME + " -Dimporttsv.columns=a,b,c <tablename> <inputdir>\n" 
+
       "\n" +
       "Imports the given input directory of TSV data into the specified 
table.\n" +
@@ -300,11 +259,7 @@ public class ImportTsv {
       "column name HBASE_ROW_KEY is used to designate that this column should 
be used\n" +
       "as the row key for each imported record. You must specify exactly one 
column\n" +
       "to be the row key, and you must specify a column name for every column 
that exists in the\n" +
-      "input data. Another special column HBASE_TS_KEY designates that this 
column should be\n" +
-      "used as timestamp for each record. Unlike HBASE_ROW_KEY, HBASE_TS_KEY 
is optional.\n" +
-      "You must specify atmost one column as timestamp key for each imported 
record.\n" +
-      "Record with invalid timestamps (blank, non-numeric) will be treated as 
bad record.\n" +
-      "Note: if you use this option, then 'importtsv.timestamp' option will be 
ignored.\n" +
+      "input data.\n" +
       "\n" +
       "By default importtsv will load data directly into HBase. To instead 
generate\n" +
       "HFiles of data to prepare for a bulk data load, pass the option:\n" +
@@ -355,28 +310,12 @@ public class ImportTsv {
       System.exit(-1);
     }
 
-    // Make sure we have at most one column as the timestamp key
-    int tskeysFound=0;
-    for (String col : columns) {
-      if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC)) tskeysFound++;
-    }
-    if (tskeysFound > 1) {
-      usage("Must specify at most one column as " + 
TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
-      System.exit(-1);
-    }
-
-    // Make sure one or more columns are specified excluding rowkey and 
timestamp key
-    if (columns.length - (rowkeysFound + tskeysFound) < 1) {
-      usage("One or more columns in addition to the row key and 
timestamp(optional) are required");
+    // Make sure one or more columns are specified
+    if (columns.length < 2) {
+      usage("One or more columns in addition to the row key are required");
       System.exit(-1);
     }
 
-    // If timestamp option is not specified, use current system time.
-    long timstamp = conf.getLong(TIMESTAMP_CONF_KEY, 
System.currentTimeMillis());
-
-    // Set it back to replace invalid timestamp (non-numeric) with current 
system time
-    conf.setLong(TIMESTAMP_CONF_KEY, timstamp);
-
     Job job = createSubmittableJob(conf, otherArgs);
     System.exit(job.waitForCompletion(true) ? 0 : 1);
   }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java?rev=1307629&r1=1307628&r2=1307629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java Fri Mar 30 21:44:08 2012
@@ -82,7 +82,8 @@ extends Mapper<LongWritable, Text, Immut
 
     Configuration conf = context.getConfiguration();
 
-    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), 
separator);
+    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
+                           separator);
     if (parser.getRowKeyColumnIndex() == -1) {
       throw new RuntimeException("No row key column specified");
     }
@@ -104,10 +105,10 @@ extends Mapper<LongWritable, Text, Immut
       separator = new String(Base64.decode(separator));
     }
 
-    // Should never get 0 as we are setting this to a valid value in job 
configuration.
-    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
+    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 
System.currentTimeMillis());
 
-    skipBadLines = 
context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
+    skipBadLines = context.getConfiguration().getBoolean(
+        ImportTsv.SKIP_LINES_CONF_KEY, true);
     badLineCount = context.getCounter("ImportTsv", "Bad Lines");
   }
 
@@ -115,22 +116,22 @@ extends Mapper<LongWritable, Text, Immut
    * Convert a line of TSV text into an HBase table row.
    */
   @Override
-  public void map(LongWritable offset, Text value, Context context) throws 
IOException {
-
+  public void map(LongWritable offset, Text value,
+    Context context)
+  throws IOException {
     byte[] lineBytes = value.getBytes();
 
     try {
-      ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, 
value.getLength());
-      ImmutableBytesWritable rowKey = new ImmutableBytesWritable(lineBytes,
-          parsed.getRowKeyOffset(), parsed.getRowKeyLength());
-      // Retrieve timestamp if exists
-      ts = parsed.getTimestamp(ts);
+      ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
+          lineBytes, value.getLength());
+      ImmutableBytesWritable rowKey =
+        new ImmutableBytesWritable(lineBytes,
+            parsed.getRowKeyOffset(),
+            parsed.getRowKeyLength());
 
       Put put = new Put(rowKey.copyBytes());
       for (int i = 0; i < parsed.getColumnCount(); i++) {
-        if (i == parser.getRowKeyColumnIndex() || i == 
parser.getTimestampKeyColumnIndex()) {
-          continue;
-        }
+        if (i == parser.getRowKeyColumnIndex()) continue;
         KeyValue kv = new KeyValue(
             lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
             parser.getFamily(i), 0, parser.getFamily(i).length,
@@ -143,7 +144,9 @@ extends Mapper<LongWritable, Text, Immut
       context.write(rowKey, put);
     } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
       if (skipBadLines) {
-        System.err.println("Bad line at offset: " + offset.get() + ":\n" + 
badLine.getMessage());
+        System.err.println(
+            "Bad line at offset: " + offset.get() + ":\n" +
+            badLine.getMessage());
         incrementBadLineCount(1);
         return;
       } else {
@@ -151,7 +154,9 @@ extends Mapper<LongWritable, Text, Immut
       }
     } catch (IllegalArgumentException e) {
       if (skipBadLines) {
-        System.err.println("Bad line at offset: " + offset.get() + ":\n" + 
e.getMessage());
+        System.err.println(
+            "Bad line at offset: " + offset.get() + ":\n" +
+            e.getMessage());
         incrementBadLineCount(1);
         return;
       } else {

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java?rev=1307629&r1=1307628&r2=1307629&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java Fri Mar 30 21:44:08 2012
@@ -61,7 +61,6 @@ public class TestImportTsv {
     assertNull(parser.getFamily(0));
     assertNull(parser.getQualifier(0));
     assertEquals(0, parser.getRowKeyColumnIndex());
-    assertFalse(parser.hasTimestamp());
 
     parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
     assertNull(parser.getFamily(0));
@@ -69,7 +68,6 @@ public class TestImportTsv {
     assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
     assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
     assertEquals(0, parser.getRowKeyColumnIndex());
-    assertFalse(parser.hasTimestamp());
 
     parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
     assertNull(parser.getFamily(0));
@@ -79,18 +77,6 @@ public class TestImportTsv {
     assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
     assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
     assertEquals(0, parser.getRowKeyColumnIndex());
-    assertFalse(parser.hasTimestamp());
-
-    parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2", 
"\t");
-    assertNull(parser.getFamily(0));
-    assertNull(parser.getQualifier(0));
-    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
-    assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
-    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
-    assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
-    assertEquals(0, parser.getRowKeyColumnIndex());
-    assertTrue(parser.hasTimestamp());
-    assertEquals(2, parser.getTimestampKeyColumnIndex());
   }
 
   @Test
@@ -103,31 +89,12 @@ public class TestImportTsv {
     assertNull(parser.getFamily(2));
     assertNull(parser.getQualifier(2));
     assertEquals(2, parser.getRowKeyColumnIndex());
-    assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, 
parser.getTimestampKeyColumnIndex());
-
+    
     byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
     ParsedLine parsed = parser.parse(line, line.length);
     checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
   }
 
-  @Test
-  public void testTsvParserWithTimestamp() throws BadTsvLineException {
-    TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", 
"\t");
-    assertNull(parser.getFamily(0));
-    assertNull(parser.getQualifier(0));
-    assertNull(parser.getFamily(1));
-    assertNull(parser.getQualifier(1));
-    assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2));
-    assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2));
-    assertEquals(0, parser.getRowKeyColumnIndex());
-    assertEquals(1, parser.getTimestampKeyColumnIndex());
-
-    byte[] line = Bytes.toBytes("rowkey\t1234\tval_a");
-    ParsedLine parsed = parser.parse(line, line.length);
-    assertEquals(1234l, parsed.getTimestamp(-1));
-    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
-  }
-
   private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
     ArrayList<String> parsedCols = new ArrayList<String>();
     for (int i = 0; i < parsed.getColumnCount(); i++) {
@@ -153,46 +120,28 @@ public class TestImportTsv {
   public void testTsvParserBadTsvLineExcessiveColumns() throws 
BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
     byte[] line = Bytes.toBytes("val_a\tval_b\tval_c");
-    parser.parse(line, line.length);
+    ParsedLine parsed = parser.parse(line, line.length);
   }
 
   @Test(expected=BadTsvLineException.class)
   public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
     byte[] line = Bytes.toBytes("");
-    parser.parse(line, line.length);
+    ParsedLine parsed = parser.parse(line, line.length);
   }
 
   @Test(expected=BadTsvLineException.class)
   public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
     byte[] line = Bytes.toBytes("key_only");
-    parser.parse(line, line.length);
+    ParsedLine parsed = parser.parse(line, line.length);
   }
 
   @Test(expected=BadTsvLineException.class)
   public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException {
     TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t");
     byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
-    parser.parse(line, line.length);
-  }
-
-  @Test(expected=BadTsvLineException.class)
-  public void testTsvParserInvalidTimestamp() throws BadTsvLineException {
-    TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", 
"\t");
-    assertEquals(1, parser.getTimestampKeyColumnIndex());
-    byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a");
     ParsedLine parsed = parser.parse(line, line.length);
-    assertEquals(-1, parsed.getTimestamp(-1));
-    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
-  }
-
-  @Test(expected=BadTsvLineException.class)
-  public void testTsvParserNoTimestampValue() throws BadTsvLineException {
-    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
-    assertEquals(2, parser.getTimestampKeyColumnIndex());
-    byte[] line = Bytes.toBytes("rowkey\tval_a");
-    parser.parse(line, line.length);
   }
 
   @Test
@@ -210,22 +159,7 @@ public class TestImportTsv {
         INPUT_FILE
     };
 
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1);
-  }
-
-  @Test
-  public void testMROnTableWithTimestamp() throws Exception {
-    String TABLE_NAME = "TestTable";
-    String FAMILY = "FAM";
-    String INPUT_FILE = "InputFile1.csv";
-
-    // Prepare the arguments required for the test.
-    String[] args = new String[] {
-        "-D" + ImportTsv.COLUMNS_CONF_KEY + 
"=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
-        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", TABLE_NAME, INPUT_FILE };
-
-    String data = "KEY,1234,VALUE1,VALUE2\n";
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, data, args, 1);
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 1);
   }
 
   @Test
@@ -237,21 +171,21 @@ public class TestImportTsv {
 
     // Prepare the arguments required for the test.
     String[] args = new String[] {
-        "-D" + ImportTsv.MAPPER_CONF_KEY + " = 
org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper",
+        "-D" + ImportTsv.MAPPER_CONF_KEY + 
"=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper",
         TABLE_NAME,
         INPUT_FILE
     };
 
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
   }
 
-  private void doMROnTableTest(String inputFile, String family, String 
tableName, String data,
-      String[] args, int valueMultiplier) throws Exception {
+  private void doMROnTableTest(String inputFile, String family, String 
tableName,
+                               String[] args, int valueMultiplier) throws 
Exception {
 
     // Cluster
     HBaseTestingUtility htu1 = new HBaseTestingUtility();
 
-    htu1.startMiniCluster();
+    MiniHBaseCluster cluster = htu1.startMiniCluster();
     htu1.startMiniMapReduceCluster();
 
     GenericOptionsParser opts = new 
GenericOptionsParser(htu1.getConfiguration(), args);
@@ -262,14 +196,14 @@ public class TestImportTsv {
 
       FileSystem fs = FileSystem.get(conf);
       FSDataOutputStream op = fs.create(new Path(inputFile), true);
-      if (data == null) {
-        data = "KEY\u001bVALUE1\u001bVALUE2\n";
-      }
-      op.write(Bytes.toBytes(data));
+      String line = "KEY\u001bVALUE1\u001bVALUE2\n";
+      op.write(line.getBytes(HConstants.UTF8_ENCODING));
       op.close();
 
       final byte[] FAM = Bytes.toBytes(family);
       final byte[] TAB = Bytes.toBytes(tableName);
+      final byte[] QA = Bytes.toBytes("A");
+      final byte[] QB = Bytes.toBytes("B");
 
       HTableDescriptor desc = new HTableDescriptor(TAB);
       desc.addFamily(new HColumnDescriptor(FAM));
@@ -278,7 +212,7 @@ public class TestImportTsv {
       Job job = ImportTsv.createSubmittableJob(conf, args);
       job.waitForCompletion(false);
       assertTrue(job.isSuccessful());
-
+      
       HTable table = new HTable(new Configuration(conf), TAB);
       boolean verified = false;
       long pause = conf.getLong("hbase.client.pause", 5 * 1000);
@@ -320,9 +254,9 @@ public class TestImportTsv {
       htu1.shutdownMiniCluster();
     }
   }
-
+  
   public static String toU8Str(byte[] bytes) throws 
UnsupportedEncodingException {
-    return Bytes.toString(bytes);
+    return new String(bytes, HConstants.UTF8_ENCODING);
   }
 
   @org.junit.Rule


Reply via email to