http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java new file mode 100644 index 0000000..7b6e684 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java @@ -0,0 +1,571 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; 
+ +@Category({VerySlowMapReduceTests.class, LargeTests.class}) +public class TestImportTsv implements Configurable { + + private static final Log LOG = LogFactory.getLog(TestImportTsv.class); + protected static final String NAME = TestImportTsv.class.getSimpleName(); + protected static HBaseTestingUtility util = new HBaseTestingUtility(); + + // Delete the tmp directory after running doMROnTableTest. Boolean. Default is true. + protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; + + /** + * Force use of combiner in doMROnTableTest. Boolean. Default is true. + */ + protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; + + private final String FAMILY = "FAM"; + private TableName tn; + private Map<String, String> args; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + public Configuration getConf() { + return util.getConfiguration(); + } + + public void setConf(Configuration conf) { + throw new IllegalArgumentException("setConf not supported"); + } + + @BeforeClass + public static void provisionCluster() throws Exception { + util.startMiniCluster(); + } + + @AfterClass + public static void releaseCluster() throws Exception { + util.shutdownMiniCluster(); + } + + @Before + public void setup() throws Exception { + tn = TableName.valueOf("test-" + UUID.randomUUID()); + args = new HashMap<>(); + // Prepare the arguments required for the test. + args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B"); + args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b"); + } + + @Test + public void testMROnTable() throws Exception { + util.createTable(tn, FAMILY); + doMROnTableTest(null, 1); + util.deleteTable(tn); + } + + @Test + public void testMROnTableWithTimestamp() throws Exception { + util.createTable(tn, FAMILY); + args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B"); + args.put(ImportTsv.SEPARATOR_CONF_KEY, ","); + String data = "KEY,1234,VALUE1,VALUE2\n"; + + doMROnTableTest(data, 1); + util.deleteTable(tn); + } + + @Test + public void testMROnTableWithCustomMapper() + throws Exception { + util.createTable(tn, FAMILY); + args.put(ImportTsv.MAPPER_CONF_KEY, + "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper"); + + doMROnTableTest(null, 3); + util.deleteTable(tn); + } + + @Test + public void testBulkOutputWithoutAnExistingTable() throws Exception { + // Prepare the arguments required for the test. + Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); + + doMROnTableTest(null, 3); + util.deleteTable(tn); + } + + @Test + public void testBulkOutputWithAnExistingTable() throws Exception { + util.createTable(tn, FAMILY); + + // Prepare the arguments required for the test. + Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); + + doMROnTableTest(null, 3); + util.deleteTable(tn); + } + + @Test + public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception { + util.createTable(tn, FAMILY); + + // Prepare the arguments required for the test. 
+ Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); + args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true"); + doMROnTableTest(null, 3); + util.deleteTable(tn); + } + + @Test + public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception { + Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()),"hfiles"); + String INPUT_FILE = "InputFile1.csv"; + // Prepare the arguments required for the test. + String[] args = + new String[] { + "-D" + ImportTsv.MAPPER_CONF_KEY + + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper", + "-D" + ImportTsv.COLUMNS_CONF_KEY + + "=HBASE_ROW_KEY,FAM:A,FAM:B", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", + "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), + tn.getNameAsString(), + INPUT_FILE + }; + assertEquals("running test job configuration failed.", 0, ToolRunner.run( + new Configuration(util.getConfiguration()), + new ImportTsv() { + @Override + public int run(String[] args) throws Exception { + Job job = createSubmittableJob(getConf(), args); + assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class)); + assertTrue(job.getReducerClass().equals(TextSortReducer.class)); + assertTrue(job.getMapOutputValueClass().equals(Text.class)); + return 0; + } + }, args)); + // Delete table created by createSubmittableJob. + util.deleteTable(tn); + } + + @Test + public void testBulkOutputWithTsvImporterTextMapper() throws Exception { + Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()),"hfiles"); + args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper"); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); + String data = "KEY\u001bVALUE4\u001bVALUE8\n"; + doMROnTableTest(data, 4); + util.deleteTable(tn); + } + + @Test + public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception { + String[] args = new String[] { tn.getNameAsString(), "/inputFile" }; + + Configuration conf = new Configuration(util.getConfiguration()); + conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A"); + conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output"); + conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no"); + exception.expect(TableNotFoundException.class); + assertEquals("running test job configuration failed.", 0, + ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() { + @Override public int run(String[] args) throws Exception { + createSubmittableJob(getConf(), args); + return 0; + } + }, args)); + } + + @Test + public void testMRWithoutAnExistingTable() throws Exception { + String[] args = + new String[] { tn.getNameAsString(), "/inputFile" }; + + exception.expect(TableNotFoundException.class); + assertEquals("running test job configuration failed.", 0, ToolRunner.run( + new Configuration(util.getConfiguration()), + new ImportTsv() { + @Override + public int run(String[] args) throws Exception { + createSubmittableJob(getConf(), args); + return 0; + } + }, args)); + } + + @Test + public void testJobConfigurationsWithDryMode() throws Exception { + Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()),"hfiles"); + String INPUT_FILE = "InputFile1.csv"; + // Prepare the arguments required for the test. 
+ String[] argsArray = new String[] { + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", + "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), + "-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true", + tn.getNameAsString(), + INPUT_FILE }; + assertEquals("running test job configuration failed.", 0, ToolRunner.run( + new Configuration(util.getConfiguration()), + new ImportTsv() { + @Override + public int run(String[] args) throws Exception { + Job job = createSubmittableJob(getConf(), args); + assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class)); + return 0; + } + }, argsArray)); + // Delete table created by createSubmittableJob. + util.deleteTable(tn); + } + + @Test + public void testDryModeWithoutBulkOutputAndTableExists() throws Exception { + util.createTable(tn, FAMILY); + args.put(ImportTsv.DRY_RUN_CONF_KEY, "true"); + doMROnTableTest(null, 1); + // Dry mode should not delete an existing table. If it's not present, + // this will throw TableNotFoundException. + util.deleteTable(tn); + } + + /** + * If table is not present in non-bulk mode, dry run should fail just like + * normal mode. + */ + @Test + public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception { + args.put(ImportTsv.DRY_RUN_CONF_KEY, "true"); + exception.expect(TableNotFoundException.class); + doMROnTableTest(null, 1); + } + + @Test public void testDryModeWithBulkOutputAndTableExists() throws Exception { + util.createTable(tn, FAMILY); + // Prepare the arguments required for the test. + Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); + args.put(ImportTsv.DRY_RUN_CONF_KEY, "true"); + doMROnTableTest(null, 1); + // Dry mode should not delete an existing table. If it's not present, + // this will throw TableNotFoundException. + util.deleteTable(tn); + } + + /** + * If table is not present in bulk mode and create.table is not set to yes, + * import should fail with TableNotFoundException. + */ + @Test + public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo() throws + Exception { + // Prepare the arguments required for the test. + Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); + args.put(ImportTsv.DRY_RUN_CONF_KEY, "true"); + args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no"); + exception.expect(TableNotFoundException.class); + doMROnTableTest(null, 1); + } + + @Test + public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception { + // Prepare the arguments required for the test. + Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); + args.put(ImportTsv.DRY_RUN_CONF_KEY, "true"); + args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes"); + doMROnTableTest(null, 1); + // Verify temporary table was deleted. + exception.expect(TableNotFoundException.class); + util.deleteTable(tn); + } + + /** + * If there are invalid data rows as inputs, then only those rows should be ignored. 
+ */ + @Test + public void testTsvImporterTextMapperWithInvalidData() throws Exception { + Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); + args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper"); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); + args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B"); + args.put(ImportTsv.SEPARATOR_CONF_KEY, ","); + // 3 rows of data as input: 2 rows are valid and 1 row is invalid because it has no timestamp + String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n"; + doMROnTableTest(util, tn, FAMILY, data, args, 1, 4); + util.deleteTable(tn); + } + + @Test + public void testSkipEmptyColumns() throws Exception { + Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); + args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); + args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B"); + args.put(ImportTsv.SEPARATOR_CONF_KEY, ","); + args.put(ImportTsv.SKIP_EMPTY_COLUMNS, "true"); + // 2 rows of data as input. Both rows are valid, and only 3 of the 4 columns are non-empty + String data = "KEY,1234,VALUE1,VALUE2\nKEY,1235,,VALUE2\n"; + doMROnTableTest(util, tn, FAMILY, data, args, 1, 3); + util.deleteTable(tn); + } + + private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception { + return doMROnTableTest(util, tn, FAMILY, data, args, valueMultiplier, -1); + } + + protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table, + String family, String data, Map<String, String> args) throws Exception { + return doMROnTableTest(util, table, family, data, args, 1, -1); + } + + /** + * Run an ImportTsv job and perform basic validation on the results. + * Returns the ImportTsv <code>Tool</code> instance so that other tests can + * inspect it for further validation as necessary. This method is static to + * ensure non-reliance on the instance's util/conf facilities. + * @param args Any arguments to pass BEFORE the inputFile path is appended. + * @return The Tool instance used to run the test. + */ + protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table, + String family, String data, Map<String, String> args, int valueMultiplier, int expectedKVCount) + throws Exception { + Configuration conf = new Configuration(util.getConfiguration()); + + // populate input file + FileSystem fs = FileSystem.get(conf); + Path inputPath = fs.makeQualified( + new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat")); + FSDataOutputStream op = fs.create(inputPath, true); + if (data == null) { + data = "KEY\u001bVALUE1\u001bVALUE2\n"; + } + op.write(Bytes.toBytes(data)); + op.close(); + LOG.debug(String.format("Wrote test data to file: %s", inputPath)); + + if (conf.getBoolean(FORCE_COMBINER_CONF, true)) { + LOG.debug("Forcing combiner."); + conf.setInt("mapreduce.map.combine.minspills", 1); + } + + // Build args array.
+ String[] argsArray = new String[args.size() + 2]; + Iterator<Map.Entry<String, String>> it = args.entrySet().iterator(); + int i = 0; + while (it.hasNext()) { + Map.Entry<String, String> pair = it.next(); + argsArray[i] = "-D" + pair.getKey() + "=" + pair.getValue(); + i++; + } + argsArray[i] = table.getNameAsString(); + argsArray[i + 1] = inputPath.toString(); + + // run the import + Tool tool = new ImportTsv(); + LOG.debug("Running ImportTsv with arguments: " + java.util.Arrays.toString(argsArray)); + assertEquals(0, ToolRunner.run(conf, tool, argsArray)); + + // Perform basic validation. If the input args did not include + // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table. + // Otherwise, validate presence of hfiles. + boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY) && + "true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY)); + if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) { + if (isDryRun) { + assertFalse(String.format("Dry run mode, %s should not have been created.", + ImportTsv.BULK_OUTPUT_CONF_KEY), + fs.exists(new Path(args.get(ImportTsv.BULK_OUTPUT_CONF_KEY)))); + } else { + validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family, expectedKVCount); + } + } else { + validateTable(conf, table, family, valueMultiplier, isDryRun); + } + + if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) { + LOG.debug("Deleting test subdirectory"); + util.cleanupDataTestDirOnTestFS(table.getNameAsString()); + } + return tool; + } + + /** + * Confirm ImportTsv via data in the online table. + */ + private static void validateTable(Configuration conf, TableName tableName, + String family, int valueMultiplier, boolean isDryRun) throws IOException { + + LOG.debug("Validating table."); + Connection connection = ConnectionFactory.createConnection(conf); + Table table = connection.getTable(tableName); + boolean verified = false; + long pause = conf.getLong("hbase.client.pause", 5 * 1000); + int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); + for (int i = 0; i < numRetries; i++) { + try { + Scan scan = new Scan(); + // Scan entire family. + scan.addFamily(Bytes.toBytes(family)); + ResultScanner resScanner = table.getScanner(scan); + int numRows = 0; + for (Result res : resScanner) { + numRows++; + assertEquals(2, res.size()); + List<Cell> kvs = res.listCells(); + assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY"))); + assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY"))); + assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier))); + assertTrue(CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier))); + // Only one result set is expected, so let it loop. + } + if (isDryRun) { + assertEquals(0, numRows); + } else { + assertEquals(1, numRows); + } + verified = true; + break; + } catch (NullPointerException e) { + // If here, a cell was empty. Presume it's because updates came in + // after the scanner had been opened. Wait a while and retry. + } + try { + Thread.sleep(pause); + } catch (InterruptedException e) { + // continue + } + } + table.close(); + connection.close(); + assertTrue(verified); + } + + /** + * Confirm ImportTsv via HFiles on fs.
+ */ + private static void validateHFiles(FileSystem fs, String outputPath, String family, + int expectedKVCount) throws IOException { + // validate number and content of output columns + LOG.debug("Validating HFiles."); + Set<String> configFamilies = new HashSet<>(); + configFamilies.add(family); + Set<String> foundFamilies = new HashSet<>(); + int actualKVCount = 0; + for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) { + String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR); + String cf = elements[elements.length - 1]; + foundFamilies.add(cf); + assertTrue( + String.format( + "HFile output contains a column family (%s) not present in input families (%s)", + cf, configFamilies), + configFamilies.contains(cf)); + for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) { + assertTrue( + String.format("HFile %s appears to contain no data.", hfile.getPath()), + hfile.getLen() > 0); + // count the number of KVs from all the hfiles + if (expectedKVCount > -1) { + actualKVCount += getKVCountFromHfile(fs, hfile.getPath()); + } + } + } + assertTrue(String.format("HFile output does not contain the input family '%s'.", family), + foundFamilies.contains(family)); + if (expectedKVCount > -1) { + assertTrue(String.format( + "KV count in output hfile=<%d> does not match expected KV count=<%d>", actualKVCount, + expectedKVCount), actualKVCount == expectedKVCount); + } + } + + /** + * Returns the total number of KVs in the given hfile. + * @param fs the file system + * @param p the HFile path + * @return the KV count in the given hfile + * @throws IOException if the hfile cannot be read + */ + private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException { + Configuration conf = util.getConfiguration(); + HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf); + reader.loadFileInfo(); + HFileScanner scanner = reader.getScanner(false, false); + int count = 0; + // Guard against an empty hfile: seekTo() returns false when there is no first KV. + if (scanner.seekTo()) { + do { + count++; + } while (scanner.next()); + } + reader.close(); + return count; + } +} +
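As a side note, driving the tool outside the test harness follows the same shape the tests above exercise: assemble the -D options, then pass the table name and input path as positional arguments to ToolRunner. A minimal, hypothetical driver sketch (the table name "t1", family FAM, and input path are placeholders; a reachable cluster and an existing table are assumed):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.ImportTsv;
import org.apache.hadoop.util.ToolRunner;

public class ImportTsvDriver {
  public static void main(String[] unused) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Same argument layout doMROnTableTest builds: -D options first,
    // then the table name and the input path as positional arguments.
    String[] args = new String[] {
        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
        "t1",            // placeholder table name
        "/tmp/input.csv" // placeholder input path on the cluster filesystem
    };
    // ImportTsv implements Tool, so ToolRunner parses the -D options; 0 means success.
    System.exit(ToolRunner.run(conf, new ImportTsv(), args));
  }
}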
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java new file mode 100644 index 0000000..3c38102 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser; +import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException; +import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner; +import org.apache.hadoop.hbase.shaded.com.google.common.base.Splitter; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables; + +/** + * Tests for {@link TsvParser}. 
+ */ +@Category({MapReduceTests.class, SmallTests.class}) +public class TestImportTsvParser { + + private void assertBytesEquals(byte[] a, byte[] b) { + assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b)); + } + + private void checkParsing(ParsedLine parsed, Iterable<String> expected) { + ArrayList<String> parsedCols = new ArrayList<>(); + for (int i = 0; i < parsed.getColumnCount(); i++) { + parsedCols.add(Bytes.toString(parsed.getLineBytes(), parsed.getColumnOffset(i), + parsed.getColumnLength(i))); + } + if (!Iterables.elementsEqual(parsedCols, expected)) { + fail("Expected: " + Joiner.on(",").join(expected) + "\n" + "Got:" + + Joiner.on(",").join(parsedCols)); + } + } + + @Test + public void testTsvParserSpecParsing() { + TsvParser parser; + + parser = new TsvParser("HBASE_ROW_KEY", "\t"); + assertNull(parser.getFamily(0)); + assertNull(parser.getQualifier(0)); + assertEquals(0, parser.getRowKeyColumnIndex()); + assertFalse(parser.hasTimestamp()); + + parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t"); + assertNull(parser.getFamily(0)); + assertNull(parser.getQualifier(0)); + assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); + assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); + assertEquals(0, parser.getRowKeyColumnIndex()); + assertFalse(parser.hasTimestamp()); + + parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t"); + assertNull(parser.getFamily(0)); + assertNull(parser.getQualifier(0)); + assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); + assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); + assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2)); + assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2)); + assertEquals(0, parser.getRowKeyColumnIndex()); + assertFalse(parser.hasTimestamp()); + + parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2", "\t"); + assertNull(parser.getFamily(0)); + assertNull(parser.getQualifier(0)); + assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); + assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); + assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3)); + assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3)); + assertEquals(0, parser.getRowKeyColumnIndex()); + assertTrue(parser.hasTimestamp()); + assertEquals(2, parser.getTimestampKeyColumnIndex()); + + parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2,HBASE_ATTRIBUTES_KEY", + "\t"); + assertNull(parser.getFamily(0)); + assertNull(parser.getQualifier(0)); + assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); + assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); + assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3)); + assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3)); + assertEquals(0, parser.getRowKeyColumnIndex()); + assertTrue(parser.hasTimestamp()); + assertEquals(2, parser.getTimestampKeyColumnIndex()); + assertEquals(4, parser.getAttributesKeyColumnIndex()); + + parser = new TsvParser("HBASE_ATTRIBUTES_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2,HBASE_ROW_KEY", + "\t"); + assertNull(parser.getFamily(0)); + assertNull(parser.getQualifier(0)); + assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); + assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); + assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3)); + assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3)); + assertEquals(4, 
parser.getRowKeyColumnIndex()); + assertTrue(parser.hasTimestamp()); + assertEquals(2, parser.getTimestampKeyColumnIndex()); + assertEquals(0, parser.getAttributesKeyColumnIndex()); + } + + @Test + public void testTsvParser() throws BadTsvLineException { + TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t"); + assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0)); + assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0)); + assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1)); + assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1)); + assertNull(parser.getFamily(2)); + assertNull(parser.getQualifier(2)); + assertEquals(2, parser.getRowKeyColumnIndex()); + + assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser.getTimestampKeyColumnIndex()); + + byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d"); + ParsedLine parsed = parser.parse(line, line.length); + checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line))); + } + + @Test + public void testTsvParserWithTimestamp() throws BadTsvLineException { + TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t"); + assertNull(parser.getFamily(0)); + assertNull(parser.getQualifier(0)); + assertNull(parser.getFamily(1)); + assertNull(parser.getQualifier(1)); + assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2)); + assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2)); + assertEquals(0, parser.getRowKeyColumnIndex()); + assertEquals(1, parser.getTimestampKeyColumnIndex()); + + byte[] line = Bytes.toBytes("rowkey\t1234\tval_a"); + ParsedLine parsed = parser.parse(line, line.length); + assertEquals(1234l, parsed.getTimestamp(-1)); + checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line))); + } + + /** + * Test cases that throw BadTsvLineException + */ + @Test(expected = BadTsvLineException.class) + public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException { + TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t"); + byte[] line = Bytes.toBytes("val_a\tval_b\tval_c"); + parser.parse(line, line.length); + } + + @Test(expected = BadTsvLineException.class) + public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException { + TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t"); + byte[] line = Bytes.toBytes(""); + parser.parse(line, line.length); + } + + @Test(expected = BadTsvLineException.class) + public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException { + TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t"); + byte[] line = Bytes.toBytes("key_only"); + parser.parse(line, line.length); + } + + @Test(expected = BadTsvLineException.class) + public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException { + TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t"); + byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key"); + parser.parse(line, line.length); + } + + @Test(expected = BadTsvLineException.class) + public void testTsvParserInvalidTimestamp() throws BadTsvLineException { + TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t"); + assertEquals(1, parser.getTimestampKeyColumnIndex()); + byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a"); + ParsedLine parsed = parser.parse(line, line.length); + assertEquals(-1, parsed.getTimestamp(-1)); + checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line))); + } + + @Test(expected = BadTsvLineException.class) + public void 
testTsvParserNoTimestampValue() throws BadTsvLineException { + TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t"); + assertEquals(2, parser.getTimestampKeyColumnIndex()); + byte[] line = Bytes.toBytes("rowkey\tval_a"); + parser.parse(line, line.length); + } + + @Test + public void testTsvParserParseRowKey() throws BadTsvLineException { + TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t"); + assertEquals(0, parser.getRowKeyColumnIndex()); + byte[] line = Bytes.toBytes("rowkey\tval_a\t1234"); + Pair<Integer, Integer> rowKeyOffsets = parser.parseRowKey(line, line.length); + assertEquals(0, rowKeyOffsets.getFirst().intValue()); + assertEquals(6, rowKeyOffsets.getSecond().intValue()); + try { + line = Bytes.toBytes("\t\tval_a\t1234"); + parser.parseRowKey(line, line.length); + fail("Should get BadTsvLineException on empty rowkey."); + } catch (BadTsvLineException b) { + // expected + } + parser = new TsvParser("col_a,HBASE_ROW_KEY,HBASE_TS_KEY", "\t"); + assertEquals(1, parser.getRowKeyColumnIndex()); + line = Bytes.toBytes("val_a\trowkey\t1234"); + rowKeyOffsets = parser.parseRowKey(line, line.length); + assertEquals(6, rowKeyOffsets.getFirst().intValue()); + assertEquals(6, rowKeyOffsets.getSecond().intValue()); + try { + line = Bytes.toBytes("val_a"); + rowKeyOffsets = parser.parseRowKey(line, line.length); + fail("Should get BadTsvLineException when the number of columns is less than the rowkey position."); + } catch (BadTsvLineException b) { + // expected + } + parser = new TsvParser("col_a,HBASE_TS_KEY,HBASE_ROW_KEY", "\t"); + assertEquals(2, parser.getRowKeyColumnIndex()); + line = Bytes.toBytes("val_a\t1234\trowkey"); + rowKeyOffsets = parser.parseRowKey(line, line.length); + assertEquals(11, rowKeyOffsets.getFirst().intValue()); + assertEquals(6, rowKeyOffsets.getSecond().intValue()); + } + + @Test + public void testTsvParseAttributesKey() throws BadTsvLineException { + TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY,HBASE_ATTRIBUTES_KEY", "\t"); + assertEquals(0, parser.getRowKeyColumnIndex()); + byte[] line = Bytes.toBytes("rowkey\tval_a\t1234\tkey=>value"); + ParsedLine parse = parser.parse(line, line.length); + assertEquals(18, parse.getAttributeKeyOffset()); + assertEquals(3, parser.getAttributesKeyColumnIndex()); + String[] attributes = parse.getIndividualAttributes(); + assertEquals("key=>value", attributes[0]); + try { + line = Bytes.toBytes("rowkey\tval_a\t1234"); + parser.parse(line, line.length); + fail("Should get BadTsvLineException when the attributes column is missing."); + } catch (BadTsvLineException b) { + // expected + } + parser = new TsvParser("HBASE_ATTRIBUTES_KEY,col_a,HBASE_ROW_KEY,HBASE_TS_KEY", "\t"); + assertEquals(2, parser.getRowKeyColumnIndex()); + line = Bytes.toBytes("key=>value\tval_a\trowkey\t1234"); + parse = parser.parse(line, line.length); + assertEquals(0, parse.getAttributeKeyOffset()); + assertEquals(0, parser.getAttributesKeyColumnIndex()); + attributes = parse.getIndividualAttributes(); + assertEquals("key=>value", attributes[0]); + try { + line = Bytes.toBytes("val_a"); + parser.parse(line, line.length); + fail("Should get BadTsvLineException when the number of columns is less than the rowkey position."); + } catch (BadTsvLineException b) { + // expected + } + parser = new TsvParser("col_a,HBASE_ATTRIBUTES_KEY,HBASE_TS_KEY,HBASE_ROW_KEY", "\t"); + assertEquals(3, parser.getRowKeyColumnIndex()); + line = Bytes.toBytes("val_a\tkey0=>value0,key1=>value1,key2=>value2\t1234\trowkey"); + parse = parser.parse(line, line.length); + assertEquals(1,
parser.getAttributesKeyColumnIndex()); + assertEquals(6, parse.getAttributeKeyOffset()); + String[] attr = parse.getIndividualAttributes(); + int i = 0; + for(String str : attr) { + assertEquals(("key"+i+"=>"+"value"+i), str ); + i++; + } + } + + @Test + public void testTsvParserWithCellVisibilityCol() throws BadTsvLineException { + TsvParser parser = new TsvParser( + "HBASE_ROW_KEY,col_a,HBASE_TS_KEY,HBASE_ATTRIBUTES_KEY,HBASE_CELL_VISIBILITY", "\t"); + assertEquals(0, parser.getRowKeyColumnIndex()); + assertEquals(4, parser.getCellVisibilityColumnIndex()); + byte[] line = Bytes.toBytes("rowkey\tval_a\t1234\tkey=>value\tPRIVATE&SECRET"); + ParsedLine parse = parser.parse(line, line.length); + assertEquals(18, parse.getAttributeKeyOffset()); + assertEquals(3, parser.getAttributesKeyColumnIndex()); + String attributes[] = parse.getIndividualAttributes(); + assertEquals(attributes[0], "key=>value"); + assertEquals(29, parse.getCellVisibilityColumnOffset()); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestJarFinder.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestJarFinder.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestJarFinder.java new file mode 100644 index 0000000..8187b73 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestJarFinder.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.OutputStream; +import java.io.Writer; +import java.text.MessageFormat; +import java.util.Properties; +import java.util.jar.JarInputStream; +import java.util.jar.JarOutputStream; +import java.util.jar.Manifest; + +/** + * This file was forked from hadoop/common/branches/branch-2@1350012. 
+ */ +@Category(SmallTests.class) +public class TestJarFinder { + + @Test + public void testJar() throws Exception { + + //picking a class that is for sure in a JAR in the classpath + String jar = JarFinder.getJar(LogFactory.class); + Assert.assertTrue(new File(jar).exists()); + } + + private static void delete(File file) throws IOException { + if (file.getAbsolutePath().length() < 5) { + throw new IllegalArgumentException( + MessageFormat.format("Path [{0}] is too short, not deleting", + file.getAbsolutePath())); + } + if (file.exists()) { + if (file.isDirectory()) { + File[] children = file.listFiles(); + if (children != null) { + for (File child : children) { + delete(child); + } + } + } + if (!file.delete()) { + throw new RuntimeException( + MessageFormat.format("Could not delete path [{0}]", + file.getAbsolutePath())); + } + } + } + + @Test + public void testExpandedClasspath() throws Exception { + //picking a class that is for sure in a directory in the classpath + //in this case the JAR is created on the fly + String jar = JarFinder.getJar(TestJarFinder.class); + Assert.assertTrue(new File(jar).exists()); + } + + @Test + public void testExistingManifest() throws Exception { + File dir = new File(System.getProperty("test.build.dir", "target/test-dir"), + TestJarFinder.class.getName() + "-testExistingManifest"); + delete(dir); + dir.mkdirs(); + + File metaInfDir = new File(dir, "META-INF"); + metaInfDir.mkdirs(); + File manifestFile = new File(metaInfDir, "MANIFEST.MF"); + Manifest manifest = new Manifest(); + OutputStream os = new FileOutputStream(manifestFile); + manifest.write(os); + os.close(); + + File propsFile = new File(dir, "props.properties"); + Writer writer = new FileWriter(propsFile); + new Properties().store(writer, ""); + writer.close(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + JarOutputStream zos = new JarOutputStream(baos); + JarFinder.jarDir(dir, "", zos); + JarInputStream jis = + new JarInputStream(new ByteArrayInputStream(baos.toByteArray())); + Assert.assertNotNull(jis.getManifest()); + jis.close(); + } + + @Test + public void testNoManifest() throws Exception { + File dir = new File(System.getProperty("test.build.dir", "target/test-dir"), + TestJarFinder.class.getName() + "-testNoManifest"); + delete(dir); + dir.mkdirs(); + File propsFile = new File(dir, "props.properties"); + Writer writer = new FileWriter(propsFile); + new Properties().store(writer, ""); + writer.close(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + JarOutputStream zos = new JarOutputStream(baos); + JarFinder.jarDir(dir, "", zos); + JarInputStream jis = + new JarInputStream(new ByteArrayInputStream(baos.toByteArray())); + Assert.assertNotNull(jis.getManifest()); + jis.close(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java new file mode 100644 index 0000000..529a448 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java @@ -0,0 +1,669 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ClientServiceCallable; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.Mockito; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimap; + +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; 
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; + +/** + * Test cases for the atomic load error handling of the bulk load functionality. + */ +@Category({MapReduceTests.class, LargeTests.class}) +public class TestLoadIncrementalHFilesSplitRecovery { + private static final Log LOG = LogFactory.getLog(TestLoadIncrementalHFilesSplitRecovery.class); + + static HBaseTestingUtility util; + // used by secure subclass + static boolean useSecure = false; + + final static int NUM_CFS = 10; + final static byte[] QUAL = Bytes.toBytes("qual"); + final static int ROWCOUNT = 100; + + private final static byte[][] families = new byte[NUM_CFS][]; + + @Rule + public TestName name = new TestName(); + + static { + for (int i = 0; i < NUM_CFS; i++) { + families[i] = Bytes.toBytes(family(i)); + } + } + + static byte[] rowkey(int i) { + return Bytes.toBytes(String.format("row_%08d", i)); + } + + static String family(int i) { + return String.format("family_%04d", i); + } + + static byte[] value(int i) { + return Bytes.toBytes(String.format("%010d", i)); + } + + public static void buildHFiles(FileSystem fs, Path dir, int value) + throws IOException { + byte[] val = value(value); + for (int i = 0; i < NUM_CFS; i++) { + Path testIn = new Path(dir, family(i)); + + TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i), + Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT); + } + } + + /** + * Creates a table with the given table name and the specified number of column + * families if the table does not already exist. + */ + private void setupTable(final Connection connection, TableName table, int cfs) + throws IOException { + try { + LOG.info("Creating table " + table); + HTableDescriptor htd = new HTableDescriptor(table); + for (int i = 0; i < cfs; i++) { + htd.addFamily(new HColumnDescriptor(family(i))); + } + try (Admin admin = connection.getAdmin()) { + admin.createTable(htd); + } + } catch (TableExistsException tee) { + LOG.info("Table " + table + " already exists"); + } + } + + /** + * Creates a table with the given table name, the specified number of column families,<br> + * and the given split keys if the table does not already exist. + * @param table the table to create + * @param cfs the number of column families + * @param SPLIT_KEYS the region split keys + */ + private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS) + throws IOException { + try { + LOG.info("Creating table " + table); + HTableDescriptor htd = new HTableDescriptor(table); + for (int i = 0; i < cfs; i++) { + htd.addFamily(new HColumnDescriptor(family(i))); + } + + util.createTable(htd, SPLIT_KEYS); + } catch (TableExistsException tee) { + LOG.info("Table " + table + " already exists"); + } + } + + private Path buildBulkFiles(TableName table, int value) throws Exception { + Path dir = util.getDataTestDirOnTestFS(table.getNameAsString()); + Path bulk1 = new Path(dir, table.getNameAsString() + value); + FileSystem fs = util.getTestFileSystem(); + buildHFiles(fs, bulk1, value); + return bulk1; + } + + /** + * Populate table with known values. + */ + private void populateTable(final Connection connection, TableName table, int value) + throws Exception { + // create HFiles for different column families + LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()); + Path bulk1 = buildBulkFiles(table, value); + try (Table t = connection.getTable(table); + RegionLocator locator = connection.getRegionLocator(table); + Admin admin = connection.getAdmin()) { + lih.doBulkLoad(bulk1, admin, t, locator); + } + } + + /** + * Split the known table in half.
(this is hard-coded for this test suite) + */ + private void forceSplit(TableName table) { + try { + // We would need the region server's split call to be synchronous, but that API isn't + // visible, so split asynchronously and poll for completion below. + HRegionServer hrs = util.getRSForFirstRegionInTable(table); + + for (HRegionInfo hri : + ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { + if (hri.getTable().equals(table)) { + util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2)); + //ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2)); + } + } + + // verify that the split completed. + int regions; + do { + regions = 0; + for (HRegionInfo hri : + ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { + if (hri.getTable().equals(table)) { + regions++; + } + } + if (regions != 2) { + LOG.info("Taking some time to complete split..."); + Thread.sleep(250); + } + } while (regions != 2); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @BeforeClass + public static void setupCluster() throws Exception { + util = new HBaseTestingUtility(); + util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); + util.startMiniCluster(1); + } + + @AfterClass + public static void teardownCluster() throws Exception { + util.shutdownMiniCluster(); + } + + /** + * Checks that all columns have the expected value and that there is the + * expected number of rows. + * @throws IOException + */ + void assertExpectedTable(TableName table, int count, int value) throws IOException { + HTableDescriptor[] htds = util.getAdmin().listTables(table.getNameAsString()); + assertEquals(1, htds.length); + Table t = null; + try { + t = util.getConnection().getTable(table); + Scan s = new Scan(); + ResultScanner sr = t.getScanner(s); + int i = 0; + for (Result r : sr) { + i++; + for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) { + for (byte[] val : nm.values()) { + assertTrue(Bytes.equals(val, value(value))); + } + } + } + assertEquals(count, i); + } catch (IOException e) { + fail("Failed due to exception"); + } finally { + if (t != null) t.close(); + } + } + + /** + * Test that shows that an exception thrown from the RS side will result in an + * exception on the LoadIncrementalHFiles client.
+ */ + @Test(expected=IOException.class, timeout=120000) + public void testBulkLoadPhaseFailure() throws Exception { + final TableName table = TableName.valueOf(name.getMethodName()); + final AtomicInteger attemptedCalls = new AtomicInteger(); + final AtomicInteger failedCalls = new AtomicInteger(); + util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + try (Connection connection = ConnectionFactory.createConnection(util + .getConfiguration())) { + setupTable(connection, table, 10); + LoadIncrementalHFiles lih = new LoadIncrementalHFiles( + util.getConfiguration()) { + @Override + protected List<LoadQueueItem> tryAtomicRegionLoad( + ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first, + Collection<LoadQueueItem> lqis) throws IOException { + int i = attemptedCalls.incrementAndGet(); + if (i == 1) { + Connection errConn; + try { + errConn = getMockedConnection(util.getConfiguration()); + serviceCallable = this.buildClientServiceCallable(errConn, table, first, lqis, true); + } catch (Exception e) { + LOG.fatal("mocking cruft, should never happen", e); + throw new RuntimeException("mocking cruft, should never happen"); + } + failedCalls.incrementAndGet(); + return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis); + } + + return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis); + } + }; + try { + // create HFiles for different column families + Path dir = buildBulkFiles(table, 1); + try (Table t = connection.getTable(table); + RegionLocator locator = connection.getRegionLocator(table); + Admin admin = connection.getAdmin()) { + lih.doBulkLoad(dir, admin, t, locator); + } + } finally { + util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + } + fail("doBulkLoad should have thrown an exception"); + } + } + + /** + * Test that shows that an exception thrown from the RS side will result in the + * expected number of retries, set by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER}, + * when {@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set. + */ + @Test + public void testRetryOnIOException() throws Exception { + final TableName table = TableName.valueOf(name.getMethodName()); + final AtomicInteger calls = new AtomicInteger(1); + final Connection conn = ConnectionFactory.createConnection(util + .getConfiguration()); + util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + util.getConfiguration().setBoolean( + LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true); + final LoadIncrementalHFiles lih = new LoadIncrementalHFiles( + util.getConfiguration()) { + @Override + protected List<LoadQueueItem> tryAtomicRegionLoad( + ClientServiceCallable<byte[]> serverCallable, TableName tableName, + final byte[] first, Collection<LoadQueueItem> lqis) + throws IOException { + if (calls.getAndIncrement() < util.getConfiguration().getInt( + HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) { + ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>( + conn, tableName, first, new RpcControllerFactory( + util.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) { + @Override + public byte[] rpcCall() throws Exception { + throw new IOException("Error calling something on RegionServer"); + } + }; + return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis); + } else { + return super.tryAtomicRegionLoad(serverCallable, tableName, first,
lqis); + } + } + }; + setupTable(conn, table, 10); + Path dir = buildBulkFiles(table, 1); + lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), + conn.getRegionLocator(table)); + util.getConfiguration().setBoolean( + LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false); + + } + + @SuppressWarnings("deprecation") + private ClusterConnection getMockedConnection(final Configuration conf) + throws IOException, org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException { + ClusterConnection c = Mockito.mock(ClusterConnection.class); + Mockito.when(c.getConfiguration()).thenReturn(conf); + Mockito.doNothing().when(c).close(); + // Make it so we return a particular location when asked. + final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, + ServerName.valueOf("example.org", 1234, 0)); + Mockito.when(c.getRegionLocation((TableName) Mockito.any(), + (byte[]) Mockito.any(), Mockito.anyBoolean())). + thenReturn(loc); + Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())). + thenReturn(loc); + ClientProtos.ClientService.BlockingInterface hri = + Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); + Mockito.when(hri.bulkLoadHFile((RpcController)Mockito.any(), (BulkLoadHFileRequest)Mockito.any())). + thenThrow(new ServiceException(new IOException("injecting bulk load error"))); + Mockito.when(c.getClient(Mockito.any(ServerName.class))). + thenReturn(hri); + return c; + } + + /** + * This test exercises the path where there is a split after initial + * validation but before the atomic bulk load call. We cannot use presplitting + * to test this path, so we actually inject a split just before the atomic + * region load. + */ + @Test (timeout=120000) + public void testSplitWhileBulkLoadPhase() throws Exception { + final TableName table = TableName.valueOf(name.getMethodName()); + try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { + setupTable(connection, table, 10); + populateTable(connection, table, 1); + assertExpectedTable(table, ROWCOUNT, 1); + + // Now let's cause trouble. This will occur after the checks and cause the bulk + // files to fail when we attempt to atomically import them. This is recoverable. + final AtomicInteger attemptedCalls = new AtomicInteger(); + LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) { + @Override + protected void bulkLoadPhase(final Table htable, final Connection conn, + ExecutorService pool, Deque<LoadQueueItem> queue, + final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile, + Map<LoadQueueItem, ByteBuffer> item2RegionMap) + throws IOException { + int i = attemptedCalls.incrementAndGet(); + if (i == 1) { + // On the first attempt force a split. + forceSplit(table); + } + super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap); + } + }; + + // create HFiles for different column families + try (Table t = connection.getTable(table); + RegionLocator locator = connection.getRegionLocator(table); + Admin admin = connection.getAdmin()) { + Path bulk = buildBulkFiles(table, 2); + lih2.doBulkLoad(bulk, admin, t, locator); + } + + // check that data was loaded + // The three expected attempts are: 1) failure because a split is needed, 2) + // load of the split top, 3) load of the split bottom + assertEquals(3, attemptedCalls.get()); + assertExpectedTable(table, ROWCOUNT, 2); + } + } + + /** + * This test splits a table and attempts to bulk load.
The bulk import files + * should be split before atomically importing. + */ + @Test (timeout=120000) + public void testGroupOrSplitPresplit() throws Exception { + final TableName table = TableName.valueOf(name.getMethodName()); + try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { + setupTable(connection, table, 10); + populateTable(connection, table, 1); + assertExpectedTable(connection, table, ROWCOUNT, 1); + forceSplit(table); + + final AtomicInteger countedLqis= new AtomicInteger(); + LoadIncrementalHFiles lih = new LoadIncrementalHFiles( + util.getConfiguration()) { + @Override + protected Pair<List<LoadQueueItem>, String> groupOrSplit( + Multimap<ByteBuffer, LoadQueueItem> regionGroups, + final LoadQueueItem item, final Table htable, + final Pair<byte[][], byte[][]> startEndKeys) throws IOException { + Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable, + startEndKeys); + if (lqis != null && lqis.getFirst() != null) { + countedLqis.addAndGet(lqis.getFirst().size()); + } + return lqis; + } + }; + + // create HFiles for different column families + Path bulk = buildBulkFiles(table, 2); + try (Table t = connection.getTable(table); + RegionLocator locator = connection.getRegionLocator(table); + Admin admin = connection.getAdmin()) { + lih.doBulkLoad(bulk, admin, t, locator); + } + assertExpectedTable(connection, table, ROWCOUNT, 2); + assertEquals(20, countedLqis.get()); + } + } + + /** + * This test creates a table with many small regions. The bulk load files + * would be splitted multiple times before all of them can be loaded successfully. + */ + @Test (timeout=120000) + public void testSplitTmpFileCleanUp() throws Exception { + final TableName table = TableName.valueOf(name.getMethodName()); + byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"), + Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), + Bytes.toBytes("row_00000040"), Bytes.toBytes("row_00000050")}; + try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { + setupTableWithSplitkeys(table, 10, SPLIT_KEYS); + + LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()); + + // create HFiles + Path bulk = buildBulkFiles(table, 2); + try (Table t = connection.getTable(table); + RegionLocator locator = connection.getRegionLocator(table); + Admin admin = connection.getAdmin()) { + lih.doBulkLoad(bulk, admin, t, locator); + } + // family path + Path tmpPath = new Path(bulk, family(0)); + // TMP_DIR under family path + tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR); + FileSystem fs = bulk.getFileSystem(util.getConfiguration()); + // HFiles have been splitted, there is TMP_DIR + assertTrue(fs.exists(tmpPath)); + // TMP_DIR should have been cleaned-up + assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.", + FSUtils.listStatus(fs, tmpPath)); + assertExpectedTable(connection, table, ROWCOUNT, 2); + } + } + + /** + * This simulates an remote exception which should cause LIHF to exit with an + * exception. 
+ */ + @Test(expected = IOException.class, timeout=120000) + public void testGroupOrSplitFailure() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName()); + try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { + setupTable(connection, tableName, 10); + + LoadIncrementalHFiles lih = new LoadIncrementalHFiles( + util.getConfiguration()) { + int i = 0; + + @Override + protected Pair<List<LoadQueueItem>, String> groupOrSplit( + Multimap<ByteBuffer, LoadQueueItem> regionGroups, + final LoadQueueItem item, final Table table, + final Pair<byte[][], byte[][]> startEndKeys) throws IOException { + i++; + + if (i == 5) { + throw new IOException("failure"); + } + return super.groupOrSplit(regionGroups, item, table, startEndKeys); + } + }; + + // create HFiles for different column families + Path dir = buildBulkFiles(tableName,1); + try (Table t = connection.getTable(tableName); + RegionLocator locator = connection.getRegionLocator(tableName); + Admin admin = connection.getAdmin()) { + lih.doBulkLoad(dir, admin, t, locator); + } + } + + fail("doBulkLoad should have thrown an exception"); + } + + @Test (timeout=120000) + public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName()); + byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") }; + // Share connection. We were failing to find the table with our new reverse scan because it + // looks for first region, not any region -- that is how it works now. The below removes first + // region in test. Was reliant on the Connection caching having first region. + Connection connection = ConnectionFactory.createConnection(util.getConfiguration()); + Table table = connection.getTable(tableName); + + setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS); + Path dir = buildBulkFiles(tableName, 2); + + final AtomicInteger countedLqis = new AtomicInteger(); + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) { + + @Override + protected Pair<List<LoadQueueItem>, String> groupOrSplit( + Multimap<ByteBuffer, LoadQueueItem> regionGroups, + final LoadQueueItem item, final Table htable, + final Pair<byte[][], byte[][]> startEndKeys) throws IOException { + Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable, + startEndKeys); + if (lqis != null && lqis.getFirst() != null) { + countedLqis.addAndGet(lqis.getFirst().size()); + } + return lqis; + } + }; + + // do bulkload when there is no region hole in hbase:meta. + try (Table t = connection.getTable(tableName); + RegionLocator locator = connection.getRegionLocator(tableName); + Admin admin = connection.getAdmin()) { + loader.doBulkLoad(dir, admin, t, locator); + } catch (Exception e) { + LOG.error("exeception=", e); + } + // check if all the data are loaded into the table. 
+ this.assertExpectedTable(tableName, ROWCOUNT, 2); + + dir = buildBulkFiles(tableName, 3); + + // Mess it up by leaving a hole in the hbase:meta + List<HRegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName); + for (HRegionInfo regionInfo : regionInfos) { + if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { + MetaTableAccessor.deleteRegion(connection, regionInfo); + break; + } + } + + try (Table t = connection.getTable(tableName); + RegionLocator locator = connection.getRegionLocator(tableName); + Admin admin = connection.getAdmin()) { + loader.doBulkLoad(dir, admin, t, locator); + } catch (Exception e) { + LOG.error("exception=", e); + assertTrue("IOException expected", e instanceof IOException); + } + + table.close(); + + // Make sure at least the one region that still exists can be found. + regionInfos = MetaTableAccessor.getTableRegions(connection, tableName); + assertTrue(regionInfos.size() >= 1); + + this.assertExpectedTable(connection, tableName, ROWCOUNT, 2); + connection.close(); + } + + /** + * Checks that all columns have the expected value and that there is the + * expected number of rows. + * @throws IOException + */ + void assertExpectedTable(final Connection connection, TableName table, int count, int value) + throws IOException { + HTableDescriptor [] htds = util.getAdmin().listTables(table.getNameAsString()); + assertEquals(htds.length, 1); + Table t = null; + try { + t = connection.getTable(table); + Scan s = new Scan(); + ResultScanner sr = t.getScanner(s); + int i = 0; + for (Result r : sr) { + i++; + for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) { + for (byte[] val : nm.values()) { + assertTrue(Bytes.equals(val, value(value))); + } + } + } + assertEquals(count, i); + } catch (IOException e) { + fail("Failed due to exception"); + } finally { + if (t != null) t.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java new file mode 100644 index 0000000..0c5207b --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
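For orientation, the happy path that all of the failure-injection tests above wrap is a single doBulkLoad call. The sketch below is illustrative only and not part of the patch; the table name, the HFile directory, and the package of LoadIncrementalHFiles (which this patch is in the middle of relocating) are assumptions.

    // Sketch: the plain bulk-load call sequence the tests override and instrument.
    // Assumes an existing table and a directory of HFiles laid out per column family.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class BulkLoadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        TableName tableName = TableName.valueOf("exampleTable"); // placeholder
        Path hfileDir = new Path("/tmp/bulk-output");            // placeholder
        try (Connection conn = ConnectionFactory.createConnection(conf);
            Table table = conn.getTable(tableName);
            RegionLocator locator = conn.getRegionLocator(tableName);
            Admin admin = conn.getAdmin()) {
          // Same doBulkLoad overload that the tests above exercise.
          new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, admin, table, locator);
        }
      }
    }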
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
new file mode 100644
index 0000000..0c5207b
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tests various scan start and stop row scenarios. The ranges are set on
+ * Scan instances and then verified in a MapReduce job to see that they are
+ * handed over and honored properly.
+ */
+@Category({VerySlowMapReduceTests.class, LargeTests.class})
+public class TestMultiTableInputFormat extends MultiTableInputFormatTestBase {
+
+  @BeforeClass
+  public static void setupLogging() {
+    TEST_UTIL.enableDebug(MultiTableInputFormat.class);
+  }
+
+  @Override
+  protected void initJob(List<Scan> scans, Job job) throws IOException {
+    TableMapReduceUtil.initTableMapperJob(scans, ScanMapper.class,
+        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+  }
+}
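A note on the scan list that initJob() above receives: with MultiTableInputFormat, each Scan must carry the name of the table it targets. A minimal sketch follows; the table names are placeholders, not part of the patch.

    // Sketch: preparing scans for a multi-table MapReduce job. Each scan is
    // routed to its table via the SCAN_ATTRIBUTES_TABLE_NAME attribute.
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MultiTableScanSetup {
      public static List<Scan> buildScans() {
        List<Scan> scans = new ArrayList<>();
        for (String tableName : new String[] { "tableA", "tableB" }) { // placeholders
          Scan scan = new Scan();
          // MultiTableInputFormat dispatches each scan by this attribute.
          scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
          scans.add(scan);
        }
        return scans;
      }
    }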
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java
new file mode 100644
index 0000000..530d9c5
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Function;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimaps;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+@Category({ VerySlowMapReduceTests.class, LargeTests.class })
+public class TestMultiTableSnapshotInputFormat extends MultiTableInputFormatTestBase {
+
+  protected Path restoreDir;
+
+  @BeforeClass
+  public static void setUpSnapshots() throws Exception {
+
+    TEST_UTIL.enableDebug(MultiTableSnapshotInputFormat.class);
+    TEST_UTIL.enableDebug(MultiTableSnapshotInputFormatImpl.class);
+
+    // take a snapshot of every table we have.
+    for (String tableName : TABLES) {
+      SnapshotTestingUtils
+          .createSnapshotAndValidate(TEST_UTIL.getAdmin(), TableName.valueOf(tableName),
+              ImmutableList.of(INPUT_FAMILY), null,
+              snapshotNameForTable(tableName), FSUtils.getRootDir(TEST_UTIL.getConfiguration()),
+              TEST_UTIL.getTestFileSystem(), true);
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    this.restoreDir = TEST_UTIL.getRandomDir();
+  }
+
+  @Override
+  protected void initJob(List<Scan> scans, Job job) throws IOException {
+    TableMapReduceUtil
+        .initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), ScanMapper.class,
+            ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);
+  }
+
+  protected Map<String, Collection<Scan>> getSnapshotScanMapping(final List<Scan> scans) {
+    return Multimaps.index(scans, new Function<Scan, String>() {
+      @Nullable
+      @Override
+      public String apply(Scan input) {
+        return snapshotNameForTable(
+            Bytes.toStringBinary(input.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME)));
+      }
+    }).asMap();
+  }
+
+  public static String snapshotNameForTable(String tableName) {
+    return tableName + "_snapshot";
+  }
+
+}
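The test above runs the same scan scenarios as TestMultiTableInputFormat, but answers each scan from a snapshot instead of a live table. Outside the test utilities, taking those snapshots is a plain Admin call; the sketch below shows that call under assumed table names mirroring the snapshotNameForTable() convention, and is not part of the patch.

    // Sketch: the Admin calls that SnapshotTestingUtils wraps for the setup above.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class SnapshotSetupSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin()) {
          // One snapshot per source table, named table + "_snapshot".
          for (String table : new String[] { "scantest1", "scantest2" }) { // placeholders
            admin.snapshot(table + "_snapshot", TableName.valueOf(table));
          }
        }
      }
    }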
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java
new file mode 100644
index 0000000..1c33848
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+
+@Category({ SmallTests.class })
+public class TestMultiTableSnapshotInputFormatImpl {
+
+  private MultiTableSnapshotInputFormatImpl subject;
+  private Map<String, Collection<Scan>> snapshotScans;
+  private Path restoreDir;
+  private Configuration conf;
+  private Path rootDir;
+
+  @Before
+  public void setUp() throws Exception {
+    this.subject = Mockito.spy(new MultiTableSnapshotInputFormatImpl());
+
+    // mock out restoreSnapshot
+    // TODO: this is kind of meh; it'd be much nicer to just inject the
+    // RestoreSnapshotHelper dependency into the input format. However, we need
+    // a new RestoreSnapshotHelper per snapshot in the current design, and it
+    // *also* feels weird to introduce a RestoreSnapshotHelperFactory and inject
+    // that, which would probably be the more "pure" way of doing things. This
+    // is the lesser of two evils, perhaps?
+    doNothing().when(this.subject).
+        restoreSnapshot(any(Configuration.class), any(String.class), any(Path.class),
+            any(Path.class), any(FileSystem.class));
+
+    this.conf = new Configuration();
+    this.rootDir = new Path("file:///test-root-dir");
+    FSUtils.setRootDir(conf, rootDir);
+    this.snapshotScans = ImmutableMap.<String, Collection<Scan>>of("snapshot1",
+        ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))), "snapshot2",
+        ImmutableList.of(new Scan(Bytes.toBytes("3"), Bytes.toBytes("4")),
+            new Scan(Bytes.toBytes("5"), Bytes.toBytes("6"))));
+
+    this.restoreDir = new Path(FSUtils.getRootDir(conf), "restore-dir");
+
+  }
+
+  public void callSetInput() throws IOException {
+    subject.setInput(this.conf, snapshotScans, restoreDir);
+  }
+
+  public Map<String, Collection<ScanWithEquals>> toScanWithEquals(
+      Map<String, Collection<Scan>> snapshotScans) throws IOException {
+    Map<String, Collection<ScanWithEquals>> rtn = Maps.newHashMap();
+
+    for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
+      List<ScanWithEquals> scans = Lists.newArrayList();
+
+      for (Scan scan : entry.getValue()) {
+        scans.add(new ScanWithEquals(scan));
+      }
+      rtn.put(entry.getKey(), scans);
+    }
+
+    return rtn;
+  }
+
+  public static class ScanWithEquals {
+
+    private final String startRow;
+    private final String stopRow;
+
+    /**
+     * Creates a new instance of this class while copying all values.
+     *
+     * @param scan The scan instance to copy from.
+     * @throws java.io.IOException When copying the values fails.
+     */
+    public ScanWithEquals(Scan scan) throws IOException {
+      this.startRow = Bytes.toStringBinary(scan.getStartRow());
+      this.stopRow = Bytes.toStringBinary(scan.getStopRow());
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof ScanWithEquals)) {
+        return false;
+      }
+      ScanWithEquals otherScan = (ScanWithEquals) obj;
+      return Objects.equals(this.startRow, otherScan.startRow) && Objects
+          .equals(this.stopRow, otherScan.stopRow);
+    }
+
+    // Keep the equals/hashCode contract: instances that compare equal must
+    // hash equally, e.g. if these objects are ever used in hashed collections.
+    @Override
+    public int hashCode() {
+      return Objects.hash(startRow, stopRow);
+    }
+
+    @Override
+    public String toString() {
+      return org.apache.hadoop.hbase.shaded.com.google.common.base.MoreObjects.
+          toStringHelper(this).add("startRow", startRow)
+          .add("stopRow", stopRow).toString();
+    }
+  }
+
+  @Test
+  public void testSetInputSetsSnapshotToScans() throws Exception {
+
+    callSetInput();
+
+    Map<String, Collection<Scan>> actual = subject.getSnapshotsToScans(conf);
+
+    // convert to scans we can use .equals on
+    Map<String, Collection<ScanWithEquals>> actualWithEquals = toScanWithEquals(actual);
+    Map<String, Collection<ScanWithEquals>> expectedWithEquals = toScanWithEquals(snapshotScans);
+
+    assertEquals(expectedWithEquals, actualWithEquals);
+  }
+
+  @Test
+  public void testSetInputPushesRestoreDirectories() throws Exception {
+    callSetInput();
+
+    Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf);
+
+    assertEquals(this.snapshotScans.keySet(), restoreDirs.keySet());
+  }
+
+  @Test
+  public void testSetInputCreatesRestoreDirectoriesUnderRootRestoreDir() throws Exception {
+    callSetInput();
+
+    Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf);
+
+    for (Path snapshotDir : restoreDirs.values()) {
+      assertEquals("Expected " + snapshotDir + " to be a child of " + restoreDir, restoreDir,
+          snapshotDir.getParent());
+    }
+  }
+
+  @Test
+  public void testSetInputRestoresSnapshots() throws Exception {
+    callSetInput();
+
+    Map<String, Path> snapshotDirs = subject.getSnapshotDirs(conf);
+
+    for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
+      verify(this.subject).restoreSnapshot(eq(this.conf), eq(entry.getKey()), eq(this.rootDir),
+          eq(entry.getValue()), any(FileSystem.class));
+    }
+  }
+}
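The Impl class under test backs the public job-setup entry point in TableMapReduceUtil. A minimal driver sketch follows; the snapshot name, restore directory, and the IdentityTableMapper choice are illustrative assumptions, not part of the patch.

    // Sketch: configuring a MapReduce job over snapshots via the API that
    // MultiTableSnapshotInputFormatImpl implements.
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

    public class MultiSnapshotJobSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "multi-snapshot-scan");

        // One or more scans per snapshot, mirroring the fixture in setUp() above.
        Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
        snapshotScans.put("snapshot1", // placeholder snapshot name
            Collections.singletonList(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))));

        // Per-snapshot restore directories are created under this path.
        Path restoreDir = new Path("/tmp/snapshot-restore"); // placeholder

        TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans,
            IdentityTableMapper.class, ImmutableBytesWritable.class, Result.class,
            job, true, restoreDir);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(NullOutputFormat.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

As the tests above verify, the snapshots are restored at job-setup time into per-snapshot directories under the given restore dir, which is expected to live on the HBase root filesystem.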
