http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java new file mode 100644 index 0000000..91d2696 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java @@ -0,0 +1,726 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeepDeletedCells; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.filter.PrefixFilter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALKey; +import 
org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.LauncherSecurityManager; +import org.apache.hadoop.mapreduce.Mapper.Context; +import org.apache.hadoop.util.ToolRunner; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Tests the table import and table export MR job functionality + */ +@Category({VerySlowMapReduceTests.class, MediumTests.class}) +public class TestImportExport { + private static final Log LOG = LogFactory.getLog(TestImportExport.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1"); + private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2"); + private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3"); + private static final String FAMILYA_STRING = "a"; + private static final String FAMILYB_STRING = "b"; + private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING); + private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING); + private static final byte[] QUAL = Bytes.toBytes("q"); + private static final String OUTPUT_DIR = "outputdir"; + private static String FQ_OUTPUT_DIR; + private static final String EXPORT_BATCH_SIZE = "100"; + + private static long now = System.currentTimeMillis(); + + @BeforeClass + public static void beforeClass() throws Exception { + // Up the handlers; this test needs more than usual. + UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); + UTIL.startMiniCluster(); + FQ_OUTPUT_DIR = + new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString(); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Rule + public final TestName name = new TestName(); + + @Before + public void announce() { + LOG.info("Running " + name.getMethodName()); + } + + @Before + @After + public void cleanup() throws Exception { + FileSystem fs = FileSystem.get(UTIL.getConfiguration()); + fs.delete(new Path(OUTPUT_DIR), true); + } + + /** + * Runs an export job with the specified command line args + * @param args + * @return true if job completed successfully + * @throws IOException + * @throws InterruptedException + * @throws ClassNotFoundException + */ + boolean runExport(String[] args) throws Exception { + // need to make a copy of the configuration to make sure different temp dirs are used. + int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args); + return status == 0; + } + + /** + * Runs an import job with the specified command line args + * @param args + * @return true if job completed successfully + * @throws IOException + * @throws InterruptedException + * @throws ClassNotFoundException + */ + boolean runImport(String[] args) throws Exception { + // need to make a copy of the configuration to make sure different temp dirs are used.
+ int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args); + return status == 0; + } + + /** + * Test simple replication case with column mapping + * @throws Exception + */ + @Test + public void testSimpleCase() throws Exception { + try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3);) { + Put p = new Put(ROW1); + p.addColumn(FAMILYA, QUAL, now, QUAL); + p.addColumn(FAMILYA, QUAL, now + 1, QUAL); + p.addColumn(FAMILYA, QUAL, now + 2, QUAL); + t.put(p); + p = new Put(ROW2); + p.addColumn(FAMILYA, QUAL, now, QUAL); + p.addColumn(FAMILYA, QUAL, now + 1, QUAL); + p.addColumn(FAMILYA, QUAL, now + 2, QUAL); + t.put(p); + p = new Put(ROW3); + p.addColumn(FAMILYA, QUAL, now, QUAL); + p.addColumn(FAMILYA, QUAL, now + 1, QUAL); + p.addColumn(FAMILYA, QUAL, now + 2, QUAL); + t.put(p); + } + + String[] args = new String[] { + // Only export row1 & row2. + "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1", + "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3", + name.getMethodName(), + FQ_OUTPUT_DIR, + "1000", // max number of key versions per key to export + }; + assertTrue(runExport(args)); + + final String IMPORT_TABLE = name.getMethodName() + "import"; + try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);) { + args = new String[] { + "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING, + IMPORT_TABLE, + FQ_OUTPUT_DIR + }; + assertTrue(runImport(args)); + + Get g = new Get(ROW1); + g.setMaxVersions(); + Result r = t.get(g); + assertEquals(3, r.size()); + g = new Get(ROW2); + g.setMaxVersions(); + r = t.get(g); + assertEquals(3, r.size()); + g = new Get(ROW3); + r = t.get(g); + assertEquals(0, r.size()); + } + } + + /** + * Test export hbase:meta table + * + * @throws Exception + */ + @Test + public void testMetaExport() throws Exception { + String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString(); + String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" }; + assertTrue(runExport(args)); + } + + /** + * Test import data from 0.94 exported file + * @throws Exception + */ + @Test + public void testImport94Table() throws Exception { + final String name = "exportedTableIn94Format"; + URL url = TestImportExport.class.getResource(name); + File f = new File(url.toURI()); + if (!f.exists()) { + LOG.warn("FAILED TO FIND " + f + "; skipping out on test"); + return; + } + assertTrue(f.exists()); + LOG.info("FILE=" + f); + Path importPath = new Path(f.toURI()); + FileSystem fs = FileSystem.get(UTIL.getConfiguration()); + fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name)); + String IMPORT_TABLE = name; + try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);) { + String[] args = new String[] { + "-Dhbase.import.version=0.94" , + IMPORT_TABLE, FQ_OUTPUT_DIR + }; + assertTrue(runImport(args)); + /* exportedTableIn94Format contains 5 rows + ROW COLUMN+CELL + r1 column=f1:c1, timestamp=1383766761171, value=val1 + r2 column=f1:c1, timestamp=1383766771642, value=val2 + r3 column=f1:c1, timestamp=1383766777615, value=val3 + r4 column=f1:c1, timestamp=1383766785146, value=val4 + r5 column=f1:c1, timestamp=1383766791506, value=val5 + */ + assertEquals(5, UTIL.countRows(t)); + } + } + + /** + * Test export scanner batching + */ + @Test + public void testExportScannerBatching() throws Exception { + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName())); + desc.addFamily(new 
HColumnDescriptor(FAMILYA) + .setMaxVersions(1) + ); + UTIL.getAdmin().createTable(desc); + try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { + + Put p = new Put(ROW1); + p.addColumn(FAMILYA, QUAL, now, QUAL); + p.addColumn(FAMILYA, QUAL, now + 1, QUAL); + p.addColumn(FAMILYA, QUAL, now + 2, QUAL); + p.addColumn(FAMILYA, QUAL, now + 3, QUAL); + p.addColumn(FAMILYA, QUAL, now + 4, QUAL); + t.put(p); + + String[] args = new String[] { + "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg. + name.getMethodName(), + FQ_OUTPUT_DIR + }; + assertTrue(runExport(args)); + + FileSystem fs = FileSystem.get(UTIL.getConfiguration()); + fs.delete(new Path(FQ_OUTPUT_DIR), true); + } + } + + @Test + public void testWithDeletes() throws Exception { + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName())); + desc.addFamily(new HColumnDescriptor(FAMILYA) + .setMaxVersions(5) + .setKeepDeletedCells(KeepDeletedCells.TRUE) + ); + UTIL.getAdmin().createTable(desc); + try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { + + Put p = new Put(ROW1); + p.addColumn(FAMILYA, QUAL, now, QUAL); + p.addColumn(FAMILYA, QUAL, now + 1, QUAL); + p.addColumn(FAMILYA, QUAL, now + 2, QUAL); + p.addColumn(FAMILYA, QUAL, now + 3, QUAL); + p.addColumn(FAMILYA, QUAL, now + 4, QUAL); + t.put(p); + + Delete d = new Delete(ROW1, now+3); + t.delete(d); + d = new Delete(ROW1); + d.addColumns(FAMILYA, QUAL, now+2); + t.delete(d); + } + + String[] args = new String[] { + "-D" + Export.RAW_SCAN + "=true", + name.getMethodName(), + FQ_OUTPUT_DIR, + "1000", // max number of key versions per key to export + }; + assertTrue(runExport(args)); + + final String IMPORT_TABLE = name.getMethodName() + "import"; + desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE)); + desc.addFamily(new HColumnDescriptor(FAMILYA) + .setMaxVersions(5) + .setKeepDeletedCells(KeepDeletedCells.TRUE) + ); + UTIL.getAdmin().createTable(desc); + try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { + args = new String[] { + IMPORT_TABLE, + FQ_OUTPUT_DIR + }; + assertTrue(runImport(args)); + + Scan s = new Scan(); + s.setMaxVersions(); + s.setRaw(true); + ResultScanner scanner = t.getScanner(s); + Result r = scanner.next(); + Cell[] res = r.rawCells(); + assertTrue(CellUtil.isDeleteFamily(res[0])); + assertEquals(now+4, res[1].getTimestamp()); + assertEquals(now+3, res[2].getTimestamp()); + assertTrue(CellUtil.isDelete(res[3])); + assertEquals(now+2, res[4].getTimestamp()); + assertEquals(now+1, res[5].getTimestamp()); + assertEquals(now, res[6].getTimestamp()); + } + } + + + @Test + public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Exception { + final TableName exportTable = TableName.valueOf(name.getMethodName()); + HTableDescriptor desc = new HTableDescriptor(exportTable); + desc.addFamily(new HColumnDescriptor(FAMILYA) + .setMaxVersions(5) + .setKeepDeletedCells(KeepDeletedCells.TRUE) + ); + UTIL.getAdmin().createTable(desc); + + Table exportT = UTIL.getConnection().getTable(exportTable); + + //Add first version of QUAL + Put p = new Put(ROW1); + p.addColumn(FAMILYA, QUAL, now, QUAL); + exportT.put(p); + + //Add Delete family marker + Delete d = new Delete(ROW1, now+3); + exportT.delete(d); + + //Add second version of QUAL + p = new Put(ROW1); + p.addColumn(FAMILYA, QUAL, now + 5, "s".getBytes()); + exportT.put(p); + + //Add second Delete family marker + d = new Delete(ROW1, now+7); + exportT.delete(d); + + + String[] 
args = new String[] { + "-D" + Export.RAW_SCAN + "=true", exportTable.getNameAsString(), + FQ_OUTPUT_DIR, + "1000", // max number of key versions per key to export + }; + assertTrue(runExport(args)); + + final String importTable = name.getMethodName() + "import"; + desc = new HTableDescriptor(TableName.valueOf(importTable)); + desc.addFamily(new HColumnDescriptor(FAMILYA) + .setMaxVersions(5) + .setKeepDeletedCells(KeepDeletedCells.TRUE) + ); + UTIL.getAdmin().createTable(desc); + + Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable)); + args = new String[] { + importTable, + FQ_OUTPUT_DIR + }; + assertTrue(runImport(args)); + + Scan s = new Scan(); + s.setMaxVersions(); + s.setRaw(true); + + ResultScanner importedTScanner = importT.getScanner(s); + Result importedTResult = importedTScanner.next(); + + ResultScanner exportedTScanner = exportT.getScanner(s); + Result exportedTResult = exportedTScanner.next(); + try { + Result.compareResults(exportedTResult, importedTResult); + } catch (Exception e) { + fail("Original and imported tables data comparison failed with error: " + e.getMessage()); + } finally { + exportT.close(); + importT.close(); + } + } + + /** + * Create a simple table, run an Export Job on it, Import with filtering on, verify counts, + * attempt with invalid values. + */ + @Test + public void testWithFilter() throws Exception { + // Create simple table to export + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName())); + desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5)); + UTIL.getAdmin().createTable(desc); + Table exportTable = UTIL.getConnection().getTable(desc.getTableName()); + + Put p1 = new Put(ROW1); + p1.addColumn(FAMILYA, QUAL, now, QUAL); + p1.addColumn(FAMILYA, QUAL, now + 1, QUAL); + p1.addColumn(FAMILYA, QUAL, now + 2, QUAL); + p1.addColumn(FAMILYA, QUAL, now + 3, QUAL); + p1.addColumn(FAMILYA, QUAL, now + 4, QUAL); + + // Having another row would actually test the filter.
+ Put p2 = new Put(ROW2); + p2.addColumn(FAMILYA, QUAL, now, QUAL); + + exportTable.put(Arrays.asList(p1, p2)); + + // Export the simple table + String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" }; + assertTrue(runExport(args)); + + // Import to a new table + final String IMPORT_TABLE = name.getMethodName() + "import"; + desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE)); + desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5)); + UTIL.getAdmin().createTable(desc); + + Table importTable = UTIL.getConnection().getTable(desc.getTableName()); + args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(), + "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, + FQ_OUTPUT_DIR, + "1000" }; + assertTrue(runImport(args)); + + // get the count of the source table for that time range + PrefixFilter filter = new PrefixFilter(ROW1); + int count = getCount(exportTable, filter); + + Assert.assertEquals("Unexpected row count between export and import tables", count, + getCount(importTable, null)); + + // and then test that a broken command doesn't bork everything - easier here because we don't + // need to re-run the export job + + args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(), + "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name.getMethodName(), + FQ_OUTPUT_DIR, "1000" }; + assertFalse(runImport(args)); + + // cleanup + exportTable.close(); + importTable.close(); + } + + /** + * Count the number of keyvalues in the specified table that pass the given filter + * @param table the table to scan + * @param filter the filter to apply, or null to count everything + * @return the number of keyvalues found + * @throws IOException + */ + private int getCount(Table table, Filter filter) throws IOException { + Scan scan = new Scan(); + scan.setFilter(filter); + ResultScanner results = table.getScanner(scan); + int count = 0; + for (Result res : results) { + count += res.size(); + } + results.close(); + return count; + } + + /** + * Test main method. Import should print help and call System.exit + */ + @Test + public void testImportMain() throws Exception { + PrintStream oldPrintStream = System.err; + SecurityManager SECURITY_MANAGER = System.getSecurityManager(); + LauncherSecurityManager newSecurityManager= new LauncherSecurityManager(); + System.setSecurityManager(newSecurityManager); + ByteArrayOutputStream data = new ByteArrayOutputStream(); + String[] args = {}; + System.setErr(new PrintStream(data)); + try { + System.setErr(new PrintStream(data)); + Import.main(args); + fail("should be SecurityException"); + } catch (SecurityException e) { + assertEquals(-1, newSecurityManager.getExitCode()); + assertTrue(data.toString().contains("Wrong number of arguments:")); + assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output")); + assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>")); + assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output")); + assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false")); + } finally { + System.setErr(oldPrintStream); + System.setSecurityManager(SECURITY_MANAGER); + } + } + + /** + * Test main method.
Export should print help and call System.exit + */ + @Test + public void testExportMain() throws Exception { + PrintStream oldPrintStream = System.err; + SecurityManager SECURITY_MANAGER = System.getSecurityManager(); + LauncherSecurityManager newSecurityManager= new LauncherSecurityManager(); + System.setSecurityManager(newSecurityManager); + ByteArrayOutputStream data = new ByteArrayOutputStream(); + String[] args = {}; + System.setErr(new PrintStream(data)); + try { + System.setErr(new PrintStream(data)); + Export.main(args); + fail("should be SecurityException"); + } catch (SecurityException e) { + assertEquals(-1, newSecurityManager.getExitCode()); + String errMsg = data.toString(); + assertTrue(errMsg.contains("Wrong number of arguments:")); + assertTrue(errMsg.contains( + "Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " + + "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]")); + assertTrue( + errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ...")); + assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true")); + assertTrue(errMsg.contains("-Dhbase.client.scanner.caching=100")); + assertTrue(errMsg.contains("-Dmapreduce.map.speculative=false")); + assertTrue(errMsg.contains("-Dmapreduce.reduce.speculative=false")); + assertTrue(errMsg.contains("-Dhbase.export.scanner.batch=10")); + } finally { + System.setErr(oldPrintStream); + System.setSecurityManager(SECURITY_MANAGER); + } + } + + /** + * Test map method of Importer + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testKeyValueImporter() throws Exception { + KeyValueImporter importer = new KeyValueImporter(); + Configuration configuration = new Configuration(); + Context ctx = mock(Context.class); + when(ctx.getConfiguration()).thenReturn(configuration); + + doAnswer(new Answer<Void>() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0]; + KeyValue key = (KeyValue) invocation.getArguments()[1]; + assertEquals("Key", Bytes.toString(writer.get())); + assertEquals("row", Bytes.toString(CellUtil.cloneRow(key))); + return null; + } + }).when(ctx).write(any(ImmutableBytesWritable.class), any(KeyValue.class)); + + importer.setup(ctx); + Result value = mock(Result.class); + KeyValue[] keys = { + new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"), + Bytes.toBytes("value")), + new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"), + Bytes.toBytes("value1")) }; + when(value.rawCells()).thenReturn(keys); + importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx); + + } + + /** + * Test addFilterAndArguments method of Import. This method sets a couple of + * parameters into the Configuration + */ + @Test + public void testAddFilterAndArguments() throws IOException { + Configuration configuration = new Configuration(); + + List<String> args = new ArrayList<>(); + args.add("param1"); + args.add("param2"); + + Import.addFilterAndArguments(configuration, FilterBase.class, args); + assertEquals("org.apache.hadoop.hbase.filter.FilterBase", + configuration.get(Import.FILTER_CLASS_CONF_KEY)); + assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY)); + } + + @Test + public void testDurability() throws Exception { + // Create an export table.
+ String exportTableName = name.getMethodName() + "export"; + try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) { + + // Insert some data + Put put = new Put(ROW1); + put.addColumn(FAMILYA, QUAL, now, QUAL); + put.addColumn(FAMILYA, QUAL, now + 1, QUAL); + put.addColumn(FAMILYA, QUAL, now + 2, QUAL); + exportTable.put(put); + + put = new Put(ROW2); + put.addColumn(FAMILYA, QUAL, now, QUAL); + put.addColumn(FAMILYA, QUAL, now + 1, QUAL); + put.addColumn(FAMILYA, QUAL, now + 2, QUAL); + exportTable.put(put); + + // Run the export + String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"}; + assertTrue(runExport(args)); + + // Create the table for import + String importTableName = name.getMethodName() + "import1"; + Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); + + // Register the wal listener for the import table + HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() + .getOnlineRegions(importTable.getName()).get(0).getRegionInfo(); + TableWALActionListener walListener = new TableWALActionListener(region); + WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); + wal.registerWALActionsListener(walListener); + + // Run the import with SKIP_WAL + args = + new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(), + importTableName, FQ_OUTPUT_DIR }; + assertTrue(runImport(args)); + // Assert that the wal is not visited + assertTrue(!walListener.isWALVisited()); + // Ensure that the count is 2 (only one version of key value is obtained) + assertTrue(getCount(importTable, null) == 2); + + // Run the import with the default durability option + importTableName = name.getMethodName() + "import2"; + importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); + region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() + .getOnlineRegions(importTable.getName()).get(0).getRegionInfo(); + wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); + walListener = new TableWALActionListener(region); + wal.registerWALActionsListener(walListener); + args = new String[] { importTableName, FQ_OUTPUT_DIR }; + assertTrue(runImport(args)); + // Assert that the wal is visited + assertTrue(walListener.isWALVisited()); + // Ensure that the count is 2 (only one version of key value is obtained) + assertTrue(getCount(importTable, null) == 2); + } + } + + /** + * This listens to the {@link #visitLogEntryBeforeWrite(WALKey, WALEdit)} hook to + * identify that an entry was written to the Write Ahead Log for the given table. + */ + private static class TableWALActionListener extends WALActionsListener.Base { + + private HRegionInfo regionInfo; + private boolean isVisited = false; + + public TableWALActionListener(HRegionInfo region) { + this.regionInfo = region; + } + + @Override + public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) { + if (logKey.getTablename().getNameAsString().equalsIgnoreCase( + this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())) { + isVisited = true; + } + } + + public boolean isWALVisited() { + return isVisited; + } + } +}
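The runExport()/runImport() helpers in the test above drive the Export and Import tools through ToolRunner exactly as the hbase command line does. As a rough standalone sketch of that flow, under the same usage strings the tests assert on (the table names and output path below are illustrative, not taken from the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.Export;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.util.ToolRunner;

public class ExportThenImport {
  public static void main(String[] ignored) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Export usage: <tablename> <outputdir> [<versions> [<starttime> [<endtime>]]]
    int rc = ToolRunner.run(new Configuration(conf), new Export(),
        new String[] { "sourceTable", "/tmp/export-out", "1000" });
    if (rc != 0) {
      throw new IllegalStateException("Export job failed with exit code " + rc);
    }
    // Import usage: <tablename> <inputdir>; the target table must already exist.
    rc = ToolRunner.run(new Configuration(conf), new Import(),
        new String[] { "targetTable", "/tmp/export-out" });
    if (rc != 0) {
      throw new IllegalStateException("Import job failed with exit code " + rc);
    }
  }
}

Each run is handed a fresh copy of the Configuration, mirroring the comment in runExport()/runImport() about keeping the jobs' temp directories separate.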
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java new file mode 100644 index 0000000..7d6d74f --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; + +@Category({MapReduceTests.class, 
LargeTests.class}) +public class TestImportTSVWithOperationAttributes implements Configurable { + @Rule public final TestRule timeout = CategoryBasedTimeout.builder(). + withTimeout(this.getClass()).withLookingForStuckThread(true).build(); + private static final Log LOG = LogFactory.getLog(TestImportTSVWithOperationAttributes.class); + protected static final String NAME = TestImportTsv.class.getSimpleName(); + protected static HBaseTestingUtility util = new HBaseTestingUtility(); + + /** + * Delete the tmp directory after running doMROnTableTest. Boolean. Default is + * false. + */ + protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; + + /** + * Force use of combiner in doMROnTableTest. Boolean. Default is true. + */ + protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; + + private static Configuration conf; + + private static final String TEST_ATR_KEY = "test"; + + private final String FAMILY = "FAM"; + + @Rule + public TestName name = new TestName(); + + public Configuration getConf() { + return util.getConfiguration(); + } + + public void setConf(Configuration conf) { + throw new IllegalArgumentException("setConf not supported"); + } + + @BeforeClass + public static void provisionCluster() throws Exception { + conf = util.getConfiguration(); + conf.set("hbase.coprocessor.master.classes", OperationAttributesTestController.class.getName()); + conf.set("hbase.coprocessor.region.classes", OperationAttributesTestController.class.getName()); + util.startMiniCluster(); + } + + @AfterClass + public static void releaseCluster() throws Exception { + util.shutdownMiniCluster(); + } + + @Test + public void testMROnTable() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); + + // Prepare the arguments required for the test. + String[] args = new String[] { + "-D" + ImportTsv.MAPPER_CONF_KEY + + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr", + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; + String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest=>myvalue\n"; + util.createTable(tableName, FAMILY); + doMROnTableTest(util, FAMILY, data, args, 1, true); + util.deleteTable(tableName); + } + + @Test + public void testMROnTableWithInvalidOperationAttr() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); + + // Prepare the arguments required for the test. + String[] args = new String[] { + "-D" + ImportTsv.MAPPER_CONF_KEY + + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr", + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; + String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest1=>myvalue\n"; + util.createTable(tableName, FAMILY); + doMROnTableTest(util, FAMILY, data, args, 1, false); + util.deleteTable(tableName); + } + + /** + * Run an ImportTsv job and perform basic validation on the results. Returns + * the ImportTsv <code>Tool</code> instance so that other tests can inspect it + * for further validation as necessary. This method is static to insure + * non-reliance on instance's util/conf facilities. + * + * @param args + * Any arguments to pass BEFORE inputFile path is appended. 
+ * @param dataAvailable + * @return The Tool instance used to run the test. + */ + private Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, String[] args, + int valueMultiplier, boolean dataAvailable) throws Exception { + String table = args[args.length - 1]; + Configuration conf = new Configuration(util.getConfiguration()); + + // populate input file + FileSystem fs = FileSystem.get(conf); + Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat")); + FSDataOutputStream op = fs.create(inputPath, true); + op.write(Bytes.toBytes(data)); + op.close(); + LOG.debug(String.format("Wrote test data to file: %s", inputPath)); + + if (conf.getBoolean(FORCE_COMBINER_CONF, true)) { + LOG.debug("Forcing combiner."); + conf.setInt("mapreduce.map.combine.minspills", 1); + } + + // run the import + List<String> argv = new ArrayList<>(Arrays.asList(args)); + argv.add(inputPath.toString()); + Tool tool = new ImportTsv(); + LOG.debug("Running ImportTsv with arguments: " + argv); + assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args))); + + validateTable(conf, TableName.valueOf(table), family, valueMultiplier, dataAvailable); + + if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) { + LOG.debug("Deleting test subdirectory"); + util.cleanupDataTestDirOnTestFS(table); + } + return tool; + } + + /** + * Confirm ImportTsv via data in online table. + * + * @param dataAvailable + */ + private static void validateTable(Configuration conf, TableName tableName, String family, + int valueMultiplier, boolean dataAvailable) throws IOException { + + LOG.debug("Validating table."); + Connection connection = ConnectionFactory.createConnection(conf); + Table table = connection.getTable(tableName); + boolean verified = false; + long pause = conf.getLong("hbase.client.pause", 5 * 1000); + int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); + for (int i = 0; i < numRetries; i++) { + try { + Scan scan = new Scan(); + // Scan entire family. + scan.addFamily(Bytes.toBytes(family)); + if (dataAvailable) { + ResultScanner resScanner = table.getScanner(scan); + for (Result res : resScanner) { + LOG.debug("Getting results " + res.size()); + assertTrue(res.size() == 2); + List<Cell> kvs = res.listCells(); + assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY"))); + assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY"))); + assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier))); + assertTrue(CellUtil.matchingValue(kvs.get(1), + Bytes.toBytes("VALUE" + 2 * valueMultiplier))); + // Only one result set is expected, so let it loop. + verified = true; + } + } else { + ResultScanner resScanner = table.getScanner(scan); + Result[] next = resScanner.next(2); + assertEquals(0, next.length); + verified = true; + } + + break; + } catch (NullPointerException e) { + // If here, a cell was empty. Presume its because updates came in + // after the scanner had been opened. Wait a while and retry. 
+ } + try { + Thread.sleep(pause); + } catch (InterruptedException e) { + // continue + } + } + table.close(); + connection.close(); + assertTrue(verified); + } + + public static class OperationAttributesTestController implements RegionObserver { + + @Override + public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, + Durability durability) throws IOException { + Region region = e.getEnvironment().getRegion(); + if (!region.getRegionInfo().isMetaTable() + && !region.getRegionInfo().getTable().isSystemTable()) { + if (put.getAttribute(TEST_ATR_KEY) != null) { + LOG.debug("allow any put to happen " + region.getRegionInfo().getRegionNameAsString()); + } else { + e.bypass(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java new file mode 100644 index 0000000..4ab3d29 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({MapReduceTests.class, LargeTests.class}) +public class TestImportTSVWithTTLs implements Configurable { + + protected static final Log LOG = LogFactory.getLog(TestImportTSVWithTTLs.class); + protected static final String NAME = TestImportTsv.class.getSimpleName(); + protected static HBaseTestingUtility util = new HBaseTestingUtility(); + + /** + * Delete the tmp directory after running doMROnTableTest. Boolean. Default is + * false. + */ + protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; + + /** + * Force use of combiner in doMROnTableTest. Boolean. Default is true. + */ + protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; + + private final String FAMILY = "FAM"; + private static Configuration conf; + + @Rule + public TestName name = new TestName(); + + @Override + public Configuration getConf() { + return util.getConfiguration(); + } + + @Override + public void setConf(Configuration conf) { + throw new IllegalArgumentException("setConf not supported"); + } + + @BeforeClass + public static void provisionCluster() throws Exception { + conf = util.getConfiguration(); + // We don't check persistence in HFiles in this test, but if we ever do we will + // need this where the default hfile version is not 3 (i.e. 0.98) + conf.setInt("hfile.format.version", 3); + conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName()); + util.startMiniCluster(); + } + + @AfterClass + public static void releaseCluster() throws Exception { + util.shutdownMiniCluster(); + } + + @Test + public void testMROnTable() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); + + // Prepare the arguments required for the test. 
+ String[] args = new String[] { + "-D" + ImportTsv.MAPPER_CONF_KEY + + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; + String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n"; + util.createTable(tableName, FAMILY); + doMROnTableTest(util, FAMILY, data, args, 1); + util.deleteTable(tableName); + } + + protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, + String[] args, int valueMultiplier) throws Exception { + TableName table = TableName.valueOf(args[args.length - 1]); + Configuration conf = new Configuration(util.getConfiguration()); + + // populate input file + FileSystem fs = FileSystem.get(conf); + Path inputPath = fs.makeQualified(new Path(util + .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat")); + FSDataOutputStream op = fs.create(inputPath, true); + op.write(Bytes.toBytes(data)); + op.close(); + LOG.debug(String.format("Wrote test data to file: %s", inputPath)); + + if (conf.getBoolean(FORCE_COMBINER_CONF, true)) { + LOG.debug("Forcing combiner."); + conf.setInt("mapreduce.map.combine.minspills", 1); + } + + // run the import + List<String> argv = new ArrayList<>(Arrays.asList(args)); + argv.add(inputPath.toString()); + Tool tool = new ImportTsv(); + LOG.debug("Running ImportTsv with arguments: " + argv); + try { + // Job will fail if observer rejects entries without TTL + assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args))); + } finally { + // Clean up + if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) { + LOG.debug("Deleting test subdirectory"); + util.cleanupDataTestDirOnTestFS(table.getNameAsString()); + } + } + + return tool; + } + + public static class TTLCheckingObserver implements RegionObserver { + + @Override + public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, + Durability durability) throws IOException { + Region region = e.getEnvironment().getRegion(); + if (!region.getRegionInfo().isMetaTable() + && !region.getRegionInfo().getTable().isSystemTable()) { + // The put carries the TTL attribute + if (put.getTTL() != Long.MAX_VALUE) { + return; + } + throw new IOException("Operation does not have TTL set"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java new file mode 100644 index 0000000..8967ac7 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java @@ -0,0 +1,495 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.visibility.Authorizations; +import org.apache.hadoop.hbase.security.visibility.CellVisibility; +import org.apache.hadoop.hbase.security.visibility.ScanLabelGenerator; +import org.apache.hadoop.hbase.security.visibility.SimpleScanLabelGenerator; +import org.apache.hadoop.hbase.security.visibility.VisibilityClient; +import org.apache.hadoop.hbase.security.visibility.VisibilityConstants; +import org.apache.hadoop.hbase.security.visibility.VisibilityController; +import org.apache.hadoop.hbase.security.visibility.VisibilityUtils; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({MapReduceTests.class, LargeTests.class}) +public class TestImportTSVWithVisibilityLabels implements Configurable { + + private static final Log LOG = LogFactory.getLog(TestImportTSVWithVisibilityLabels.class); + protected static final String NAME = TestImportTsv.class.getSimpleName(); + protected static HBaseTestingUtility util = new HBaseTestingUtility(); + + /** + * Delete the tmp directory after running doMROnTableTest. 
Boolean. Default is + * false. + */ + protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; + + /** + * Force use of combiner in doMROnTableTest. Boolean. Default is true. + */ + protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; + + private final String FAMILY = "FAM"; + private final static String TOPSECRET = "topsecret"; + private final static String PUBLIC = "public"; + private final static String PRIVATE = "private"; + private final static String CONFIDENTIAL = "confidential"; + private final static String SECRET = "secret"; + private static User SUPERUSER; + private static Configuration conf; + + @Rule + public TestName name = new TestName(); + + @Override + public Configuration getConf() { + return util.getConfiguration(); + } + + @Override + public void setConf(Configuration conf) { + throw new IllegalArgumentException("setConf not supported"); + } + + @BeforeClass + public static void provisionCluster() throws Exception { + conf = util.getConfiguration(); + SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" }); + conf.set("hbase.superuser", "admin,"+User.getCurrent().getName()); + conf.setInt("hfile.format.version", 3); + conf.set("hbase.coprocessor.master.classes", VisibilityController.class.getName()); + conf.set("hbase.coprocessor.region.classes", VisibilityController.class.getName()); + conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class, + ScanLabelGenerator.class); + util.startMiniCluster(); + // Wait for the labels table to become available + util.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME.getName(), 50000); + createLabels(); + } + + private static void createLabels() throws IOException, InterruptedException { + PrivilegedExceptionAction<VisibilityLabelsResponse> action = + new PrivilegedExceptionAction<VisibilityLabelsResponse>() { + @Override + public VisibilityLabelsResponse run() throws Exception { + String[] labels = { SECRET, TOPSECRET, CONFIDENTIAL, PUBLIC, PRIVATE }; + try (Connection conn = ConnectionFactory.createConnection(conf)) { + VisibilityClient.addLabels(conn, labels); + LOG.info("Added labels "); + } catch (Throwable t) { + LOG.error("Error in adding labels" , t); + throw new IOException(t); + } + return null; + } + }; + SUPERUSER.runAs(action); + } + + @AfterClass + public static void releaseCluster() throws Exception { + util.shutdownMiniCluster(); + } + + @Test + public void testMROnTable() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); + + // Prepare the arguments required for the test. + String[] args = new String[] { + "-D" + ImportTsv.MAPPER_CONF_KEY + + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; + String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n"; + util.createTable(tableName, FAMILY); + doMROnTableTest(util, FAMILY, data, args, 1); + util.deleteTable(tableName); + } + + @Test + public void testMROnTableWithDeletes() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); + + // Prepare the arguments required for the test. 
+ String[] args = new String[] { + "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; + String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n"; + util.createTable(tableName, FAMILY); + doMROnTableTest(util, FAMILY, data, args, 1); + issueDeleteAndVerifyData(tableName); + util.deleteTable(tableName); + } + + private void issueDeleteAndVerifyData(TableName tableName) throws IOException { + LOG.debug("Validating table after delete."); + Table table = util.getConnection().getTable(tableName); + boolean verified = false; + long pause = conf.getLong("hbase.client.pause", 5 * 1000); + int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); + for (int i = 0; i < numRetries; i++) { + try { + Delete d = new Delete(Bytes.toBytes("KEY")); + d.addFamily(Bytes.toBytes(FAMILY)); + d.setCellVisibility(new CellVisibility("private&secret")); + table.delete(d); + + Scan scan = new Scan(); + // Scan entire family. + scan.addFamily(Bytes.toBytes(FAMILY)); + scan.setAuthorizations(new Authorizations("secret", "private")); + ResultScanner resScanner = table.getScanner(scan); + Result[] next = resScanner.next(5); + assertEquals(0, next.length); + verified = true; + break; + } catch (NullPointerException e) { + // If here, a cell was empty. Presume its because updates came in + // after the scanner had been opened. Wait a while and retry. + } + try { + Thread.sleep(pause); + } catch (InterruptedException e) { + // continue + } + } + table.close(); + assertTrue(verified); + } + + @Test + public void testMROnTableWithBulkload() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); + Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles"); + // Prepare the arguments required for the test. + String[] args = new String[] { + "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), + "-D" + ImportTsv.COLUMNS_CONF_KEY + + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; + String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n"; + util.createTable(tableName, FAMILY); + doMROnTableTest(util, FAMILY, data, args, 1); + util.deleteTable(tableName); + } + + @Test + public void testBulkOutputWithTsvImporterTextMapper() throws Exception { + final TableName table = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); + String FAMILY = "FAM"; + Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table.getNameAsString()),"hfiles"); + // Prepare the arguments required for the test. 
+ String[] args = + new String[] { + "-D" + ImportTsv.MAPPER_CONF_KEY + + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper", + "-D" + ImportTsv.COLUMNS_CONF_KEY + + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", + "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), + table.getNameAsString() + }; + String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n"; + doMROnTableTest(util, FAMILY, data, args, 4); + util.deleteTable(table); + } + + @Test + public void testMRWithOutputFormat() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); + Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles"); + // Prepare the arguments required for the test. + String[] args = new String[] { + "-D" + ImportTsv.MAPPER_CONF_KEY + + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", + "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; + String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n"; + util.createTable(tableName, FAMILY); + doMROnTableTest(util, FAMILY, data, args, 1); + util.deleteTable(tableName); + } + + @Test + public void testBulkOutputWithInvalidLabels() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); + Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles"); + // Prepare the arguments required for the test. + String[] args = + new String[] { "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; + + // 2 Data rows, one with valid label and one with invalid label + String data = + "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n"; + util.createTable(tableName, FAMILY); + doMROnTableTest(util, FAMILY, data, args, 1, 2); + util.deleteTable(tableName); + } + + @Test + public void testBulkOutputWithTsvImporterTextMapperWithInvalidLabels() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); + Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles"); + // Prepare the arguments required for the test. 
+    String[] args =
+        new String[] {
+            "-D" + ImportTsv.MAPPER_CONF_KEY
+                + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper",
+            "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(),
+            "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
+            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() };
+
+    // Two data rows: one with a valid label and one with an invalid label.
+    String data =
+        "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n";
+    util.createTable(tableName, FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1, 2);
+    util.deleteTable(tableName);
+  }
+
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+      String[] args, int valueMultiplier) throws Exception {
+    return doMROnTableTest(util, family, data, args, valueMultiplier, -1);
+  }
+
+  /**
+   * Run an ImportTsv job and perform basic validation on the results. Returns
+   * the ImportTsv <code>Tool</code> instance so that other tests can inspect it
+   * for further validation as necessary. This method is static to ensure
+   * non-reliance on the instance's util/conf facilities.
+   *
+   * @param args Any arguments to pass BEFORE the inputFile path is appended.
+   * @param expectedKVCount Expected KV count. Pass -1 to skip the KV count check.
+   * @return The Tool instance used to run the test.
+   */
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+      String[] args, int valueMultiplier, int expectedKVCount) throws Exception {
+    TableName table = TableName.valueOf(args[args.length - 1]);
+    Configuration conf = new Configuration(util.getConfiguration());
+
+    // populate input file
+    FileSystem fs = FileSystem.get(conf);
+    Path inputPath = fs.makeQualified(new Path(util
+        .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
+    FSDataOutputStream op = fs.create(inputPath, true);
+    if (data == null) {
+      data = "KEY\u001bVALUE1\u001bVALUE2\n";
+    }
+    op.write(Bytes.toBytes(data));
+    op.close();
+    LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+      LOG.debug("Forcing combiner.");
+      conf.setInt("mapreduce.map.combine.minspills", 1);
+    }
+
+    // run the import
+    List<String> argv = new ArrayList<>(Arrays.asList(args));
+    argv.add(inputPath.toString());
+    Tool tool = new ImportTsv();
+    LOG.debug("Running ImportTsv with arguments: " + argv);
+    assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
+
+    // Perform basic validation. If the input args did not include
+    // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table.
+    // Otherwise, validate presence of hfiles.
+    boolean createdHFiles = false;
+    String outputPath = null;
+    for (String arg : argv) {
+      if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) {
+        createdHFiles = true;
+        // split '-Dfoo=bar' on '=' and keep 'bar'
+        outputPath = arg.split("=")[1];
+        break;
+      }
+    }
+    LOG.debug("Output written as HFiles: " + createdHFiles);
+    if (createdHFiles) {
+      validateHFiles(fs, outputPath, family, expectedKVCount);
+    } else {
+      validateTable(conf, table, family, valueMultiplier);
+    }
+
+    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+      LOG.debug("Deleting test subdirectory");
+      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
+    }
+    return tool;
+  }
+
+  /**
+   * Confirm ImportTsv via HFiles on fs.
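+   * Walks the bulk-output directory: each child directory is treated as a column
+   * family and must be one of the configured families, every file beneath it must
+   * be non-empty, and when expectedKVCount is non-negative the summed KV count
+   * across all HFiles must equal it.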
+   */
+  private static void validateHFiles(FileSystem fs, String outputPath, String family,
+      int expectedKVCount) throws IOException {
+
+    // validate number and content of output columns
+    LOG.debug("Validating HFiles.");
+    Set<String> configFamilies = new HashSet<>();
+    configFamilies.add(family);
+    Set<String> foundFamilies = new HashSet<>();
+    int actualKVCount = 0;
+    for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) {
+      LOG.debug("The output path has files");
+      String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR);
+      String cf = elements[elements.length - 1];
+      foundFamilies.add(cf);
+      assertTrue(String.format(
+          "HFile output contains a column family (%s) not present in input families (%s)", cf,
+          configFamilies), configFamilies.contains(cf));
+      for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) {
+        assertTrue(String.format("HFile %s appears to contain no data.", hfile.getPath()),
+            hfile.getLen() > 0);
+        if (expectedKVCount > -1) {
+          actualKVCount += getKVCountFromHfile(fs, hfile.getPath());
+        }
+      }
+    }
+    if (expectedKVCount > -1) {
+      assertTrue(String.format(
+          "KV count in output HFile=<%d> doesn't match expected KV count=<%d>", actualKVCount,
+          expectedKVCount), actualKVCount == expectedKVCount);
+    }
+  }
+
+  /**
+   * Confirm ImportTsv via data in online table.
+   */
+  private static void validateTable(Configuration conf, TableName tableName, String family,
+      int valueMultiplier) throws IOException {
+
+    LOG.debug("Validating table.");
+    Table table = util.getConnection().getTable(tableName);
+    boolean verified = false;
+    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
+    int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
+    for (int i = 0; i < numRetries; i++) {
+      try {
+        Scan scan = new Scan();
+        // Scan entire family.
+        scan.addFamily(Bytes.toBytes(family));
+        scan.setAuthorizations(new Authorizations("secret", "private"));
+        ResultScanner resScanner = table.getScanner(scan);
+        Result[] next = resScanner.next(5);
+        assertEquals(1, next.length);
+        for (Result res : resScanner) {
+          LOG.debug("Getting results " + res.size());
+          assertTrue(res.size() == 2);
+          List<Cell> kvs = res.listCells();
+          assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY")));
+          assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY")));
+          assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier)));
+          assertTrue(CellUtil.matchingValue(kvs.get(1),
+              Bytes.toBytes("VALUE" + 2 * valueMultiplier)));
+          // Only one result set is expected, so let it loop.
+        }
+        verified = true;
+        break;
+      } catch (NullPointerException e) {
+        // If here, a cell was empty. Presume it's because updates came in
+        // after the scanner had been opened. Wait a while and retry.
+      }
+      try {
+        Thread.sleep(pause);
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+    table.close();
+    assertTrue(verified);
+  }
+
+  /**
+   * Returns the total number of KVs in the given HFile.
+   * @param fs the file system
+   * @param p the HFile path
+   * @return KV count in the given HFile
+   * @throws IOException
+   */
+  private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
+    Configuration conf = util.getConfiguration();
+    HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
+    reader.loadFileInfo();
+    HFileScanner scanner = reader.getScanner(false, false);
+    int count = 0;
+    // Guard against an empty HFile: only count if seekTo() positions the scanner
+    // on a first entry; an unconditional do/while would over-count by one here.
+    if (scanner.seekTo()) {
+      do {
+        count++;
+      } while (scanner.next());
+    }
+    reader.close();
+    return count;
+  }
+
+}
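As a usage sketch (illustrative only and not part of the patch; the table name and input path below are hypothetical), the ImportTsv invocation pattern these tests drive through ToolRunner looks like:

    Configuration conf = HBaseConfiguration.create();
    String[] args = new String[] {
        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
        "someTable",       // hypothetical target table
        "/tmp/input.dat"   // hypothetical input file on the cluster filesystem
    };
    // ToolRunner.run() returns the tool's exit code; 0 indicates success.
    int exitCode = ToolRunner.run(conf, new ImportTsv(), args);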