http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java deleted file mode 100644 index dc59817..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java +++ /dev/null @@ -1,727 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.PrintStream; -import java.net.URL; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.NavigableMap; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeepDeletedCells; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FilterBase; -import org.apache.hadoop.hbase.filter.PrefixFilter; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.LauncherSecurityManager; -import org.apache.hadoop.mapreduce.Mapper.Context; -import org.apache.hadoop.util.ToolRunner; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -/** - * Tests the table import and table export MR job functionality - */ -@Category({VerySlowMapReduceTests.class, MediumTests.class}) -public class TestImportExport { - private static final Log LOG = LogFactory.getLog(TestImportExport.class); - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private static final byte[] ROW1 = Bytes.toBytesBinary("\\x32row1"); - private static final byte[] ROW2 = Bytes.toBytesBinary("\\x32row2"); - private static final byte[] ROW3 = Bytes.toBytesBinary("\\x32row3"); - private static final String FAMILYA_STRING = "a"; - private static final String FAMILYB_STRING = "b"; - private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING); - private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING); - private static final byte[] QUAL = Bytes.toBytes("q"); - private static final String OUTPUT_DIR = "outputdir"; - private static String FQ_OUTPUT_DIR; - private static final String EXPORT_BATCH_SIZE = "100"; - - private static long now = System.currentTimeMillis(); - - @BeforeClass - public static void beforeClass() throws Exception { - // Up the handlers; this test needs more than usual. - UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); - UTIL.startMiniCluster(); - FQ_OUTPUT_DIR = - new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString(); - } - - @AfterClass - public static void afterClass() throws Exception { - UTIL.shutdownMiniCluster(); - } - - @Rule - public final TestName name = new TestName(); - - @Before - public void announce() { - LOG.info("Running " + name.getMethodName()); - } - - @Before - @After - public void cleanup() throws Exception { - FileSystem fs = FileSystem.get(UTIL.getConfiguration()); - fs.delete(new Path(OUTPUT_DIR), true); - } - - /** - * Runs an export job with the specified command line args - * @param args - * @return true if job completed successfully - * @throws IOException - * @throws InterruptedException - * @throws ClassNotFoundException - */ - boolean runExport(String[] args) throws Exception { - // need to make a copy of the configuration because to make sure different temp dirs are used. - int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Export(), args); - return status == 0; - } - - /** - * Runs an import job with the specified command line args - * @param args - * @return true if job completed successfully - * @throws IOException - * @throws InterruptedException - * @throws ClassNotFoundException - */ - boolean runImport(String[] args) throws Exception { - // need to make a copy of the configuration because to make sure different temp dirs are used. - int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new Import(), args); - return status == 0; - } - - /** - * Test simple replication case with column mapping - * @throws Exception - */ - @Test - public void testSimpleCase() throws Exception { - try (Table t = UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILYA, 3);) { - Put p = new Put(ROW1); - p.addColumn(FAMILYA, QUAL, now, QUAL); - p.addColumn(FAMILYA, QUAL, now + 1, QUAL); - p.addColumn(FAMILYA, QUAL, now + 2, QUAL); - t.put(p); - p = new Put(ROW2); - p.addColumn(FAMILYA, QUAL, now, QUAL); - p.addColumn(FAMILYA, QUAL, now + 1, QUAL); - p.addColumn(FAMILYA, QUAL, now + 2, QUAL); - t.put(p); - p = new Put(ROW3); - p.addColumn(FAMILYA, QUAL, now, QUAL); - p.addColumn(FAMILYA, QUAL, now + 1, QUAL); - p.addColumn(FAMILYA, QUAL, now + 2, QUAL); - t.put(p); - } - - String[] args = new String[] { - // Only export row1 & row2. - "-D" + TableInputFormat.SCAN_ROW_START + "=\\x32row1", - "-D" + TableInputFormat.SCAN_ROW_STOP + "=\\x32row3", - name.getMethodName(), - FQ_OUTPUT_DIR, - "1000", // max number of key versions per key to export - }; - assertTrue(runExport(args)); - - final String IMPORT_TABLE = name.getMethodName() + "import"; - try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);) { - args = new String[] { - "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING, - IMPORT_TABLE, - FQ_OUTPUT_DIR - }; - assertTrue(runImport(args)); - - Get g = new Get(ROW1); - g.setMaxVersions(); - Result r = t.get(g); - assertEquals(3, r.size()); - g = new Get(ROW2); - g.setMaxVersions(); - r = t.get(g); - assertEquals(3, r.size()); - g = new Get(ROW3); - r = t.get(g); - assertEquals(0, r.size()); - } - } - - /** - * Test export hbase:meta table - * - * @throws Exception - */ - @Test - public void testMetaExport() throws Exception { - String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString(); - String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" }; - assertTrue(runExport(args)); - } - - /** - * Test import data from 0.94 exported file - * @throws Exception - */ - @Test - public void testImport94Table() throws Exception { - final String name = "exportedTableIn94Format"; - URL url = TestImportExport.class.getResource(name); - File f = new File(url.toURI()); - if (!f.exists()) { - LOG.warn("FAILED TO FIND " + f + "; skipping out on test"); - return; - } - assertTrue(f.exists()); - LOG.info("FILE=" + f); - Path importPath = new Path(f.toURI()); - FileSystem fs = FileSystem.get(UTIL.getConfiguration()); - fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR + name)); - String IMPORT_TABLE = name; - try (Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);) { - String[] args = new String[] { - "-Dhbase.import.version=0.94" , - IMPORT_TABLE, FQ_OUTPUT_DIR - }; - assertTrue(runImport(args)); - /* exportedTableIn94Format contains 5 rows - ROW COLUMN+CELL - r1 column=f1:c1, timestamp=1383766761171, value=val1 - r2 column=f1:c1, timestamp=1383766771642, value=val2 - r3 column=f1:c1, timestamp=1383766777615, value=val3 - r4 column=f1:c1, timestamp=1383766785146, value=val4 - r5 column=f1:c1, timestamp=1383766791506, value=val5 - */ - assertEquals(5, UTIL.countRows(t)); - } - } - - /** - * Test export scanner batching - */ - @Test - public void testExportScannerBatching() throws Exception { - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName())); - desc.addFamily(new HColumnDescriptor(FAMILYA) - .setMaxVersions(1) - ); - UTIL.getAdmin().createTable(desc); - try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { - - Put p = new Put(ROW1); - p.addColumn(FAMILYA, QUAL, now, QUAL); - p.addColumn(FAMILYA, QUAL, now + 1, QUAL); - p.addColumn(FAMILYA, QUAL, now + 2, QUAL); - p.addColumn(FAMILYA, QUAL, now + 3, QUAL); - p.addColumn(FAMILYA, QUAL, now + 4, QUAL); - t.put(p); - - String[] args = new String[] { - "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE, // added scanner batching arg. - name.getMethodName(), - FQ_OUTPUT_DIR - }; - assertTrue(runExport(args)); - - FileSystem fs = FileSystem.get(UTIL.getConfiguration()); - fs.delete(new Path(FQ_OUTPUT_DIR), true); - } - } - - @Test - public void testWithDeletes() throws Exception { - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName())); - desc.addFamily(new HColumnDescriptor(FAMILYA) - .setMaxVersions(5) - .setKeepDeletedCells(KeepDeletedCells.TRUE) - ); - UTIL.getAdmin().createTable(desc); - try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { - - Put p = new Put(ROW1); - p.addColumn(FAMILYA, QUAL, now, QUAL); - p.addColumn(FAMILYA, QUAL, now + 1, QUAL); - p.addColumn(FAMILYA, QUAL, now + 2, QUAL); - p.addColumn(FAMILYA, QUAL, now + 3, QUAL); - p.addColumn(FAMILYA, QUAL, now + 4, QUAL); - t.put(p); - - Delete d = new Delete(ROW1, now+3); - t.delete(d); - d = new Delete(ROW1); - d.addColumns(FAMILYA, QUAL, now+2); - t.delete(d); - } - - String[] args = new String[] { - "-D" + Export.RAW_SCAN + "=true", - name.getMethodName(), - FQ_OUTPUT_DIR, - "1000", // max number of key versions per key to export - }; - assertTrue(runExport(args)); - - final String IMPORT_TABLE = name.getMethodName() + "import"; - desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE)); - desc.addFamily(new HColumnDescriptor(FAMILYA) - .setMaxVersions(5) - .setKeepDeletedCells(KeepDeletedCells.TRUE) - ); - UTIL.getAdmin().createTable(desc); - try (Table t = UTIL.getConnection().getTable(desc.getTableName());) { - args = new String[] { - IMPORT_TABLE, - FQ_OUTPUT_DIR - }; - assertTrue(runImport(args)); - - Scan s = new Scan(); - s.setMaxVersions(); - s.setRaw(true); - ResultScanner scanner = t.getScanner(s); - Result r = scanner.next(); - Cell[] res = r.rawCells(); - assertTrue(CellUtil.isDeleteFamily(res[0])); - assertEquals(now+4, res[1].getTimestamp()); - assertEquals(now+3, res[2].getTimestamp()); - assertTrue(CellUtil.isDelete(res[3])); - assertEquals(now+2, res[4].getTimestamp()); - assertEquals(now+1, res[5].getTimestamp()); - assertEquals(now, res[6].getTimestamp()); - } - } - - - @Test - public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Exception { - final TableName exportTable = TableName.valueOf(name.getMethodName()); - HTableDescriptor desc = new HTableDescriptor(exportTable); - desc.addFamily(new HColumnDescriptor(FAMILYA) - .setMaxVersions(5) - .setKeepDeletedCells(KeepDeletedCells.TRUE) - ); - UTIL.getAdmin().createTable(desc); - - Table exportT = UTIL.getConnection().getTable(exportTable); - - //Add first version of QUAL - Put p = new Put(ROW1); - p.addColumn(FAMILYA, QUAL, now, QUAL); - exportT.put(p); - - //Add Delete family marker - Delete d = new Delete(ROW1, now+3); - exportT.delete(d); - - //Add second version of QUAL - p = new Put(ROW1); - p.addColumn(FAMILYA, QUAL, now + 5, "s".getBytes()); - exportT.put(p); - - //Add second Delete family marker - d = new Delete(ROW1, now+7); - exportT.delete(d); - - - String[] args = new String[] { - "-D" + Export.RAW_SCAN + "=true", exportTable.getNameAsString(), - FQ_OUTPUT_DIR, - "1000", // max number of key versions per key to export - }; - assertTrue(runExport(args)); - - final String importTable = name.getMethodName() + "import"; - desc = new HTableDescriptor(TableName.valueOf(importTable)); - desc.addFamily(new HColumnDescriptor(FAMILYA) - .setMaxVersions(5) - .setKeepDeletedCells(KeepDeletedCells.TRUE) - ); - UTIL.getAdmin().createTable(desc); - - Table importT = UTIL.getConnection().getTable(TableName.valueOf(importTable)); - args = new String[] { - importTable, - FQ_OUTPUT_DIR - }; - assertTrue(runImport(args)); - - Scan s = new Scan(); - s.setMaxVersions(); - s.setRaw(true); - - ResultScanner importedTScanner = importT.getScanner(s); - Result importedTResult = importedTScanner.next(); - - ResultScanner exportedTScanner = exportT.getScanner(s); - Result exportedTResult = exportedTScanner.next(); - try { - Result.compareResults(exportedTResult, importedTResult); - } catch (Exception e) { - fail("Original and imported tables data comparision failed with error:"+e.getMessage()); - } finally { - exportT.close(); - importT.close(); - } - } - - /** - * Create a simple table, run an Export Job on it, Import with filtering on, verify counts, - * attempt with invalid values. - */ - @Test - public void testWithFilter() throws Exception { - // Create simple table to export - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName())); - desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5)); - UTIL.getAdmin().createTable(desc); - Table exportTable = UTIL.getConnection().getTable(desc.getTableName()); - - Put p1 = new Put(ROW1); - p1.addColumn(FAMILYA, QUAL, now, QUAL); - p1.addColumn(FAMILYA, QUAL, now + 1, QUAL); - p1.addColumn(FAMILYA, QUAL, now + 2, QUAL); - p1.addColumn(FAMILYA, QUAL, now + 3, QUAL); - p1.addColumn(FAMILYA, QUAL, now + 4, QUAL); - - // Having another row would actually test the filter. - Put p2 = new Put(ROW2); - p2.addColumn(FAMILYA, QUAL, now, QUAL); - - exportTable.put(Arrays.asList(p1, p2)); - - // Export the simple table - String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" }; - assertTrue(runExport(args)); - - // Import to a new table - final String IMPORT_TABLE = name.getMethodName() + "import"; - desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE)); - desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5)); - UTIL.getAdmin().createTable(desc); - - Table importTable = UTIL.getConnection().getTable(desc.getTableName()); - args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(), - "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, - FQ_OUTPUT_DIR, - "1000" }; - assertTrue(runImport(args)); - - // get the count of the source table for that time range - PrefixFilter filter = new PrefixFilter(ROW1); - int count = getCount(exportTable, filter); - - Assert.assertEquals("Unexpected row count between export and import tables", count, - getCount(importTable, null)); - - // and then test that a broken command doesn't bork everything - easier here because we don't - // need to re-run the export job - - args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(), - "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", name.getMethodName(), - FQ_OUTPUT_DIR, "1000" }; - assertFalse(runImport(args)); - - // cleanup - exportTable.close(); - importTable.close(); - } - - /** - * Count the number of keyvalues in the specified table for the given timerange - * @param start - * @param end - * @param table - * @return - * @throws IOException - */ - private int getCount(Table table, Filter filter) throws IOException { - Scan scan = new Scan(); - scan.setFilter(filter); - ResultScanner results = table.getScanner(scan); - int count = 0; - for (Result res : results) { - count += res.size(); - } - results.close(); - return count; - } - - /** - * test main method. Import should print help and call System.exit - */ - @Test - public void testImportMain() throws Exception { - PrintStream oldPrintStream = System.err; - SecurityManager SECURITY_MANAGER = System.getSecurityManager(); - LauncherSecurityManager newSecurityManager= new LauncherSecurityManager(); - System.setSecurityManager(newSecurityManager); - ByteArrayOutputStream data = new ByteArrayOutputStream(); - String[] args = {}; - System.setErr(new PrintStream(data)); - try { - System.setErr(new PrintStream(data)); - Import.main(args); - fail("should be SecurityException"); - } catch (SecurityException e) { - assertEquals(-1, newSecurityManager.getExitCode()); - assertTrue(data.toString().contains("Wrong number of arguments:")); - assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output")); - assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>")); - assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output")); - assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false")); - } finally { - System.setErr(oldPrintStream); - System.setSecurityManager(SECURITY_MANAGER); - } - } - - /** - * test main method. Export should print help and call System.exit - */ - @Test - public void testExportMain() throws Exception { - PrintStream oldPrintStream = System.err; - SecurityManager SECURITY_MANAGER = System.getSecurityManager(); - LauncherSecurityManager newSecurityManager= new LauncherSecurityManager(); - System.setSecurityManager(newSecurityManager); - ByteArrayOutputStream data = new ByteArrayOutputStream(); - String[] args = {}; - System.setErr(new PrintStream(data)); - try { - System.setErr(new PrintStream(data)); - Export.main(args); - fail("should be SecurityException"); - } catch (SecurityException e) { - assertEquals(-1, newSecurityManager.getExitCode()); - String errMsg = data.toString(); - assertTrue(errMsg.contains("Wrong number of arguments:")); - assertTrue(errMsg.contains( - "Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " + - "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]")); - assertTrue( - errMsg.contains("-D hbase.mapreduce.scan.column.family=<family1>,<family2>, ...")); - assertTrue(errMsg.contains("-D hbase.mapreduce.include.deleted.rows=true")); - assertTrue(errMsg.contains("-Dhbase.client.scanner.caching=100")); - assertTrue(errMsg.contains("-Dmapreduce.map.speculative=false")); - assertTrue(errMsg.contains("-Dmapreduce.reduce.speculative=false")); - assertTrue(errMsg.contains("-Dhbase.export.scanner.batch=10")); - } finally { - System.setErr(oldPrintStream); - System.setSecurityManager(SECURITY_MANAGER); - } - } - - /** - * Test map method of Importer - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Test - public void testKeyValueImporter() throws Exception { - KeyValueImporter importer = new KeyValueImporter(); - Configuration configuration = new Configuration(); - Context ctx = mock(Context.class); - when(ctx.getConfiguration()).thenReturn(configuration); - - doAnswer(new Answer<Void>() { - - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0]; - KeyValue key = (KeyValue) invocation.getArguments()[1]; - assertEquals("Key", Bytes.toString(writer.get())); - assertEquals("row", Bytes.toString(CellUtil.cloneRow(key))); - return null; - } - }).when(ctx).write(any(ImmutableBytesWritable.class), any(KeyValue.class)); - - importer.setup(ctx); - Result value = mock(Result.class); - KeyValue[] keys = { - new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"), - Bytes.toBytes("value")), - new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"), - Bytes.toBytes("value1")) }; - when(value.rawCells()).thenReturn(keys); - importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx); - - } - - /** - * Test addFilterAndArguments method of Import This method set couple - * parameters into Configuration - */ - @Test - public void testAddFilterAndArguments() throws IOException { - Configuration configuration = new Configuration(); - - List<String> args = new ArrayList<>(); - args.add("param1"); - args.add("param2"); - - Import.addFilterAndArguments(configuration, FilterBase.class, args); - assertEquals("org.apache.hadoop.hbase.filter.FilterBase", - configuration.get(Import.FILTER_CLASS_CONF_KEY)); - assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY)); - } - - @Test - public void testDurability() throws Exception { - // Create an export table. - String exportTableName = name.getMethodName() + "export"; - try (Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);) { - - // Insert some data - Put put = new Put(ROW1); - put.addColumn(FAMILYA, QUAL, now, QUAL); - put.addColumn(FAMILYA, QUAL, now + 1, QUAL); - put.addColumn(FAMILYA, QUAL, now + 2, QUAL); - exportTable.put(put); - - put = new Put(ROW2); - put.addColumn(FAMILYA, QUAL, now, QUAL); - put.addColumn(FAMILYA, QUAL, now + 1, QUAL); - put.addColumn(FAMILYA, QUAL, now + 2, QUAL); - exportTable.put(put); - - // Run the export - String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"}; - assertTrue(runExport(args)); - - // Create the table for import - String importTableName = name.getMethodName() + "import1"; - Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); - - // Register the wal listener for the import table - HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() - .getOnlineRegions(importTable.getName()).get(0).getRegionInfo(); - TableWALActionListener walListener = new TableWALActionListener(region); - WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); - wal.registerWALActionsListener(walListener); - - // Run the import with SKIP_WAL - args = - new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(), - importTableName, FQ_OUTPUT_DIR }; - assertTrue(runImport(args)); - //Assert that the wal is not visisted - assertTrue(!walListener.isWALVisited()); - //Ensure that the count is 2 (only one version of key value is obtained) - assertTrue(getCount(importTable, null) == 2); - - // Run the import with the default durability option - importTableName = name.getMethodName() + "import2"; - importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); - region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer() - .getOnlineRegions(importTable.getName()).get(0).getRegionInfo(); - wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region); - walListener = new TableWALActionListener(region); - wal.registerWALActionsListener(walListener); - args = new String[] { importTableName, FQ_OUTPUT_DIR }; - assertTrue(runImport(args)); - //Assert that the wal is visisted - assertTrue(walListener.isWALVisited()); - //Ensure that the count is 2 (only one version of key value is obtained) - assertTrue(getCount(importTable, null) == 2); - } - } - - /** - * This listens to the {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)} to - * identify that an entry is written to the Write Ahead Log for the given table. - */ - private static class TableWALActionListener extends WALActionsListener.Base { - - private HRegionInfo regionInfo; - private boolean isVisited = false; - - public TableWALActionListener(HRegionInfo region) { - this.regionInfo = region; - } - - @Override - public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) { - if (logKey.getTablename().getNameAsString().equalsIgnoreCase( - this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())) { - isVisited = true; - } - } - - public boolean isWALVisited() { - return isVisited; - } - } -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java deleted file mode 100644 index 6d9b05b..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithOperationAttributes.java +++ /dev/null @@ -1,266 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CategoryBasedTimeout; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.MapReduceTests; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.rules.TestRule; - -@Category({MapReduceTests.class, LargeTests.class}) -public class TestImportTSVWithOperationAttributes implements Configurable { - @Rule public final TestRule timeout = CategoryBasedTimeout.builder(). - withTimeout(this.getClass()).withLookingForStuckThread(true).build(); - private static final Log LOG = LogFactory.getLog(TestImportTSVWithOperationAttributes.class); - protected static final String NAME = TestImportTsv.class.getSimpleName(); - protected static HBaseTestingUtility util = new HBaseTestingUtility(); - - /** - * Delete the tmp directory after running doMROnTableTest. Boolean. Default is - * false. - */ - protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; - - /** - * Force use of combiner in doMROnTableTest. Boolean. Default is true. - */ - protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; - - private static Configuration conf; - - private static final String TEST_ATR_KEY = "test"; - - private final String FAMILY = "FAM"; - - @Rule - public TestName name = new TestName(); - - public Configuration getConf() { - return util.getConfiguration(); - } - - public void setConf(Configuration conf) { - throw new IllegalArgumentException("setConf not supported"); - } - - @BeforeClass - public static void provisionCluster() throws Exception { - conf = util.getConfiguration(); - conf.set("hbase.coprocessor.master.classes", OperationAttributesTestController.class.getName()); - conf.set("hbase.coprocessor.region.classes", OperationAttributesTestController.class.getName()); - util.startMiniCluster(); - } - - @AfterClass - public static void releaseCluster() throws Exception { - util.shutdownMiniCluster(); - } - - @Test - public void testMROnTable() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); - - // Prepare the arguments required for the test. - String[] args = new String[] { - "-D" + ImportTsv.MAPPER_CONF_KEY - + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr", - "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; - String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest=>myvalue\n"; - util.createTable(tableName, FAMILY); - doMROnTableTest(util, FAMILY, data, args, 1, true); - util.deleteTable(tableName); - } - - @Test - public void testMROnTableWithInvalidOperationAttr() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); - - // Prepare the arguments required for the test. - String[] args = new String[] { - "-D" + ImportTsv.MAPPER_CONF_KEY - + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr", - "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_ATTRIBUTES_KEY", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; - String data = "KEY\u001bVALUE1\u001bVALUE2\u001btest1=>myvalue\n"; - util.createTable(tableName, FAMILY); - doMROnTableTest(util, FAMILY, data, args, 1, false); - util.deleteTable(tableName); - } - - /** - * Run an ImportTsv job and perform basic validation on the results. Returns - * the ImportTsv <code>Tool</code> instance so that other tests can inspect it - * for further validation as necessary. This method is static to insure - * non-reliance on instance's util/conf facilities. - * - * @param args - * Any arguments to pass BEFORE inputFile path is appended. - * @param dataAvailable - * @return The Tool instance used to run the test. - */ - private Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, String[] args, - int valueMultiplier, boolean dataAvailable) throws Exception { - String table = args[args.length - 1]; - Configuration conf = new Configuration(util.getConfiguration()); - - // populate input file - FileSystem fs = FileSystem.get(conf); - Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat")); - FSDataOutputStream op = fs.create(inputPath, true); - op.write(Bytes.toBytes(data)); - op.close(); - LOG.debug(String.format("Wrote test data to file: %s", inputPath)); - - if (conf.getBoolean(FORCE_COMBINER_CONF, true)) { - LOG.debug("Forcing combiner."); - conf.setInt("mapreduce.map.combine.minspills", 1); - } - - // run the import - List<String> argv = new ArrayList<>(Arrays.asList(args)); - argv.add(inputPath.toString()); - Tool tool = new ImportTsv(); - LOG.debug("Running ImportTsv with arguments: " + argv); - assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args))); - - validateTable(conf, TableName.valueOf(table), family, valueMultiplier, dataAvailable); - - if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) { - LOG.debug("Deleting test subdirectory"); - util.cleanupDataTestDirOnTestFS(table); - } - return tool; - } - - /** - * Confirm ImportTsv via data in online table. - * - * @param dataAvailable - */ - private static void validateTable(Configuration conf, TableName tableName, String family, - int valueMultiplier, boolean dataAvailable) throws IOException { - - LOG.debug("Validating table."); - Connection connection = ConnectionFactory.createConnection(conf); - Table table = connection.getTable(tableName); - boolean verified = false; - long pause = conf.getLong("hbase.client.pause", 5 * 1000); - int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); - for (int i = 0; i < numRetries; i++) { - try { - Scan scan = new Scan(); - // Scan entire family. - scan.addFamily(Bytes.toBytes(family)); - if (dataAvailable) { - ResultScanner resScanner = table.getScanner(scan); - for (Result res : resScanner) { - LOG.debug("Getting results " + res.size()); - assertTrue(res.size() == 2); - List<Cell> kvs = res.listCells(); - assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY"))); - assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY"))); - assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier))); - assertTrue(CellUtil.matchingValue(kvs.get(1), - Bytes.toBytes("VALUE" + 2 * valueMultiplier))); - // Only one result set is expected, so let it loop. - verified = true; - } - } else { - ResultScanner resScanner = table.getScanner(scan); - Result[] next = resScanner.next(2); - assertEquals(0, next.length); - verified = true; - } - - break; - } catch (NullPointerException e) { - // If here, a cell was empty. Presume its because updates came in - // after the scanner had been opened. Wait a while and retry. - } - try { - Thread.sleep(pause); - } catch (InterruptedException e) { - // continue - } - } - table.close(); - connection.close(); - assertTrue(verified); - } - - public static class OperationAttributesTestController implements RegionObserver { - - @Override - public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, - Durability durability) throws IOException { - Region region = e.getEnvironment().getRegion(); - if (!region.getRegionInfo().isMetaTable() - && !region.getRegionInfo().getTable().isSystemTable()) { - if (put.getAttribute(TEST_ATR_KEY) != null) { - LOG.debug("allow any put to happen " + region.getRegionInfo().getRegionNameAsString()); - } else { - e.bypass(); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java deleted file mode 100644 index 4ab3d29..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.MapReduceTests; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; - -@Category({MapReduceTests.class, LargeTests.class}) -public class TestImportTSVWithTTLs implements Configurable { - - protected static final Log LOG = LogFactory.getLog(TestImportTSVWithTTLs.class); - protected static final String NAME = TestImportTsv.class.getSimpleName(); - protected static HBaseTestingUtility util = new HBaseTestingUtility(); - - /** - * Delete the tmp directory after running doMROnTableTest. Boolean. Default is - * false. - */ - protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; - - /** - * Force use of combiner in doMROnTableTest. Boolean. Default is true. - */ - protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; - - private final String FAMILY = "FAM"; - private static Configuration conf; - - @Rule - public TestName name = new TestName(); - - @Override - public Configuration getConf() { - return util.getConfiguration(); - } - - @Override - public void setConf(Configuration conf) { - throw new IllegalArgumentException("setConf not supported"); - } - - @BeforeClass - public static void provisionCluster() throws Exception { - conf = util.getConfiguration(); - // We don't check persistence in HFiles in this test, but if we ever do we will - // need this where the default hfile version is not 3 (i.e. 0.98) - conf.setInt("hfile.format.version", 3); - conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName()); - util.startMiniCluster(); - } - - @AfterClass - public static void releaseCluster() throws Exception { - util.shutdownMiniCluster(); - } - - @Test - public void testMROnTable() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); - - // Prepare the arguments required for the test. - String[] args = new String[] { - "-D" + ImportTsv.MAPPER_CONF_KEY - + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", - "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; - String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n"; - util.createTable(tableName, FAMILY); - doMROnTableTest(util, FAMILY, data, args, 1); - util.deleteTable(tableName); - } - - protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, - String[] args, int valueMultiplier) throws Exception { - TableName table = TableName.valueOf(args[args.length - 1]); - Configuration conf = new Configuration(util.getConfiguration()); - - // populate input file - FileSystem fs = FileSystem.get(conf); - Path inputPath = fs.makeQualified(new Path(util - .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat")); - FSDataOutputStream op = fs.create(inputPath, true); - op.write(Bytes.toBytes(data)); - op.close(); - LOG.debug(String.format("Wrote test data to file: %s", inputPath)); - - if (conf.getBoolean(FORCE_COMBINER_CONF, true)) { - LOG.debug("Forcing combiner."); - conf.setInt("mapreduce.map.combine.minspills", 1); - } - - // run the import - List<String> argv = new ArrayList<>(Arrays.asList(args)); - argv.add(inputPath.toString()); - Tool tool = new ImportTsv(); - LOG.debug("Running ImportTsv with arguments: " + argv); - try { - // Job will fail if observer rejects entries without TTL - assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args))); - } finally { - // Clean up - if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) { - LOG.debug("Deleting test subdirectory"); - util.cleanupDataTestDirOnTestFS(table.getNameAsString()); - } - } - - return tool; - } - - public static class TTLCheckingObserver implements RegionObserver { - - @Override - public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, - Durability durability) throws IOException { - Region region = e.getEnvironment().getRegion(); - if (!region.getRegionInfo().isMetaTable() - && !region.getRegionInfo().getTable().isSystemTable()) { - // The put carries the TTL attribute - if (put.getTTL() != Long.MAX_VALUE) { - return; - } - throw new IOException("Operation does not have TTL set"); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java deleted file mode 100644 index 8967ac7..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithVisibilityLabels.java +++ /dev/null @@ -1,495 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.visibility.Authorizations; -import org.apache.hadoop.hbase.security.visibility.CellVisibility; -import org.apache.hadoop.hbase.security.visibility.ScanLabelGenerator; -import org.apache.hadoop.hbase.security.visibility.SimpleScanLabelGenerator; -import org.apache.hadoop.hbase.security.visibility.VisibilityClient; -import org.apache.hadoop.hbase.security.visibility.VisibilityConstants; -import org.apache.hadoop.hbase.security.visibility.VisibilityController; -import org.apache.hadoop.hbase.security.visibility.VisibilityUtils; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.MapReduceTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; - -@Category({MapReduceTests.class, LargeTests.class}) -public class TestImportTSVWithVisibilityLabels implements Configurable { - - private static final Log LOG = LogFactory.getLog(TestImportTSVWithVisibilityLabels.class); - protected static final String NAME = TestImportTsv.class.getSimpleName(); - protected static HBaseTestingUtility util = new HBaseTestingUtility(); - - /** - * Delete the tmp directory after running doMROnTableTest. Boolean. Default is - * false. - */ - protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; - - /** - * Force use of combiner in doMROnTableTest. Boolean. Default is true. - */ - protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; - - private final String FAMILY = "FAM"; - private final static String TOPSECRET = "topsecret"; - private final static String PUBLIC = "public"; - private final static String PRIVATE = "private"; - private final static String CONFIDENTIAL = "confidential"; - private final static String SECRET = "secret"; - private static User SUPERUSER; - private static Configuration conf; - - @Rule - public TestName name = new TestName(); - - @Override - public Configuration getConf() { - return util.getConfiguration(); - } - - @Override - public void setConf(Configuration conf) { - throw new IllegalArgumentException("setConf not supported"); - } - - @BeforeClass - public static void provisionCluster() throws Exception { - conf = util.getConfiguration(); - SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" }); - conf.set("hbase.superuser", "admin,"+User.getCurrent().getName()); - conf.setInt("hfile.format.version", 3); - conf.set("hbase.coprocessor.master.classes", VisibilityController.class.getName()); - conf.set("hbase.coprocessor.region.classes", VisibilityController.class.getName()); - conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class, - ScanLabelGenerator.class); - util.startMiniCluster(); - // Wait for the labels table to become available - util.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME.getName(), 50000); - createLabels(); - } - - private static void createLabels() throws IOException, InterruptedException { - PrivilegedExceptionAction<VisibilityLabelsResponse> action = - new PrivilegedExceptionAction<VisibilityLabelsResponse>() { - @Override - public VisibilityLabelsResponse run() throws Exception { - String[] labels = { SECRET, TOPSECRET, CONFIDENTIAL, PUBLIC, PRIVATE }; - try (Connection conn = ConnectionFactory.createConnection(conf)) { - VisibilityClient.addLabels(conn, labels); - LOG.info("Added labels "); - } catch (Throwable t) { - LOG.error("Error in adding labels" , t); - throw new IOException(t); - } - return null; - } - }; - SUPERUSER.runAs(action); - } - - @AfterClass - public static void releaseCluster() throws Exception { - util.shutdownMiniCluster(); - } - - @Test - public void testMROnTable() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); - - // Prepare the arguments required for the test. - String[] args = new String[] { - "-D" + ImportTsv.MAPPER_CONF_KEY - + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", - "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; - String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n"; - util.createTable(tableName, FAMILY); - doMROnTableTest(util, FAMILY, data, args, 1); - util.deleteTable(tableName); - } - - @Test - public void testMROnTableWithDeletes() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); - - // Prepare the arguments required for the test. - String[] args = new String[] { - "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", - "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; - String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n"; - util.createTable(tableName, FAMILY); - doMROnTableTest(util, FAMILY, data, args, 1); - issueDeleteAndVerifyData(tableName); - util.deleteTable(tableName); - } - - private void issueDeleteAndVerifyData(TableName tableName) throws IOException { - LOG.debug("Validating table after delete."); - Table table = util.getConnection().getTable(tableName); - boolean verified = false; - long pause = conf.getLong("hbase.client.pause", 5 * 1000); - int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); - for (int i = 0; i < numRetries; i++) { - try { - Delete d = new Delete(Bytes.toBytes("KEY")); - d.addFamily(Bytes.toBytes(FAMILY)); - d.setCellVisibility(new CellVisibility("private&secret")); - table.delete(d); - - Scan scan = new Scan(); - // Scan entire family. - scan.addFamily(Bytes.toBytes(FAMILY)); - scan.setAuthorizations(new Authorizations("secret", "private")); - ResultScanner resScanner = table.getScanner(scan); - Result[] next = resScanner.next(5); - assertEquals(0, next.length); - verified = true; - break; - } catch (NullPointerException e) { - // If here, a cell was empty. Presume its because updates came in - // after the scanner had been opened. Wait a while and retry. - } - try { - Thread.sleep(pause); - } catch (InterruptedException e) { - // continue - } - } - table.close(); - assertTrue(verified); - } - - @Test - public void testMROnTableWithBulkload() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); - Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles"); - // Prepare the arguments required for the test. - String[] args = new String[] { - "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), - "-D" + ImportTsv.COLUMNS_CONF_KEY - + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; - String data = "KEY\u001bVALUE1\u001bVALUE2\u001bsecret&private\n"; - util.createTable(tableName, FAMILY); - doMROnTableTest(util, FAMILY, data, args, 1); - util.deleteTable(tableName); - } - - @Test - public void testBulkOutputWithTsvImporterTextMapper() throws Exception { - final TableName table = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); - String FAMILY = "FAM"; - Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(table.getNameAsString()),"hfiles"); - // Prepare the arguments required for the test. - String[] args = - new String[] { - "-D" + ImportTsv.MAPPER_CONF_KEY - + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper", - "-D" + ImportTsv.COLUMNS_CONF_KEY - + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", - "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), - table.getNameAsString() - }; - String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n"; - doMROnTableTest(util, FAMILY, data, args, 4); - util.deleteTable(table); - } - - @Test - public void testMRWithOutputFormat() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); - Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles"); - // Prepare the arguments required for the test. - String[] args = new String[] { - "-D" + ImportTsv.MAPPER_CONF_KEY - + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper", - "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), - "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; - String data = "KEY\u001bVALUE4\u001bVALUE8\u001bsecret&private\n"; - util.createTable(tableName, FAMILY); - doMROnTableTest(util, FAMILY, data, args, 1); - util.deleteTable(tableName); - } - - @Test - public void testBulkOutputWithInvalidLabels() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); - Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles"); - // Prepare the arguments required for the test. - String[] args = - new String[] { "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), - "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; - - // 2 Data rows, one with valid label and one with invalid label - String data = - "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n"; - util.createTable(tableName, FAMILY); - doMROnTableTest(util, FAMILY, data, args, 1, 2); - util.deleteTable(tableName); - } - - @Test - public void testBulkOutputWithTsvImporterTextMapperWithInvalidLabels() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName() + UUID.randomUUID()); - Path hfiles = new Path(util.getDataTestDirOnTestFS(tableName.getNameAsString()), "hfiles"); - // Prepare the arguments required for the test. - String[] args = - new String[] { - "-D" + ImportTsv.MAPPER_CONF_KEY - + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper", - "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), - "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_VISIBILITY", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName.getNameAsString() }; - - // 2 Data rows, one with valid label and one with invalid label - String data = - "KEY\u001bVALUE1\u001bVALUE2\u001bprivate\nKEY1\u001bVALUE1\u001bVALUE2\u001binvalid\n"; - util.createTable(tableName, FAMILY); - doMROnTableTest(util, FAMILY, data, args, 1, 2); - util.deleteTable(tableName); - } - - protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, - String[] args, int valueMultiplier) throws Exception { - return doMROnTableTest(util, family, data, args, valueMultiplier, -1); - } - - /** - * Run an ImportTsv job and perform basic validation on the results. Returns - * the ImportTsv <code>Tool</code> instance so that other tests can inspect it - * for further validation as necessary. This method is static to insure - * non-reliance on instance's util/conf facilities. - * - * @param args - * Any arguments to pass BEFORE inputFile path is appended. - * - * @param expectedKVCount Expected KV count. pass -1 to skip the kvcount check - * - * @return The Tool instance used to run the test. - */ - protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, - String[] args, int valueMultiplier,int expectedKVCount) throws Exception { - TableName table = TableName.valueOf(args[args.length - 1]); - Configuration conf = new Configuration(util.getConfiguration()); - - // populate input file - FileSystem fs = FileSystem.get(conf); - Path inputPath = fs.makeQualified(new Path(util - .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat")); - FSDataOutputStream op = fs.create(inputPath, true); - if (data == null) { - data = "KEY\u001bVALUE1\u001bVALUE2\n"; - } - op.write(Bytes.toBytes(data)); - op.close(); - LOG.debug(String.format("Wrote test data to file: %s", inputPath)); - - if (conf.getBoolean(FORCE_COMBINER_CONF, true)) { - LOG.debug("Forcing combiner."); - conf.setInt("mapreduce.map.combine.minspills", 1); - } - - // run the import - List<String> argv = new ArrayList<>(Arrays.asList(args)); - argv.add(inputPath.toString()); - Tool tool = new ImportTsv(); - LOG.debug("Running ImportTsv with arguments: " + argv); - assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args))); - - // Perform basic validation. If the input args did not include - // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table. - // Otherwise, validate presence of hfiles. - boolean createdHFiles = false; - String outputPath = null; - for (String arg : argv) { - if (arg.contains(ImportTsv.BULK_OUTPUT_CONF_KEY)) { - createdHFiles = true; - // split '-Dfoo=bar' on '=' and keep 'bar' - outputPath = arg.split("=")[1]; - break; - } - } - LOG.debug("validating the table " + createdHFiles); - if (createdHFiles) - validateHFiles(fs, outputPath, family,expectedKVCount); - else - validateTable(conf, table, family, valueMultiplier); - - if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) { - LOG.debug("Deleting test subdirectory"); - util.cleanupDataTestDirOnTestFS(table.getNameAsString()); - } - return tool; - } - - /** - * Confirm ImportTsv via HFiles on fs. - */ - private static void validateHFiles(FileSystem fs, String outputPath, String family, - int expectedKVCount) throws IOException { - - // validate number and content of output columns - LOG.debug("Validating HFiles."); - Set<String> configFamilies = new HashSet<>(); - configFamilies.add(family); - Set<String> foundFamilies = new HashSet<>(); - int actualKVCount = 0; - for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) { - LOG.debug("The output path has files"); - String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR); - String cf = elements[elements.length - 1]; - foundFamilies.add(cf); - assertTrue(String.format( - "HFile ouput contains a column family (%s) not present in input families (%s)", cf, - configFamilies), configFamilies.contains(cf)); - for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) { - assertTrue(String.format("HFile %s appears to contain no data.", hfile.getPath()), - hfile.getLen() > 0); - if (expectedKVCount > -1) { - actualKVCount += getKVCountFromHfile(fs, hfile.getPath()); - } - } - } - if (expectedKVCount > -1) { - assertTrue(String.format( - "KV count in output hfile=<%d> doesn't match with expected KV count=<%d>", actualKVCount, - expectedKVCount), actualKVCount == expectedKVCount); - } - } - - /** - * Confirm ImportTsv via data in online table. - */ - private static void validateTable(Configuration conf, TableName tableName, String family, - int valueMultiplier) throws IOException { - - LOG.debug("Validating table."); - Table table = util.getConnection().getTable(tableName); - boolean verified = false; - long pause = conf.getLong("hbase.client.pause", 5 * 1000); - int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); - for (int i = 0; i < numRetries; i++) { - try { - Scan scan = new Scan(); - // Scan entire family. - scan.addFamily(Bytes.toBytes(family)); - scan.setAuthorizations(new Authorizations("secret","private")); - ResultScanner resScanner = table.getScanner(scan); - Result[] next = resScanner.next(5); - assertEquals(1, next.length); - for (Result res : resScanner) { - LOG.debug("Getting results " + res.size()); - assertTrue(res.size() == 2); - List<Cell> kvs = res.listCells(); - assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY"))); - assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY"))); - assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier))); - assertTrue(CellUtil.matchingValue(kvs.get(1), - Bytes.toBytes("VALUE" + 2 * valueMultiplier))); - // Only one result set is expected, so let it loop. - } - verified = true; - break; - } catch (NullPointerException e) { - // If here, a cell was empty. Presume its because updates came in - // after the scanner had been opened. Wait a while and retry. - } - try { - Thread.sleep(pause); - } catch (InterruptedException e) { - // continue - } - } - table.close(); - assertTrue(verified); - } - - /** - * Method returns the total KVs in given hfile - * @param fs File System - * @param p HFile path - * @return KV count in the given hfile - * @throws IOException - */ - private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException { - Configuration conf = util.getConfiguration(); - HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf); - reader.loadFileInfo(); - HFileScanner scanner = reader.getScanner(false, false); - scanner.seekTo(); - int count = 0; - do { - count++; - } while (scanner.next()); - reader.close(); - return count; - } - -}