http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
new file mode 100644
index 0000000..91d2696
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
@@ -0,0 +1,726 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.LauncherSecurityManager;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests the table import and table export MR job functionality
+ */
+@Category({VerySlowMapReduceTests.class, MediumTests.class})
+public class TestImportExport {
+  private static final Log LOG = LogFactory.getLog(TestImportExport.class);
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1");
+  private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2");
+  private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3");
+  private static final String FAMILYA_STRING = "a";
+  private static final String FAMILYB_STRING = "b";
+  private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
+  private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
+  private static final byte[] QUAL = Bytes.toBytes("q");
+  private static final String OUTPUT_DIR = "outputdir";
+  private static String FQ_OUTPUT_DIR;
+  private static final String EXPORT_BATCH_SIZE = "100";
+
+  private static long now = System.currentTimeMillis();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    // Up the handlers; this test needs more than usual.
+    UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
+    UTIL.startMiniCluster();
+    FQ_OUTPUT_DIR =
+      new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Rule
+  public final TestName name = new TestName();
+
+  @Before
+  public void announce() {
+    LOG.info("Running " + name.getMethodName());
+  }
+
+  @Before
+  @After
+  public void cleanup() throws Exception {
+    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
+    fs.delete(new Path(OUTPUT_DIR), true);
+  }
+
+  /**
+   * Runs an export job with the specified command line args
+   * @param args
+   * @return true if job completed successfully
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  boolean runExport(String[] args) throws Exception {
+    // need to make a copy of the configuration so that different temp dirs are used.
+    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args);
+    return status == 0;
+  }
+
+  /**
+   * Runs an import job with the specified command line args
+   * @param args
+   * @return true if job completed successfully
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  boolean runImport(String[] args) throws Exception {
+    // need to make a copy of the configuration so that different temp dirs are used.
+    int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args);
+    return status == 0;
+  }
+
+  /**
+   * Test a simple export/import round trip with column family mapping
+   * @throws Exception
+   */
+  @Test
+  public void testSimpleCase() throws Exception {
+    try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3);) {
+      Put p = new Put(ROW1);
+      p.addColumn(FAMILYA, QUAL, now, QUAL);
+      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
+      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
+      t.put(p);
+      p = new Put(ROW2);
+      p.addColumn(FAMILYA, QUAL, now, QUAL);
+      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
+      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
+      t.put(p);
+      p = new Put(ROW3);
+      p.addColumn(FAMILYA, QUAL, now, QUAL);
+      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
+      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
+      t.put(p);
+    }
+
+      String[] args = new String[] {
+          // Only export row1 & row2.
+          "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1",
+          "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3",
+          name.getMethodName(),
+          FQ_OUTPUT_DIR,
+          "1000", // max number of key versions per key to export
+      };
+      assertTrue(runExport(args));
+
+      final String IMPORT_TABLE = name.getMethodName() + "import";
+      try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);) {
+        args = new String[] {
+            "-D" + Import.CF_RENAME_PROP + "=" + FAMILYA_STRING + ":" + FAMILYB_STRING,
+            IMPORT_TABLE,
+            FQ_OUTPUT_DIR
+        };
+        assertTrue(runImport(args));
+
+        Get g = new Get(ROW1);
+        g.setMaxVersions();
+        Result r = t.get(g);
+        assertEquals(3, r.size());
+        g = new Get(ROW2);
+        g.setMaxVersions();
+        r = t.get(g);
+        assertEquals(3, r.size());
+        g = new Get(ROW3);
+        r = t.get(g);
+        assertEquals(0, r.size());
+      }
+  }
+
+  /**
+   * Test export hbase:meta table
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testMetaExport() throws Exception {
+    String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString();
+    String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" };
+    assertTrue(runExport(args));
+  }
+
+  /**
+   * Test import data from 0.94 exported file
+   * @throws Exception
+   */
+  @Test
+  public void testImport94Table() throws Exception {
+    final String name = "exportedTableIn94Format";
+    URL url = TestImportExport.class.getResource(name);
+    File f = new File(url.toURI());
+    if (!f.exists()) {
+      LOG.warn("FAILED TO FIND " + f + "; skipping out on test");
+      return;
+    }
+    assertTrue(f.exists());
+    LOG.info("FILE=" + f);
+    Path importPath = new Path(f.toURI());
+    FileSystem fs = FileSystem.get(UTIL.getConfiguration());
+    fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name));
+    String IMPORT_TABLE = name;
+    try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);) {
+      String[] args = new String[] {
+              "-Dhbase.import.version=0.94" ,
+              IMPORT_TABLE, FQ_OUTPUT_DIR
+      };
+      assertTrue(runImport(args));
+      /* exportedTableIn94Format contains 5 rows
+      ROW         COLUMN+CELL
+      r1          column=f1:c1, timestamp=1383766761171, value=val1
+      r2          column=f1:c1, timestamp=1383766771642, value=val2
+      r3          column=f1:c1, timestamp=1383766777615, value=val3
+      r4          column=f1:c1, timestamp=1383766785146, value=val4
+      r5          column=f1:c1, timestamp=1383766791506, value=val5
+      */
+     assertEquals(5, UTIL.countRows(t));
+    }
+  }
+
+  /**
+   * Test export scanner batching
+   */
+   @Test
+   public void testExportScannerBatching() throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
+    desc.addFamily(new HColumnDescriptor(FAMILYA)
+        .setMaxVersions(1)
+    );
+    UTIL.getAdmin().createTable(desc);
+    try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
+
+      Put p = new Put(ROW1);
+      p.addColumn(FAMILYA, QUAL, now, QUAL);
+      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
+      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
+      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
+      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
+      t.put(p);
+
+      String[] args = new String[] {
+          "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,  // added scanner batching arg.
+          name.getMethodName(),
+          FQ_OUTPUT_DIR
+      };
+      assertTrue(runExport(args));
+
+      FileSystem fs = FileSystem.get(UTIL.getConfiguration());
+      fs.delete(new Path(FQ_OUTPUT_DIR), true);
+    }
+  }
+
+  @Test
+  public void testWithDeletes() throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
+    desc.addFamily(new HColumnDescriptor(FAMILYA)
+        .setMaxVersions(5)
+        .setKeepDeletedCells(KeepDeletedCells.TRUE)
+    );
+    UTIL.getAdmin().createTable(desc);
+    try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
+
+      Put p = new Put(ROW1);
+      p.addColumn(FAMILYA, QUAL, now, QUAL);
+      p.addColumn(FAMILYA, QUAL, now + 1, QUAL);
+      p.addColumn(FAMILYA, QUAL, now + 2, QUAL);
+      p.addColumn(FAMILYA, QUAL, now + 3, QUAL);
+      p.addColumn(FAMILYA, QUAL, now + 4, QUAL);
+      t.put(p);
+
+      Delete d = new Delete(ROW1, now+3);
+      t.delete(d);
+      d = new Delete(ROW1);
+      d.addColumns(FAMILYA, QUAL, now+2);
+      t.delete(d);
+    }
+
+    String[] args = new String[] {
+        "-D" + Export.RAW_SCAN + "=true",
+        name.getMethodName(),
+        FQ_OUTPUT_DIR,
+        "1000", // max number of key versions per key to export
+    };
+    assertTrue(runExport(args));
+
+    final String IMPORT_TABLE = name.getMethodName() + "import";
+    desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
+    desc.addFamily(new HColumnDescriptor(FAMILYA)
+        .setMaxVersions(5)
+        .setKeepDeletedCells(KeepDeletedCells.TRUE)
+    );
+    UTIL.getAdmin().createTable(desc);
+    try (Table t = UTIL.getConnection().getTable(desc.getTableName());) {
+      args = new String[] {
+          IMPORT_TABLE,
+          FQ_OUTPUT_DIR
+      };
+      assertTrue(runImport(args));
+
+      Scan s = new Scan();
+      s.setMaxVersions();
+      s.setRaw(true);
+      ResultScanner scanner = t.getScanner(s);
+      Result r = scanner.next();
+      Cell[] res = r.rawCells();
+      assertTrue(CellUtil.isDeleteFamily(res[0]));
+      assertEquals(now+4, res[1].getTimestamp());
+      assertEquals(now+3, res[2].getTimestamp());
+      assertTrue(CellUtil.isDelete(res[3]));
+      assertEquals(now+2, res[4].getTimestamp());
+      assertEquals(now+1, res[5].getTimestamp());
+      assertEquals(now, res[6].getTimestamp());
+    }
+  }
+
+
+  @Test
+  public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Exception {
+    final TableName exportTable = TableName.valueOf(name.getMethodName());
+    HTableDescriptor desc = new HTableDescriptor(exportTable);
+    desc.addFamily(new HColumnDescriptor(FAMILYA)
+        .setMaxVersions(5)
+        .setKeepDeletedCells(KeepDeletedCells.TRUE)
+    );
+    UTIL.getAdmin().createTable(desc);
+
+    Table exportT = UTIL.getConnection().getTable(exportTable);
+
+    //Add first version of QUAL
+    Put p = new Put(ROW1);
+    p.addColumn(FAMILYA, QUAL, now, QUAL);
+    exportT.put(p);
+
+    //Add Delete family marker
+    Delete d = new Delete(ROW1, now+3);
+    exportT.delete(d);
+
+    //Add second version of QUAL
+    p = new Put(ROW1);
+    p.addColumn(FAMILYA, QUAL, now + 5, "s".getBytes());
+    exportT.put(p);
+
+    //Add second Delete family marker
+    d = new Delete(ROW1, now+7);
+    exportT.delete(d);
+
+
+    String[] args = new String[] {
+        "-D" + Export.RAW_SCAN + "=true", exportTable.getNameAsString(),
+        FQ_OUTPUT_DIR,
+        "1000", // max number of key versions per key to export
+    };
+    assertTrue(runExport(args));
+
+    final String importTable = name.getMethodName() + "import";
+    desc = new HTableDescriptor(TableName.valueOf(importTable));
+    desc.addFamily(new HColumnDescriptor(FAMILYA)
+        .setMaxVersions(5)
+        .setKeepDeletedCells(KeepDeletedCells.TRUE)
+    );
+    UTIL.getAdmin().createTable(desc);
+
+    Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable));
+    args = new String[] {
+        importTable,
+        FQ_OUTPUT_DIR
+    };
+    assertTrue(runImport(args));
+
+    Scan s = new Scan();
+    s.setMaxVersions();
+    s.setRaw(true);
+
+    ResultScanner importedTScanner = importT.getScanner(s);
+    Result importedTResult = importedTScanner.next();
+
+    ResultScanner exportedTScanner = exportT.getScanner(s);
+    Result  exportedTResult =  exportedTScanner.next();
+    try {
+      Result.compareResults(exportedTResult, importedTResult);
+    } catch (Exception e) {
+      fail("Original and imported tables data comparison failed with error: " + e.getMessage());
+    } finally {
+      exportT.close();
+      importT.close();
+    }
+  }
+
+  /**
+   * Create a simple table, run an Export Job on it, Import with filtering on, verify counts,
+   * attempt with invalid values.
+   */
+  @Test
+  public void testWithFilter() throws Exception {
+    // Create simple table to export
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
+    desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
+    UTIL.getAdmin().createTable(desc);
+    Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
+
+    Put p1 = new Put(ROW1);
+    p1.addColumn(FAMILYA, QUAL, now, QUAL);
+    p1.addColumn(FAMILYA, QUAL, now + 1, QUAL);
+    p1.addColumn(FAMILYA, QUAL, now + 2, QUAL);
+    p1.addColumn(FAMILYA, QUAL, now + 3, QUAL);
+    p1.addColumn(FAMILYA, QUAL, now + 4, QUAL);
+
+    // Having another row would actually test the filter.
+    Put p2 = new Put(ROW2);
+    p2.addColumn(FAMILYA, QUAL, now, QUAL);
+
+    exportTable.put(Arrays.asList(p1, p2));
+
+    // Export the simple table
+    String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
+    assertTrue(runExport(args));
+
+    // Import to a new table
+    final String IMPORT_TABLE = name.getMethodName() + "import";
+    desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
+    desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
+    UTIL.getAdmin().createTable(desc);
+
+    Table importTable = UTIL.getConnection().getTable(desc.getTableName());
+    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
+        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE,
+        FQ_OUTPUT_DIR,
+        "1000" };
+    assertTrue(runImport(args));
+
+    // get the count of the source table for that time range
+    PrefixFilter filter = new PrefixFilter(ROW1);
+    int count = getCount(exportTable, filter);
+
+    Assert.assertEquals("Unexpected row count between export and import tables", count,
+      getCount(importTable, null));
+
+    // and then test that a broken command doesn't bork everything - easier here because we don't
+    // need to re-run the export job
+
+    args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
+        "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name.getMethodName(),
+        FQ_OUTPUT_DIR, "1000" };
+    assertFalse(runImport(args));
+
+    // cleanup
+    exportTable.close();
+    importTable.close();
+  }
+
+  /**
+   * Count the number of KeyValues in the specified table that pass the given filter
+   * @param table the table to scan
+   * @param filter the filter to apply, or null to count all KeyValues
+   * @return the number of matching KeyValues
+   * @throws IOException
+   */
+  private int getCount(Table table, Filter filter) throws IOException {
+    Scan scan = new Scan();
+    scan.setFilter(filter);
+    ResultScanner results = table.getScanner(scan);
+    int count = 0;
+    for (Result res : results) {
+      count += res.size();
+    }
+    results.close();
+    return count;
+  }
+
+  /**
+   * test main method. Import should print help and call System.exit
+   */
+  @Test
+  public void testImportMain() throws Exception {
+    PrintStream oldPrintStream = System.err;
+    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
+    LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
+    System.setSecurityManager(newSecurityManager);
+    ByteArrayOutputStream data = new ByteArrayOutputStream();
+    String[] args = {};
+    System.setErr(new PrintStream(data));
+    try {
+      System.setErr(new PrintStream(data));
+      Import.main(args);
+      fail("should be SecurityException");
+    } catch (SecurityException e) {
+      assertEquals(-1, newSecurityManager.getExitCode());
+      assertTrue(data.toString().contains("Wrong number of arguments:"));
+      assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
+      assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>"));
+      assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
+      assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false"));
+    } finally {
+      System.setErr(oldPrintStream);
+      System.setSecurityManager(SECURITY_MANAGER);
+    }
+  }
+
+  /**
+   * test main method. Export should print help and call System.exit
+   */
+  @Test
+  public void testExportMain() throws Exception {
+    PrintStream oldPrintStream = System.err;
+    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
+    LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
+    System.setSecurityManager(newSecurityManager);
+    ByteArrayOutputStream data = new ByteArrayOutputStream();
+    String[] args = {};
+    System.setErr(new PrintStream(data));
+    try {
+      System.setErr(new PrintStream(data));
+      Export.main(args);
+      fail("should be SecurityException");
+    } catch (SecurityException e) {
+      assertEquals(-1, newSecurityManager.getExitCode());
+      String errMsg = data.toString();
+      assertTrue(errMsg.contains("Wrong number of arguments:"));
+      assertTrue(errMsg.contains(
+              "Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
+              "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
+      assertTrue(
+        errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ..."));
+      assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true"));
+      assertTrue(errMsg.contains("-Dhbase.client.scanner.caching=100"));
+      assertTrue(errMsg.contains("-Dmapreduce.map.speculative=false"));
+      assertTrue(errMsg.contains("-Dmapreduce.reduce.speculative=false"));
+      assertTrue(errMsg.contains("-Dhbase.export.scanner.batch=10"));
+    } finally {
+      System.setErr(oldPrintStream);
+      System.setSecurityManager(SECURITY_MANAGER);
+    }
+  }
+
+  /**
+   * Test map method of Importer
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test
+  public void testKeyValueImporter() throws Exception {
+    KeyValueImporter importer = new KeyValueImporter();
+    Configuration configuration = new Configuration();
+    Context ctx = mock(Context.class);
+    when(ctx.getConfiguration()).thenReturn(configuration);
+
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
+        KeyValue key = (KeyValue) invocation.getArguments()[1];
+        assertEquals("Key", Bytes.toString(writer.get()));
+        assertEquals("row", Bytes.toString(CellUtil.cloneRow(key)));
+        return null;
+      }
+    }).when(ctx).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
+
+    importer.setup(ctx);
+    Result value = mock(Result.class);
+    KeyValue[] keys = {
+        new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
+            Bytes.toBytes("value")),
+        new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
+            Bytes.toBytes("value1")) };
+    when(value.rawCells()).thenReturn(keys);
+    importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
+
+  }
+
+  /**
+   * Test the addFilterAndArguments method of Import. This method sets a couple of
+   * parameters into the Configuration.
+   */
+  @Test
+  public void testAddFilterAndArguments() throws IOException {
+    Configuration configuration = new Configuration();
+
+    List<String> args = new ArrayList<>();
+    args.add("param1");
+    args.add("param2");
+
+    Import.addFilterAndArguments(configuration, FilterBase.class, args);
+    assertEquals("org.apache.hadoop.hbase.filter.FilterBase",
+        configuration.get(Import.FILTER_CLASS_CONF_KEY));
+    assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
+  }
+
+  @Test
+  public void testDurability() throws Exception {
+    // Create an export table.
+    String exportTableName = name.getMethodName() + "export";
+    try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) {
+
+      // Insert some data
+      Put put = new Put(ROW1);
+      put.addColumn(FAMILYA, QUAL, now, QUAL);
+      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
+      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
+      exportTable.put(put);
+
+      put = new Put(ROW2);
+      put.addColumn(FAMILYA, QUAL, now, QUAL);
+      put.addColumn(FAMILYA, QUAL, now + 1, QUAL);
+      put.addColumn(FAMILYA, QUAL, now + 2, QUAL);
+      exportTable.put(put);
+
+      // Run the export
+      String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"};
+      assertTrue(runExport(args));
+
+      // Create the table for import
+      String importTableName = name.getMethodName() + "import1";
+      Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
+
+      // Register the wal listener for the import table
+      HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
+          .getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
+      TableWALActionListener walListener = new TableWALActionListener(region);
+      WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
+      wal.registerWALActionsListener(walListener);
+
+      // Run the import with SKIP_WAL
+      args =
+          new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
+              importTableName, FQ_OUTPUT_DIR };
+      assertTrue(runImport(args));
+      // Assert that the WAL is not visited
+      assertTrue(!walListener.isWALVisited());
+      //Ensure that the count is 2 (only one version of key value is obtained)
+      assertTrue(getCount(importTable, null) == 2);
+
+      // Run the import with the default durability option
+      importTableName = name.getMethodName() + "import2";
+      importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
+      region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
+          .getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
+      wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
+      walListener = new TableWALActionListener(region);
+      wal.registerWALActionsListener(walListener);
+      args = new String[] { importTableName, FQ_OUTPUT_DIR };
+      assertTrue(runImport(args));
+      // Assert that the WAL is visited
+      assertTrue(walListener.isWALVisited());
+      //Ensure that the count is 2 (only one version of key value is obtained)
+      assertTrue(getCount(importTable, null) == 2);
+    }
+  }
+
+  /**
+   * This listens to the {@link #visitLogEntryBeforeWrite(WALKey, WALEdit)} call to
+   * identify that an entry is written to the Write Ahead Log for the given table.
+   */
+  private static class TableWALActionListener extends WALActionsListener.Base {
+
+    private HRegionInfo regionInfo;
+    private boolean isVisited = false;
+
+    public TableWALActionListener(HRegionInfo region) {
+      this.regionInfo = region;
+    }
+
+    @Override
+    public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
+      if (logKey.getTablename().getNameAsString().equalsIgnoreCase(
+          this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())) {
+        isVisited = true;
+      }
+    }
+
+    public boolean isWALVisited() {
+      return isVisited;
+    }
+  }
+}
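
As an aside, a minimal sketch of how the Export and Import tools exercised by the test above are typically driven through ToolRunner; the table name, output path, and column-family rename mapping below are illustrative only, not taken from the commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.Export;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.util.ToolRunner;

public class ExportImportSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Export <tablename> <outputdir> [<versions>], mirroring runExport() in the test.
    int exportRc = ToolRunner.run(new Configuration(conf), new Export(),
        new String[] { "mytable", "/tmp/export-out", "1000" });
    // Import [-D options] <tablename> <inputdir>, mirroring runImport();
    // Import.CF_RENAME_PROP maps source family "a" to target family "b" (illustrative).
    int importRc = ToolRunner.run(new Configuration(conf), new Import(),
        new String[] { "-D" + Import.CF_RENAME_PROP + "=a:b",
            "mytable_import", "/tmp/export-out" });
    System.exit(exportRc != 0 ? exportRc : importRc);
  }
}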

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java
new file mode 100644
index 0000000..7d6d74f
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestImportTSVWithOperationAttributes implements Configurable {
+  @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
+      withTimeout(this.getClass()).withLookingForStuckThread(true).build();
+  private static final Log LOG = LogFactory.getLog(TestImportTSVWithOperationAttributes.class);
+  protected static final String NAME = TestImportTsv.class.getSimpleName();
+  protected static HBaseTestingUtility util = new HBaseTestingUtility();
+
+  /**
+   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
+   * true.
+   */
+  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
+
+  /**
+   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
+   */
+  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
+
+  private static Configuration conf;
+
+  private static final String TEST_ATR_KEY = "test";
+
+  private final String FAMILY = "FAM";
+
+  @Rule
+  public TestName name = new TestName();
+
+  public Configuration getConf() {
+    return util.getConfiguration();
+  }
+
+  public void setConf(Configuration conf) {
+    throw new IllegalArgumentException("setConf not supported");
+  }
+
+  @BeforeClass
+  public static void provisionCluster() throws Exception {
+    conf = util.getConfiguration();
+    conf.set("hbase.coprocessor.master.classes", OperationAttributesTestController.class.getName());
+    conf.set("hbase.coprocessor.region.classes", OperationAttributesTestController.class.getName());
+    util.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void releaseCluster() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMROnTable() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.MAPPER_CONF_KEY
+            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr",
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
+    String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest=>myvalue\n";
+    util.createTable(tableName, FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1, true);
+    util.deleteTable(tableName);
+  }
+
+  @Test
+  public void testMROnTableWithInvalidOperationAttr() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.MAPPER_CONF_KEY
+            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr",
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
+    String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest1=>myvalue\n";
+    util.createTable(tableName, FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1, false);
+    util.deleteTable(tableName);
+  }
+
+  /**
+   * Run an ImportTsv job and perform basic validation on the results. Returns
+   * the ImportTsv <code>Tool</code> instance so that other tests can inspect it
+   * for further validation as necessary. This method is static to ensure
+   * non-reliance on instance's util/conf facilities.
+   *
+   * @param args
+   *          Any arguments to pass BEFORE inputFile path is appended.
+   * @param dataAvailable
+   * @return The Tool instance used to run the test.
+   */
+  private Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, String[] args,
+      int valueMultiplier, boolean dataAvailable) throws Exception {
+    String table = args[args.length - 1];
+    Configuration conf = new Configuration(util.getConfiguration());
+
+    // populate input file
+    FileSystem fs = FileSystem.get(conf);
+    Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat"));
+    FSDataOutputStream op = fs.create(inputPath, true);
+    op.write(Bytes.toBytes(data));
+    op.close();
+    LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+      LOG.debug("Forcing combiner.");
+      conf.setInt("mapreduce.map.combine.minspills", 1);
+    }
+
+    // run the import
+    List<String> argv = new ArrayList<>(Arrays.asList(args));
+    argv.add(inputPath.toString());
+    Tool tool = new ImportTsv();
+    LOG.debug("Running ImportTsv with arguments: " + argv);
+    assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
+
+    validateTable(conf, TableName.valueOf(table), family, valueMultiplier, dataAvailable);
+
+    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+      LOG.debug("Deleting test subdirectory");
+      util.cleanupDataTestDirOnTestFS(table);
+    }
+    return tool;
+  }
+
+  /**
+   * Confirm ImportTsv via data in online table.
+   *
+   * @param dataAvailable
+   */
+  private static void validateTable(Configuration conf, TableName tableName, String family,
+      int valueMultiplier, boolean dataAvailable) throws IOException {
+
+    LOG.debug("Validating table.");
+    Connection connection = ConnectionFactory.createConnection(conf);
+    Table table = connection.getTable(tableName);
+    boolean verified = false;
+    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
+    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
+    for (int i = 0; i < numRetries; i++) {
+      try {
+        Scan scan = new Scan();
+        // Scan entire family.
+        scan.addFamily(Bytes.toBytes(family));
+        if (dataAvailable) {
+          ResultScanner resScanner = table.getScanner(scan);
+          for (Result res : resScanner) {
+            LOG.debug("Getting results " + res.size());
+            assertTrue(res.size() == 2);
+            List<Cell> kvs = res.listCells();
+            assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY")));
+            assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY")));
+            assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
+            assertTrue(CellUtil.matchingValue(kvs.get(1),
+                Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
+            // Only one result set is expected, so let it loop.
+            verified = true;
+          }
+        } else {
+          ResultScanner resScanner = table.getScanner(scan);
+          Result[] next = resScanner.next(2);
+          assertEquals(0, next.length);
+          verified = true;
+        }
+
+        break;
+      } catch (NullPointerException e) {
+        // If here, a cell was empty. Presume it's because updates came in
+        // after the scanner had been opened. Wait a while and retry.
+      }
+      try {
+        Thread.sleep(pause);
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+    table.close();
+    connection.close();
+    assertTrue(verified);
+  }
+
+  public static class OperationAttributesTestController implements RegionObserver {
+
+    @Override
+    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
+        Durability durability) throws IOException {
+      Region region = e.getEnvironment().getRegion();
+      if (!region.getRegionInfo().isMetaTable()
+          && !region.getRegionInfo().getTable().isSystemTable()) {
+        if (put.getAttribute(TEST_ATR_KEY) != null) {
+          LOG.debug("allow any put to happen " + region.getRegionInfo().getRegionNameAsString());
+        } else {
+          e.bypass();
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
new file mode 100644
index 0000000..4ab3d29
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestImportTSVWithTTLs implements Configurable {
+
+  protected static final Log LOG = LogFactory.getLog(TestImportTSVWithTTLs.class);
+  protected static final String NAME = TestImportTsv.class.getSimpleName();
+  protected static HBaseTestingUtility util = new HBaseTestingUtility();
+
+  /**
+   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
+   * true.
+   */
+  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
+
+  /**
+   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
+   */
+  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
+
+  private final String FAMILY = "FAM";
+  private static Configuration conf;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Override
+  public Configuration getConf() {
+    return util.getConfiguration();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    throw new IllegalArgumentException("setConf not supported");
+  }
+
+  @BeforeClass
+  public static void provisionCluster() throws Exception {
+    conf = util.getConfiguration();
+    // We don't check persistence in HFiles in this test, but if we ever do we will
+    // need this where the default hfile version is not 3 (i.e. 0.98)
+    conf.setInt("hfile.format.version", 3);
+    conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName());
+    util.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void releaseCluster() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMROnTable() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.MAPPER_CONF_KEY
+            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
+    String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n";
+    util.createTable(tableName, FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1);
+    util.deleteTable(tableName);
+  }
+
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+      String[] args, int valueMultiplier) throws Exception {
+    TableName table = TableName.valueOf(args[args.length - 1]);
+    Configuration conf = new Configuration(util.getConfiguration());
+
+    // populate input file
+    FileSystem fs = FileSystem.get(conf);
+    Path inputPath = fs.makeQualified(new Path(util
+        .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
+    FSDataOutputStream op = fs.create(inputPath, true);
+    op.write(Bytes.toBytes(data));
+    op.close();
+    LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+      LOG.debug("Forcing combiner.");
+      conf.setInt("mapreduce.map.combine.minspills", 1);
+    }
+
+    // run the import
+    List<String> argv = new ArrayList<>(Arrays.asList(args));
+    argv.add(inputPath.toString());
+    Tool tool = new ImportTsv();
+    LOG.debug("Running ImportTsv with arguments: " + argv);
+    try {
+      // Job will fail if observer rejects entries without TTL
+      assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
+    } finally {
+      // Clean up
+      if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+        LOG.debug("Deleting test subdirectory");
+        util.cleanupDataTestDirOnTestFS(table.getNameAsString());
+      }
+    }
+
+    return tool;
+  }
+
+  public static class TTLCheckingObserver implements RegionObserver {
+
+    @Override
+    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
+        Durability durability) throws IOException {
+      Region region = e.getEnvironment().getRegion();
+      if (!region.getRegionInfo().isMetaTable()
+          && !region.getRegionInfo().getTable().isSystemTable()) {
+        // The put carries the TTL attribute
+        if (put.getTTL() != Long.MAX_VALUE) {
+          return;
+        }
+        throw new IOException("Operation does not have TTL set");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
new file mode 100644
index 0000000..8967ac7
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.visibility.Authorizations;
+import org.apache.hadoop.hbase.security.visibility.CellVisibility;
+import org.apache.hadoop.hbase.security.visibility.ScanLabelGenerator;
+import org.apache.hadoop.hbase.security.visibility.SimpleScanLabelGenerator;
+import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
+import org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
+import org.apache.hadoop.hbase.security.visibility.VisibilityController;
+import org.apache.hadoop.hbase.security.visibility.VisibilityUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestImportTSVWithVisibilityLabels implements Configurable {
+
+  private static final Log LOG = LogFactory.getLog(TestImportTSVWithVisibilityLabels.class);
+  protected static final String NAME = TestImportTsv.class.getSimpleName();
+  protected static HBaseTestingUtility util = new HBaseTestingUtility();
+
+  /**
+   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
+   * true.
+   */
+  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
+
+  /**
+   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
+   */
+  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
+
+  private final String FAMILY = "FAM";
+  private final static String TOPSECRET = "topsecret";
+  private final static String PUBLIC = "public";
+  private final static String PRIVATE = "private";
+  private final static String CONFIDENTIAL = "confidential";
+  private final static String SECRET = "secret";
+  private static User SUPERUSER;
+  private static Configuration conf;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Override
+  public Configuration getConf() {
+    return util.getConfiguration();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    throw new IllegalArgumentException("setConf not supported");
+  }
+
+  @BeforeClass
+  public static void provisionCluster() throws Exception {
+    conf = util.getConfiguration();
+    SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" });
+    conf.set("hbase.superuser", "admin,"+User.getCurrent().getName());
+    conf.setInt("hfile.format.version", 3);
+    conf.set("hbase.coprocessor.master.classes", VisibilityController.class.getName());
+    conf.set("hbase.coprocessor.region.classes", VisibilityController.class.getName());
+    conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class,
+        ScanLabelGenerator.class);
+    util.startMiniCluster();
+    // Wait for the labels table to become available
+    util.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME.getName(), 50000);
+    createLabels();
+  }
+
+  private static void createLabels() throws IOException, InterruptedException {
+    PrivilegedExceptionAction<VisibilityLabelsResponse> action =
+        new PrivilegedExceptionAction<VisibilityLabelsResponse>() {
+      @Override
+      public VisibilityLabelsResponse run() throws Exception {
+        String[] labels = { SECRET, TOPSECRET, CONFIDENTIAL, PUBLIC, PRIVATE };
+        try (Connection conn = ConnectionFactory.createConnection(conf)) {
+          VisibilityClient.addLabels(conn, labels);
+          LOG.info("Added labels ");
+        } catch (Throwable t) {
+          LOG.error("Error in adding labels" , t);
+          throw new IOException(t);
+        }
+        return null;
+      }
+    };
+    SUPERUSER.runAs(action);
+  }
+
+  @AfterClass
+  public static void releaseCluster() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMROnTable() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.MAPPER_CONF_KEY
+            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
+    String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n";
+    util.createTable(tableName, FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1);
+    util.deleteTable(tableName);
+  }
+
+  @Test
+  public void testMROnTableWithDeletes() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
+    String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n";
+    util.createTable(tableName, FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1);
+    issueDeleteAndVerifyData(tableName);
+    util.deleteTable(tableName);
+  }
+
+  private void issueDeleteAndVerifyData(TableName tableName) throws IOException {
+    LOG.debug("Validating table after delete.");
+    Table table = util.getConnection().getTable(tableName);
+    boolean verified = false;
+    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
+    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
+    for (int i = 0; i < numRetries; i++) {
+      try {
+        Delete d = new Delete(Bytes.toBytes("KEY"));
+        d.addFamily(Bytes.toBytes(FAMILY));
+        d.setCellVisibility(new CellVisibility("private&secret"));
+        table.delete(d);
+
+        Scan scan = new Scan();
+        // Scan entire family.
+        scan.addFamily(Bytes.toBytes(FAMILY));
+        scan.setAuthorizations(new Authorizations("secret", "private"));
+        ResultScanner resScanner = table.getScanner(scan);
+        Result[] next = resScanner.next(5);
+        assertEquals(0, next.length);
+        verified = true;
+        break;
+      } catch (NullPointerException e) {
+        // If here, a cell was empty. Presume it's because updates came in
+        // after the scanner had been opened. Wait a while and retry.
+      }
+      try {
+        Thread.sleep(pause);
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+    table.close();
+    assertTrue(verified);
+  }
+
+  @Test
+  public void testMROnTableWithBulkload() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
+    Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
+        "-D" + ImportTsv.COLUMNS_CONF_KEY
+            + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", 
tableName.getNameAsString() };
+    String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n";
+    util.createTable(tableName, FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1);
+    util.deleteTable(tableName);
+  }
+
+  @Test
+  public void testBulkOutputWithTsvImporterTextMapper() throws Exception {
+    final TableName table = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
+    String FAMILY = "FAM";
+    Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles");
+    // Prepare the arguments required for the test.
+    String[] args =
+        new String[] {
+            "-D" + ImportTsv.MAPPER_CONF_KEY
+                + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
+            "-D" + ImportTsv.COLUMNS_CONF_KEY
+                + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
+            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+            "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + 
bulkOutputPath.toString(),
+            table.getNameAsString()
+            };
+    String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n";
+    doMROnTableTest(util, FAMILY, data, args, 4);
+    util.deleteTable(table);
+  }
+
+  @Test
+  public void testMRWithOutputFormat() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
+    Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.MAPPER_CONF_KEY
+            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
+        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + 
"=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", 
tableName.getNameAsString() };
+    String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n";
+    util.createTable(tableName, FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1);
+    util.deleteTable(tableName);
+  }
+
+  @Test
+  public void testBulkOutputWithInvalidLabels() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
+    Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
+    // Prepare the arguments required for the test.
+    String[] args =
+        new String[] { "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + 
hfiles.toString(),
+            "-D" + ImportTsv.COLUMNS_CONF_KEY + 
"=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
+            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", 
tableName.getNameAsString() };
+
+    // 2 Data rows, one with valid label and one with invalid label
+    String data =
+        "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n";
+    util.createTable(tableName, FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1, 2);
+    util.deleteTable(tableName);
+  }
+
+  @Test
+  public void testBulkOutputWithTsvImporterTextMapperWithInvalidLabels() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID());
+    Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles");
+    // Prepare the arguments required for the test.
+    String[] args =
+        new String[] {
+            "-D" + ImportTsv.MAPPER_CONF_KEY
+                + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
+            "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
+            "-D" + ImportTsv.COLUMNS_CONF_KEY + 
"=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
+            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", 
tableName.getNameAsString() };
+
+    // 2 Data rows, one with valid label and one with invalid label
+    String data =
+        "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n";
+    util.createTable(tableName, FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1, 2);
+    util.deleteTable(tableName);
+  }
+
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+      String[] args, int valueMultiplier) throws Exception {
+    return doMROnTableTest(util, family, data, args, valueMultiplier, -1);
+  }
+
+  /**
+   * Run an ImportTsv job and perform basic validation on the results. Returns
+   * the ImportTsv <code>Tool</code> instance so that other tests can inspect it
+   * for further validation as necessary. This method is static to ensure
+   * non-reliance on the instance's util/conf facilities.
+   *
+   * @param args
+   *          Any arguments to pass BEFORE the inputFile path is appended.
+   *
+   * @param expectedKVCount Expected KV count. Pass -1 to skip the KV count check.
+   *
+   * @return The Tool instance used to run the test.
+   */
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+      String[] args, int valueMultiplier, int expectedKVCount) throws Exception {
+    TableName table = TableName.valueOf(args[args.length - 1]);
+    Configuration conf = new Configuration(util.getConfiguration());
+
+    // populate input file
+    FileSystem fs = FileSystem.get(conf);
+    Path inputPath = fs.makeQualified(new Path(util
+        .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
+    FSDataOutputStream op = fs.create(inputPath, true);
+    if (data == null) {
+      data = "KEY\u001bVALUE1\u001bVALUE2\n";
+    }
+    op.write(Bytes.toBytes(data));
+    op.close();
+    LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+      LOG.debug("Forcing combiner.");
+      conf.setInt("mapreduce.map.combine.minspills", 1);
+    }
+
+    // run the import
+    List<String> argv = new ArrayList<>(Arrays.asList(args));
+    argv.add(inputPath.toString());
+    Tool tool = new ImportTsv();
+    LOG.debug("Running ImportTsv with arguments: " + argv);
+    assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
+
+    // Perform basic validation. If the input args did not include
+    // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
+    // Otherwise, validate presence of hfiles.
+    boolean createdHFiles = false;
+    String outputPath = null;
+    for (String arg : argv) {
+      if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
+        createdHFiles = true;
+        // split '-Dfoo=bar' on '=' and keep 'bar'
+        outputPath = arg.split("=")[1];
+        break;
+      }
+    }
+    LOG.debug("validating the table " + createdHFiles);
+    if (createdHFiles)
+     validateHFiles(fs, outputPath, family,expectedKVCount);
+    else
+      validateTable(conf, table, family, valueMultiplier);
+
+    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+      LOG.debug("Deleting test subdirectory");
+      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
+    }
+    return tool;
+  }
+
+  /**
+   * Confirm ImportTsv via HFiles on fs.
+   */
+  private static void validateHFiles(FileSystem fs, String outputPath, String family,
+      int expectedKVCount) throws IOException {
+
+    // validate number and content of output columns
+    LOG.debug("Validating HFiles.");
+    Set<String> configFamilies = new HashSet<>();
+    configFamilies.add(family);
+    Set<String> foundFamilies = new HashSet<>();
+    int actualKVCount = 0;
+    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
+      LOG.debug("The output path has files");
+      String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
+      String cf = elements[elements.length - 1];
+      foundFamilies.add(cf);
+      assertTrue(String.format(
+          "HFile ouput contains a column family (%s) not present in input 
families (%s)", cf,
+          configFamilies), configFamilies.contains(cf));
+      for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
+        assertTrue(String.format("HFile %s appears to contain no data.", 
hfile.getPath()),
+            hfile.getLen() > 0);
+        if (expectedKVCount > -1) {
+          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
+        }
+      }
+    }
+    if (expectedKVCount > -1) {
+      assertTrue(String.format(
+        "KV count in output hfile=<%d> doesn't match with expected KV 
count=<%d>", actualKVCount,
+        expectedKVCount), actualKVCount == expectedKVCount);
+    }
+  }
+
+  /**
+   * Confirm ImportTsv via data in online table.
+   */
+  private static void validateTable(Configuration conf, TableName tableName, String family,
+      int valueMultiplier) throws IOException {
+
+    LOG.debug("Validating table.");
+    Table table = util.getConnection().getTable(tableName);
+    boolean verified = false;
+    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
+    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
+    for (int i = 0; i < numRetries; i++) {
+      try {
+        Scan scan = new Scan();
+        // Scan entire family.
+        scan.addFamily(Bytes.toBytes(family));
+        scan.setAuthorizations(new Authorizations("secret","private"));
+        ResultScanner resScanner = table.getScanner(scan);
+        Result[] next = resScanner.next(5);
+        assertEquals(1, next.length);
+        for (Result res : resScanner) {
+          LOG.debug("Getting results " + res.size());
+          assertTrue(res.size() == 2);
+          List<Cell> kvs = res.listCells();
+          assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY")));
+          assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY")));
+          assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
+          assertTrue(CellUtil.matchingValue(kvs.get(1),
+              Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
+          // Only one result set is expected, so let it loop.
+        }
+        verified = true;
+        break;
+      } catch (NullPointerException e) {
+        // If here, a cell was empty. Presume it's because updates came in
+        // after the scanner had been opened. Wait a while and retry.
+      }
+      try {
+        Thread.sleep(pause);
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+    table.close();
+    assertTrue(verified);
+  }
+
+  /**
+   * Returns the total number of KVs in the given HFile.
+   * @param fs the FileSystem
+   * @param p the HFile path
+   * @return KV count in the given HFile
+   * @throws IOException
+   */
+  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
+    Configuration conf = util.getConfiguration();
+    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
+    reader.loadFileInfo();
+    HFileScanner scanner = reader.getScanner(false, false);
+    scanner.seekTo();
+    int count = 0;
+    do {
+      count++;
+    } while (scanner.next());
+    reader.close();
+    return count;
+  }
+
+}
