http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java deleted file mode 100644 index efcf91e..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java +++ /dev/null @@ -1,571 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputFilesFilter; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; - 
-@Category({VerySlowMapReduceTests.class, LargeTests.class}) -public class TestImportTsv implements Configurable { - - private static final Log LOG = LogFactory.getLog(TestImportTsv.class); - protected static final String NAME = TestImportTsv.class.getSimpleName(); - protected static HBaseTestingUtility util = new HBaseTestingUtility(); - - // Delete the tmp directory after running doMROnTableTest. Boolean. Default is true. - protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad"; - - /** - * Force use of combiner in doMROnTableTest. Boolean. Default is true. - */ - protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner"; - - private final String FAMILY = "FAM"; - private TableName tn; - private Map<String, String> args; - - @Rule - public ExpectedException exception = ExpectedException.none(); - - public Configuration getConf() { - return util.getConfiguration(); - } - - public void setConf(Configuration conf) { - throw new IllegalArgumentException("setConf not supported"); - } - - @BeforeClass - public static void provisionCluster() throws Exception { - util.startMiniCluster(); - } - - @AfterClass - public static void releaseCluster() throws Exception { - util.shutdownMiniCluster(); - } - - @Before - public void setup() throws Exception { - tn = TableName.valueOf("test-" + UUID.randomUUID()); - args = new HashMap<>(); - // Prepare the arguments required for the test. - args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A,FAM:B"); - args.put(ImportTsv.SEPARATOR_CONF_KEY, "\u001b"); - } - - @Test - public void testMROnTable() throws Exception { - util.createTable(tn, FAMILY); - doMROnTableTest(null, 1); - util.deleteTable(tn); - } - - @Test - public void testMROnTableWithTimestamp() throws Exception { - util.createTable(tn, FAMILY); - args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B"); - args.put(ImportTsv.SEPARATOR_CONF_KEY, ","); - String data = "KEY,1234,VALUE1,VALUE2\n"; - - doMROnTableTest(data, 1); - util.deleteTable(tn); - } - - @Test - public void testMROnTableWithCustomMapper() - throws Exception { - util.createTable(tn, FAMILY); - args.put(ImportTsv.MAPPER_CONF_KEY, - "org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper"); - - doMROnTableTest(null, 3); - util.deleteTable(tn); - } - - @Test - public void testBulkOutputWithoutAnExistingTable() throws Exception { - // Prepare the arguments required for the test. - Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); - args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); - - doMROnTableTest(null, 3); - util.deleteTable(tn); - } - - @Test - public void testBulkOutputWithAnExistingTable() throws Exception { - util.createTable(tn, FAMILY); - - // Prepare the arguments required for the test. - Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); - args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); - - doMROnTableTest(null, 3); - util.deleteTable(tn); - } - - @Test - public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception { - util.createTable(tn, FAMILY); - - // Prepare the arguments required for the test. 
- Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); - args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); - args.put(ImportTsv.NO_STRICT_COL_FAMILY, "true"); - doMROnTableTest(null, 3); - util.deleteTable(tn); - } - - @Test - public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception { - Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()),"hfiles"); - String INPUT_FILE = "InputFile1.csv"; - // Prepare the arguments required for the test. - String[] args = - new String[] { - "-D" + ImportTsv.MAPPER_CONF_KEY - + "=org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper", - "-D" + ImportTsv.COLUMNS_CONF_KEY - + "=HBASE_ROW_KEY,FAM:A,FAM:B", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", - "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), - tn.getNameAsString(), - INPUT_FILE - }; - assertEquals("running test job configuration failed.", 0, ToolRunner.run( - new Configuration(util.getConfiguration()), - new ImportTsv() { - @Override - public int run(String[] args) throws Exception { - Job job = createSubmittableJob(getConf(), args); - assertTrue(job.getMapperClass().equals(TsvImporterTextMapper.class)); - assertTrue(job.getReducerClass().equals(TextSortReducer.class)); - assertTrue(job.getMapOutputValueClass().equals(Text.class)); - return 0; - } - }, args)); - // Delete table created by createSubmittableJob. - util.deleteTable(tn); - } - - @Test - public void testBulkOutputWithTsvImporterTextMapper() throws Exception { - Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()),"hfiles"); - args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper"); - args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); - String data = "KEY\u001bVALUE4\u001bVALUE8\n"; - doMROnTableTest(data, 4); - util.deleteTable(tn); - } - - @Test - public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception { - String[] args = new String[] { tn.getNameAsString(), "/inputFile" }; - - Configuration conf = new Configuration(util.getConfiguration()); - conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,FAM:A"); - conf.set(ImportTsv.BULK_OUTPUT_CONF_KEY, "/output"); - conf.set(ImportTsv.CREATE_TABLE_CONF_KEY, "no"); - exception.expect(TableNotFoundException.class); - assertEquals("running test job configuration failed.", 0, - ToolRunner.run(new Configuration(util.getConfiguration()), new ImportTsv() { - @Override public int run(String[] args) throws Exception { - createSubmittableJob(getConf(), args); - return 0; - } - }, args)); - } - - @Test - public void testMRWithoutAnExistingTable() throws Exception { - String[] args = - new String[] { tn.getNameAsString(), "/inputFile" }; - - exception.expect(TableNotFoundException.class); - assertEquals("running test job configuration failed.", 0, ToolRunner.run( - new Configuration(util.getConfiguration()), - new ImportTsv() { - @Override - public int run(String[] args) throws Exception { - createSubmittableJob(getConf(), args); - return 0; - } - }, args)); - } - - @Test - public void testJobConfigurationsWithDryMode() throws Exception { - Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()),"hfiles"); - String INPUT_FILE = "InputFile1.csv"; - // Prepare the arguments required for the test. 
- String[] argsArray = new String[] { - "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", - "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", - "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + bulkOutputPath.toString(), - "-D" + ImportTsv.DRY_RUN_CONF_KEY + "=true", - tn.getNameAsString(), - INPUT_FILE }; - assertEquals("running test job configuration failed.", 0, ToolRunner.run( - new Configuration(util.getConfiguration()), - new ImportTsv() { - @Override - public int run(String[] args) throws Exception { - Job job = createSubmittableJob(getConf(), args); - assertTrue(job.getOutputFormatClass().equals(NullOutputFormat.class)); - return 0; - } - }, argsArray)); - // Delete table created by createSubmittableJob. - util.deleteTable(tn); - } - - @Test - public void testDryModeWithoutBulkOutputAndTableExists() throws Exception { - util.createTable(tn, FAMILY); - args.put(ImportTsv.DRY_RUN_CONF_KEY, "true"); - doMROnTableTest(null, 1); - // Dry mode should not delete an existing table. If it's not present, - // this will throw TableNotFoundException. - util.deleteTable(tn); - } - - /** - * If table is not present in non-bulk mode, dry run should fail just like - * normal mode. - */ - @Test - public void testDryModeWithoutBulkOutputAndTableDoesNotExists() throws Exception { - args.put(ImportTsv.DRY_RUN_CONF_KEY, "true"); - exception.expect(TableNotFoundException.class); - doMROnTableTest(null, 1); - } - - @Test public void testDryModeWithBulkOutputAndTableExists() throws Exception { - util.createTable(tn, FAMILY); - // Prepare the arguments required for the test. - Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); - args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); - args.put(ImportTsv.DRY_RUN_CONF_KEY, "true"); - doMROnTableTest(null, 1); - // Dry mode should not delete an existing table. If it's not present, - // this will throw TableNotFoundException. - util.deleteTable(tn); - } - - /** - * If table is not present in bulk mode and create.table is not set to yes, - * import should fail with TableNotFoundException. - */ - @Test - public void testDryModeWithBulkOutputAndTableDoesNotExistsCreateTableSetToNo() throws - Exception { - // Prepare the arguments required for the test. - Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); - args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); - args.put(ImportTsv.DRY_RUN_CONF_KEY, "true"); - args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "no"); - exception.expect(TableNotFoundException.class); - doMROnTableTest(null, 1); - } - - @Test - public void testDryModeWithBulkModeAndTableDoesNotExistsCreateTableSetToYes() throws Exception { - // Prepare the arguments required for the test. - Path hfiles = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); - args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfiles.toString()); - args.put(ImportTsv.DRY_RUN_CONF_KEY, "true"); - args.put(ImportTsv.CREATE_TABLE_CONF_KEY, "yes"); - doMROnTableTest(null, 1); - // Verify temporary table was deleted. - exception.expect(TableNotFoundException.class); - util.deleteTable(tn); - } - - /** - * If there are invalid data rows as inputs, then only those rows should be ignored. 
- */ - @Test - public void testTsvImporterTextMapperWithInvalidData() throws Exception { - Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); - args.put(ImportTsv.MAPPER_CONF_KEY, "org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper"); - args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); - args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B"); - args.put(ImportTsv.SEPARATOR_CONF_KEY, ","); - // 3 Rows of data as input. 2 Rows are valid and 1 row is invalid as it doesn't have TS - String data = "KEY,1234,VALUE1,VALUE2\nKEY\nKEY,1235,VALUE1,VALUE2\n"; - doMROnTableTest(util, tn, FAMILY, data, args, 1, 4); - util.deleteTable(tn); - } - - @Test - public void testSkipEmptyColumns() throws Exception { - Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles"); - args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); - args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B"); - args.put(ImportTsv.SEPARATOR_CONF_KEY, ","); - args.put(ImportTsv.SKIP_EMPTY_COLUMNS, "true"); - // 2 Rows of data as input. Both rows are valid and only 3 columns are no-empty among 4 - String data = "KEY,1234,VALUE1,VALUE2\nKEY,1235,,VALUE2\n"; - doMROnTableTest(util, tn, FAMILY, data, args, 1, 3); - util.deleteTable(tn); - } - - private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception { - return doMROnTableTest(util, tn, FAMILY, data, args, valueMultiplier,-1); - } - - protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table, - String family, String data, Map<String, String> args) throws Exception { - return doMROnTableTest(util, table, family, data, args, 1,-1); - } - - /** - * Run an ImportTsv job and perform basic validation on the results. - * Returns the ImportTsv <code>Tool</code> instance so that other tests can - * inspect it for further validation as necessary. This method is static to - * insure non-reliance on instance's util/conf facilities. - * @param args Any arguments to pass BEFORE inputFile path is appended. - * @return The Tool instance used to run the test. - */ - protected static Tool doMROnTableTest(HBaseTestingUtility util, TableName table, - String family, String data, Map<String, String> args, int valueMultiplier,int expectedKVCount) - throws Exception { - Configuration conf = new Configuration(util.getConfiguration()); - - // populate input file - FileSystem fs = FileSystem.get(conf); - Path inputPath = fs.makeQualified( - new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat")); - FSDataOutputStream op = fs.create(inputPath, true); - if (data == null) { - data = "KEY\u001bVALUE1\u001bVALUE2\n"; - } - op.write(Bytes.toBytes(data)); - op.close(); - LOG.debug(String.format("Wrote test data to file: %s", inputPath)); - - if (conf.getBoolean(FORCE_COMBINER_CONF, true)) { - LOG.debug("Forcing combiner."); - conf.setInt("mapreduce.map.combine.minspills", 1); - } - - // Build args array. 
- String[] argsArray = new String[args.size() + 2]; - Iterator it = args.entrySet().iterator(); - int i = 0; - while (it.hasNext()) { - Map.Entry pair = (Map.Entry) it.next(); - argsArray[i] = "-D" + pair.getKey() + "=" + pair.getValue(); - i++; - } - argsArray[i] = table.getNameAsString(); - argsArray[i + 1] = inputPath.toString(); - - // run the import - Tool tool = new ImportTsv(); - LOG.debug("Running ImportTsv with arguments: " + argsArray); - assertEquals(0, ToolRunner.run(conf, tool, argsArray)); - - // Perform basic validation. If the input args did not include - // ImportTsv.BULK_OUTPUT_CONF_KEY then validate data in the table. - // Otherwise, validate presence of hfiles. - boolean isDryRun = args.containsKey(ImportTsv.DRY_RUN_CONF_KEY) && - "true".equalsIgnoreCase(args.get(ImportTsv.DRY_RUN_CONF_KEY)); - if (args.containsKey(ImportTsv.BULK_OUTPUT_CONF_KEY)) { - if (isDryRun) { - assertFalse(String.format("Dry run mode, %s should not have been created.", - ImportTsv.BULK_OUTPUT_CONF_KEY), - fs.exists(new Path(ImportTsv.BULK_OUTPUT_CONF_KEY))); - } else { - validateHFiles(fs, args.get(ImportTsv.BULK_OUTPUT_CONF_KEY), family,expectedKVCount); - } - } else { - validateTable(conf, table, family, valueMultiplier, isDryRun); - } - - if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) { - LOG.debug("Deleting test subdirectory"); - util.cleanupDataTestDirOnTestFS(table.getNameAsString()); - } - return tool; - } - - /** - * Confirm ImportTsv via data in online table. - */ - private static void validateTable(Configuration conf, TableName tableName, - String family, int valueMultiplier, boolean isDryRun) throws IOException { - - LOG.debug("Validating table."); - Connection connection = ConnectionFactory.createConnection(conf); - Table table = connection.getTable(tableName); - boolean verified = false; - long pause = conf.getLong("hbase.client.pause", 5 * 1000); - int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); - for (int i = 0; i < numRetries; i++) { - try { - Scan scan = new Scan(); - // Scan entire family. - scan.addFamily(Bytes.toBytes(family)); - ResultScanner resScanner = table.getScanner(scan); - int numRows = 0; - for (Result res : resScanner) { - numRows++; - assertEquals(2, res.size()); - List<Cell> kvs = res.listCells(); - assertTrue(CellUtil.matchingRow(kvs.get(0), Bytes.toBytes("KEY"))); - assertTrue(CellUtil.matchingRow(kvs.get(1), Bytes.toBytes("KEY"))); - assertTrue(CellUtil.matchingValue(kvs.get(0), Bytes.toBytes("VALUE" + valueMultiplier))); - assertTrue(CellUtil.matchingValue(kvs.get(1), Bytes.toBytes("VALUE" + 2 * valueMultiplier))); - // Only one result set is expected, so let it loop. - } - if (isDryRun) { - assertEquals(0, numRows); - } else { - assertEquals(1, numRows); - } - verified = true; - break; - } catch (NullPointerException e) { - // If here, a cell was empty. Presume its because updates came in - // after the scanner had been opened. Wait a while and retry. - } - try { - Thread.sleep(pause); - } catch (InterruptedException e) { - // continue - } - } - table.close(); - connection.close(); - assertTrue(verified); - } - - /** - * Confirm ImportTsv via HFiles on fs. 
- */ - private static void validateHFiles(FileSystem fs, String outputPath, String family, - int expectedKVCount) throws IOException { - // validate number and content of output columns - LOG.debug("Validating HFiles."); - Set<String> configFamilies = new HashSet<>(); - configFamilies.add(family); - Set<String> foundFamilies = new HashSet<>(); - int actualKVCount = 0; - for (FileStatus cfStatus : fs.listStatus(new Path(outputPath), new OutputFilesFilter())) { - String[] elements = cfStatus.getPath().toString().split(Path.SEPARATOR); - String cf = elements[elements.length - 1]; - foundFamilies.add(cf); - assertTrue( - String.format( - "HFile output contains a column family (%s) not present in input families (%s)", - cf, configFamilies), - configFamilies.contains(cf)); - for (FileStatus hfile : fs.listStatus(cfStatus.getPath())) { - assertTrue( - String.format("HFile %s appears to contain no data.", hfile.getPath()), - hfile.getLen() > 0); - // count the number of KVs from all the hfiles - if (expectedKVCount > -1) { - actualKVCount += getKVCountFromHfile(fs, hfile.getPath()); - } - } - } - assertTrue(String.format("HFile output does not contain the input family '%s'.", family), - foundFamilies.contains(family)); - if (expectedKVCount > -1) { - assertTrue(String.format( - "KV count in ouput hfile=<%d> doesn't match with expected KV count=<%d>", actualKVCount, - expectedKVCount), actualKVCount == expectedKVCount); - } - } - - /** - * Method returns the total KVs in given hfile - * @param fs File System - * @param p HFile path - * @return KV count in the given hfile - * @throws IOException - */ - private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException { - Configuration conf = util.getConfiguration(); - HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf); - reader.loadFileInfo(); - HFileScanner scanner = reader.getScanner(false, false); - scanner.seekTo(); - int count = 0; - do { - count++; - } while (scanner.next()); - reader.close(); - return count; - } -} -
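For context, the deleted TestImportTsv exercised the ImportTsv tool end-to-end: it wrote a small TSV file to the test filesystem, ran the tool through ToolRunner with -D options built from the args map, and then validated either the online table or the generated HFiles. A minimal sketch of that invocation pattern, assuming a pre-created table with family FAM and a TSV input file already on the cluster filesystem (the table name and input path below are placeholders, not values taken from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.ImportTsv;
    import org.apache.hadoop.util.ToolRunner;

    public class ImportTsvInvocationSketch {
      public static void main(String[] argv) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Column spec: first column is the row key, remaining columns map to FAM:A and FAM:B,
        // mirroring the defaults the deleted test set up in its setup() method.
        String[] args = new String[] {
            "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
            "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,",
            "someTable",          // placeholder table name
            "/path/to/input.dat"  // placeholder input path on the cluster filesystem
        };
        // ImportTsv is a Tool; a zero exit code means the MapReduce job completed successfully.
        int exitCode = ToolRunner.run(conf, new ImportTsv(), args);
        System.exit(exitCode);
      }
    }
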
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java deleted file mode 100644 index 3c38102..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsvParser.java +++ /dev/null @@ -1,314 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.ArrayList; - -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.MapReduceTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser; -import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException; -import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Splitter; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables; - -/** - * Tests for {@link TsvParser}. 
- */ -@Category({MapReduceTests.class, SmallTests.class}) -public class TestImportTsvParser { - - private void assertBytesEquals(byte[] a, byte[] b) { - assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b)); - } - - private void checkParsing(ParsedLine parsed, Iterable<String> expected) { - ArrayList<String> parsedCols = new ArrayList<>(); - for (int i = 0; i < parsed.getColumnCount(); i++) { - parsedCols.add(Bytes.toString(parsed.getLineBytes(), parsed.getColumnOffset(i), - parsed.getColumnLength(i))); - } - if (!Iterables.elementsEqual(parsedCols, expected)) { - fail("Expected: " + Joiner.on(",").join(expected) + "\n" + "Got:" - + Joiner.on(",").join(parsedCols)); - } - } - - @Test - public void testTsvParserSpecParsing() { - TsvParser parser; - - parser = new TsvParser("HBASE_ROW_KEY", "\t"); - assertNull(parser.getFamily(0)); - assertNull(parser.getQualifier(0)); - assertEquals(0, parser.getRowKeyColumnIndex()); - assertFalse(parser.hasTimestamp()); - - parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t"); - assertNull(parser.getFamily(0)); - assertNull(parser.getQualifier(0)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); - assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); - assertEquals(0, parser.getRowKeyColumnIndex()); - assertFalse(parser.hasTimestamp()); - - parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t"); - assertNull(parser.getFamily(0)); - assertNull(parser.getQualifier(0)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); - assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2)); - assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2)); - assertEquals(0, parser.getRowKeyColumnIndex()); - assertFalse(parser.hasTimestamp()); - - parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2", "\t"); - assertNull(parser.getFamily(0)); - assertNull(parser.getQualifier(0)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); - assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3)); - assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3)); - assertEquals(0, parser.getRowKeyColumnIndex()); - assertTrue(parser.hasTimestamp()); - assertEquals(2, parser.getTimestampKeyColumnIndex()); - - parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2,HBASE_ATTRIBUTES_KEY", - "\t"); - assertNull(parser.getFamily(0)); - assertNull(parser.getQualifier(0)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); - assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3)); - assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3)); - assertEquals(0, parser.getRowKeyColumnIndex()); - assertTrue(parser.hasTimestamp()); - assertEquals(2, parser.getTimestampKeyColumnIndex()); - assertEquals(4, parser.getAttributesKeyColumnIndex()); - - parser = new TsvParser("HBASE_ATTRIBUTES_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2,HBASE_ROW_KEY", - "\t"); - assertNull(parser.getFamily(0)); - assertNull(parser.getQualifier(0)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1)); - assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1)); - assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3)); - assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3)); - assertEquals(4, 
parser.getRowKeyColumnIndex()); - assertTrue(parser.hasTimestamp()); - assertEquals(2, parser.getTimestampKeyColumnIndex()); - assertEquals(0, parser.getAttributesKeyColumnIndex()); - } - - @Test - public void testTsvParser() throws BadTsvLineException { - TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t"); - assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0)); - assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0)); - assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1)); - assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1)); - assertNull(parser.getFamily(2)); - assertNull(parser.getQualifier(2)); - assertEquals(2, parser.getRowKeyColumnIndex()); - - assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser.getTimestampKeyColumnIndex()); - - byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d"); - ParsedLine parsed = parser.parse(line, line.length); - checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line))); - } - - @Test - public void testTsvParserWithTimestamp() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t"); - assertNull(parser.getFamily(0)); - assertNull(parser.getQualifier(0)); - assertNull(parser.getFamily(1)); - assertNull(parser.getQualifier(1)); - assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2)); - assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2)); - assertEquals(0, parser.getRowKeyColumnIndex()); - assertEquals(1, parser.getTimestampKeyColumnIndex()); - - byte[] line = Bytes.toBytes("rowkey\t1234\tval_a"); - ParsedLine parsed = parser.parse(line, line.length); - assertEquals(1234l, parsed.getTimestamp(-1)); - checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line))); - } - - /** - * Test cases that throw BadTsvLineException - */ - @Test(expected = BadTsvLineException.class) - public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t"); - byte[] line = Bytes.toBytes("val_a\tval_b\tval_c"); - parser.parse(line, line.length); - } - - @Test(expected = BadTsvLineException.class) - public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t"); - byte[] line = Bytes.toBytes(""); - parser.parse(line, line.length); - } - - @Test(expected = BadTsvLineException.class) - public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t"); - byte[] line = Bytes.toBytes("key_only"); - parser.parse(line, line.length); - } - - @Test(expected = BadTsvLineException.class) - public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException { - TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t"); - byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key"); - parser.parse(line, line.length); - } - - @Test(expected = BadTsvLineException.class) - public void testTsvParserInvalidTimestamp() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t"); - assertEquals(1, parser.getTimestampKeyColumnIndex()); - byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a"); - ParsedLine parsed = parser.parse(line, line.length); - assertEquals(-1, parsed.getTimestamp(-1)); - checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line))); - } - - @Test(expected = BadTsvLineException.class) - public void 
testTsvParserNoTimestampValue() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t"); - assertEquals(2, parser.getTimestampKeyColumnIndex()); - byte[] line = Bytes.toBytes("rowkey\tval_a"); - parser.parse(line, line.length); - } - - @Test - public void testTsvParserParseRowKey() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t"); - assertEquals(0, parser.getRowKeyColumnIndex()); - byte[] line = Bytes.toBytes("rowkey\tval_a\t1234"); - Pair<Integer, Integer> rowKeyOffsets = parser.parseRowKey(line, line.length); - assertEquals(0, rowKeyOffsets.getFirst().intValue()); - assertEquals(6, rowKeyOffsets.getSecond().intValue()); - try { - line = Bytes.toBytes("\t\tval_a\t1234"); - parser.parseRowKey(line, line.length); - fail("Should get BadTsvLineException on empty rowkey."); - } catch (BadTsvLineException b) { - - } - parser = new TsvParser("col_a,HBASE_ROW_KEY,HBASE_TS_KEY", "\t"); - assertEquals(1, parser.getRowKeyColumnIndex()); - line = Bytes.toBytes("val_a\trowkey\t1234"); - rowKeyOffsets = parser.parseRowKey(line, line.length); - assertEquals(6, rowKeyOffsets.getFirst().intValue()); - assertEquals(6, rowKeyOffsets.getSecond().intValue()); - try { - line = Bytes.toBytes("val_a"); - rowKeyOffsets = parser.parseRowKey(line, line.length); - fail("Should get BadTsvLineException when number of columns less than rowkey position."); - } catch (BadTsvLineException b) { - - } - parser = new TsvParser("col_a,HBASE_TS_KEY,HBASE_ROW_KEY", "\t"); - assertEquals(2, parser.getRowKeyColumnIndex()); - line = Bytes.toBytes("val_a\t1234\trowkey"); - rowKeyOffsets = parser.parseRowKey(line, line.length); - assertEquals(11, rowKeyOffsets.getFirst().intValue()); - assertEquals(6, rowKeyOffsets.getSecond().intValue()); - } - - @Test - public void testTsvParseAttributesKey() throws BadTsvLineException { - TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY,HBASE_ATTRIBUTES_KEY", "\t"); - assertEquals(0, parser.getRowKeyColumnIndex()); - byte[] line = Bytes.toBytes("rowkey\tval_a\t1234\tkey=>value"); - ParsedLine parse = parser.parse(line, line.length); - assertEquals(18, parse.getAttributeKeyOffset()); - assertEquals(3, parser.getAttributesKeyColumnIndex()); - String attributes[] = parse.getIndividualAttributes(); - assertEquals(attributes[0], "key=>value"); - try { - line = Bytes.toBytes("rowkey\tval_a\t1234"); - parser.parse(line, line.length); - fail("Should get BadTsvLineException on empty rowkey."); - } catch (BadTsvLineException b) { - - } - parser = new TsvParser("HBASE_ATTRIBUTES_KEY,col_a,HBASE_ROW_KEY,HBASE_TS_KEY", "\t"); - assertEquals(2, parser.getRowKeyColumnIndex()); - line = Bytes.toBytes("key=>value\tval_a\trowkey\t1234"); - parse = parser.parse(line, line.length); - assertEquals(0, parse.getAttributeKeyOffset()); - assertEquals(0, parser.getAttributesKeyColumnIndex()); - attributes = parse.getIndividualAttributes(); - assertEquals(attributes[0], "key=>value"); - try { - line = Bytes.toBytes("val_a"); - ParsedLine parse2 = parser.parse(line, line.length); - fail("Should get BadTsvLineException when number of columns less than rowkey position."); - } catch (BadTsvLineException b) { - - } - parser = new TsvParser("col_a,HBASE_ATTRIBUTES_KEY,HBASE_TS_KEY,HBASE_ROW_KEY", "\t"); - assertEquals(3, parser.getRowKeyColumnIndex()); - line = Bytes.toBytes("val_a\tkey0=>value0,key1=>value1,key2=>value2\t1234\trowkey"); - parse = parser.parse(line, line.length); - assertEquals(1, 
parser.getAttributesKeyColumnIndex()); - assertEquals(6, parse.getAttributeKeyOffset()); - String[] attr = parse.getIndividualAttributes(); - int i = 0; - for(String str : attr) { - assertEquals(("key"+i+"=>"+"value"+i), str ); - i++; - } - } - - @Test - public void testTsvParserWithCellVisibilityCol() throws BadTsvLineException { - TsvParser parser = new TsvParser( - "HBASE_ROW_KEY,col_a,HBASE_TS_KEY,HBASE_ATTRIBUTES_KEY,HBASE_CELL_VISIBILITY", "\t"); - assertEquals(0, parser.getRowKeyColumnIndex()); - assertEquals(4, parser.getCellVisibilityColumnIndex()); - byte[] line = Bytes.toBytes("rowkey\tval_a\t1234\tkey=>value\tPRIVATE&SECRET"); - ParsedLine parse = parser.parse(line, line.length); - assertEquals(18, parse.getAttributeKeyOffset()); - assertEquals(3, parser.getAttributesKeyColumnIndex()); - String attributes[] = parse.getIndividualAttributes(); - assertEquals(attributes[0], "key=>value"); - assertEquals(29, parse.getCellVisibilityColumnOffset()); - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestJarFinder.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestJarFinder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestJarFinder.java deleted file mode 100644 index 8187b73..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestJarFinder.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.FileWriter; -import java.io.IOException; -import java.io.OutputStream; -import java.io.Writer; -import java.text.MessageFormat; -import java.util.Properties; -import java.util.jar.JarInputStream; -import java.util.jar.JarOutputStream; -import java.util.jar.Manifest; - -/** - * This file was forked from hadoop/common/branches/branch-2@1350012. 
- */ -@Category(SmallTests.class) -public class TestJarFinder { - - @Test - public void testJar() throws Exception { - - //picking a class that is for sure in a JAR in the classpath - String jar = JarFinder.getJar(LogFactory.class); - Assert.assertTrue(new File(jar).exists()); - } - - private static void delete(File file) throws IOException { - if (file.getAbsolutePath().length() < 5) { - throw new IllegalArgumentException( - MessageFormat.format("Path [{0}] is too short, not deleting", - file.getAbsolutePath())); - } - if (file.exists()) { - if (file.isDirectory()) { - File[] children = file.listFiles(); - if (children != null) { - for (File child : children) { - delete(child); - } - } - } - if (!file.delete()) { - throw new RuntimeException( - MessageFormat.format("Could not delete path [{0}]", - file.getAbsolutePath())); - } - } - } - - @Test - public void testExpandedClasspath() throws Exception { - //picking a class that is for sure in a directory in the classpath - //in this case the JAR is created on the fly - String jar = JarFinder.getJar(TestJarFinder.class); - Assert.assertTrue(new File(jar).exists()); - } - - @Test - public void testExistingManifest() throws Exception { - File dir = new File(System.getProperty("test.build.dir", "target/test-dir"), - TestJarFinder.class.getName() + "-testExistingManifest"); - delete(dir); - dir.mkdirs(); - - File metaInfDir = new File(dir, "META-INF"); - metaInfDir.mkdirs(); - File manifestFile = new File(metaInfDir, "MANIFEST.MF"); - Manifest manifest = new Manifest(); - OutputStream os = new FileOutputStream(manifestFile); - manifest.write(os); - os.close(); - - File propsFile = new File(dir, "props.properties"); - Writer writer = new FileWriter(propsFile); - new Properties().store(writer, ""); - writer.close(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - JarOutputStream zos = new JarOutputStream(baos); - JarFinder.jarDir(dir, "", zos); - JarInputStream jis = - new JarInputStream(new ByteArrayInputStream(baos.toByteArray())); - Assert.assertNotNull(jis.getManifest()); - jis.close(); - } - - @Test - public void testNoManifest() throws Exception { - File dir = new File(System.getProperty("test.build.dir", "target/test-dir"), - TestJarFinder.class.getName() + "-testNoManifest"); - delete(dir); - dir.mkdirs(); - File propsFile = new File(dir, "props.properties"); - Writer writer = new FileWriter(propsFile); - new Properties().store(writer, ""); - writer.close(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - JarOutputStream zos = new JarOutputStream(baos); - JarFinder.jarDir(dir, "", zos); - JarInputStream jis = - new JarInputStream(new ByteArrayInputStream(baos.toByteArray())); - Assert.assertNotNull(jis.getManifest()); - jis.close(); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java index b6ad2c9..b5b7a0c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java @@ -59,7 +59,6 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HFileTestUtil; import 
org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java deleted file mode 100644 index 529a448..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java +++ /dev/null @@ -1,669 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.Deque; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableExistsException; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.ClientServiceCallable; -import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.MapReduceTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.Pair; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.mockito.Mockito; - -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimap; - -import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; - -/** - * Test cases for the atomic load error handling of the bulk load functionality. - */ -@Category({MapReduceTests.class, LargeTests.class}) -public class TestLoadIncrementalHFilesSplitRecovery { - private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class); - - static HBaseTestingUtility util; - //used by secure subclass - static boolean useSecure = false; - - final static int NUM_CFS = 10; - final static byte[] QUAL = Bytes.toBytes("qual"); - final static int ROWCOUNT = 100; - - private final static byte[][] families = new byte[NUM_CFS][]; - - @Rule - public TestName name = new TestName(); - - static { - for (int i = 0; i < NUM_CFS; i++) { - families[i] = Bytes.toBytes(family(i)); - } - } - - static byte[] rowkey(int i) { - return Bytes.toBytes(String.format("row_%08d", i)); - } - - static String family(int i) { - return String.format("family_%04d", i); - } - - static byte[] value(int i) { - return Bytes.toBytes(String.format("%010d", i)); - } - - public static void buildHFiles(FileSystem fs, Path dir, int value) - throws IOException { - byte[] val = value(value); - for (int i = 0; i < NUM_CFS; i++) { - Path testIn = new Path(dir, family(i)); - - TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i), - Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT); - } - } - - /** - * Creates a table with given table name and specified number of column - * families if the table does not already exist. - */ - private void setupTable(final Connection connection, TableName table, int cfs) - throws IOException { - try { - LOG.info("Creating table " + table); - HTableDescriptor htd = new HTableDescriptor(table); - for (int i = 0; i < cfs; i++) { - htd.addFamily(new HColumnDescriptor(family(i))); - } - try (Admin admin = connection.getAdmin()) { - admin.createTable(htd); - } - } catch (TableExistsException tee) { - LOG.info("Table " + table + " already exists"); - } - } - - /** - * Creates a table with given table name,specified number of column families<br> - * and splitkeys if the table does not already exist. 
- * @param table - * @param cfs - * @param SPLIT_KEYS - */ - private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS) - throws IOException { - try { - LOG.info("Creating table " + table); - HTableDescriptor htd = new HTableDescriptor(table); - for (int i = 0; i < cfs; i++) { - htd.addFamily(new HColumnDescriptor(family(i))); - } - - util.createTable(htd, SPLIT_KEYS); - } catch (TableExistsException tee) { - LOG.info("Table " + table + " already exists"); - } - } - - private Path buildBulkFiles(TableName table, int value) throws Exception { - Path dir = util.getDataTestDirOnTestFS(table.getNameAsString()); - Path bulk1 = new Path(dir, table.getNameAsString() + value); - FileSystem fs = util.getTestFileSystem(); - buildHFiles(fs, bulk1, value); - return bulk1; - } - - /** - * Populate table with known values. - */ - private void populateTable(final Connection connection, TableName table, int value) - throws Exception { - // create HFiles for different column families - LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()); - Path bulk1 = buildBulkFiles(table, value); - try (Table t = connection.getTable(table); - RegionLocator locator = connection.getRegionLocator(table); - Admin admin = connection.getAdmin()) { - lih.doBulkLoad(bulk1, admin, t, locator); - } - } - - /** - * Split the known table in half. (this is hard coded for this test suite) - */ - private void forceSplit(TableName table) { - try { - // need to call regions server to by synchronous but isn't visible. - HRegionServer hrs = util.getRSForFirstRegionInTable(table); - - for (HRegionInfo hri : - ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { - if (hri.getTable().equals(table)) { - util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2)); - //ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2)); - } - } - - // verify that split completed. - int regions; - do { - regions = 0; - for (HRegionInfo hri : - ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { - if (hri.getTable().equals(table)) { - regions++; - } - } - if (regions != 2) { - LOG.info("Taking some time to complete split..."); - Thread.sleep(250); - } - } while (regions != 2); - } catch (IOException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - @BeforeClass - public static void setupCluster() throws Exception { - util = new HBaseTestingUtility(); - util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); - util.startMiniCluster(1); - } - - @AfterClass - public static void teardownCluster() throws Exception { - util.shutdownMiniCluster(); - } - - /** - * Checks that all columns have the expected value and that there is the - * expected number of rows. 
- * @throws IOException - */ - void assertExpectedTable(TableName table, int count, int value) throws IOException { - HTableDescriptor [] htds = util.getAdmin().listTables(table.getNameAsString()); - assertEquals(htds.length, 1); - Table t = null; - try { - t = util.getConnection().getTable(table); - Scan s = new Scan(); - ResultScanner sr = t.getScanner(s); - int i = 0; - for (Result r : sr) { - i++; - for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) { - for (byte[] val : nm.values()) { - assertTrue(Bytes.equals(val, value(value))); - } - } - } - assertEquals(count, i); - } catch (IOException e) { - fail("Failed due to exception"); - } finally { - if (t != null) t.close(); - } - } - - /** - * Test that shows that exception thrown from the RS side will result in an - * exception on the LIHFile client. - */ - @Test(expected=IOException.class, timeout=120000) - public void testBulkLoadPhaseFailure() throws Exception { - final TableName table = TableName.valueOf(name.getMethodName()); - final AtomicInteger attmptedCalls = new AtomicInteger(); - final AtomicInteger failedCalls = new AtomicInteger(); - util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); - try (Connection connection = ConnectionFactory.createConnection(util - .getConfiguration())) { - setupTable(connection, table, 10); - LoadIncrementalHFiles lih = new LoadIncrementalHFiles( - util.getConfiguration()) { - @Override - protected List<LoadQueueItem> tryAtomicRegionLoad( - ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first, - Collection<LoadQueueItem> lqis) throws IOException { - int i = attmptedCalls.incrementAndGet(); - if (i == 1) { - Connection errConn; - try { - errConn = getMockedConnection(util.getConfiguration()); - serviceCallable = this.buildClientServiceCallable(errConn, table, first, lqis, true); - } catch (Exception e) { - LOG.fatal("mocking cruft, should never happen", e); - throw new RuntimeException("mocking cruft, should never happen"); - } - failedCalls.incrementAndGet(); - return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis); - } - - return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis); - } - }; - try { - // create HFiles for different column families - Path dir = buildBulkFiles(table, 1); - try (Table t = connection.getTable(table); - RegionLocator locator = connection.getRegionLocator(table); - Admin admin = connection.getAdmin()) { - lih.doBulkLoad(dir, admin, t, locator); - } - } finally { - util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - } - fail("doBulkLoad should have thrown an exception"); - } - } - - /** - * Test that shows that exception thrown from the RS side will result in the - * expected number of retries set by ${@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} - * when ${@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set - */ - @Test - public void testRetryOnIOException() throws Exception { - final TableName table = TableName.valueOf(name.getMethodName()); - final AtomicInteger calls = new AtomicInteger(1); - final Connection conn = ConnectionFactory.createConnection(util - .getConfiguration()); - util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); - util.getConfiguration().setBoolean( - LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true); - final LoadIncrementalHFiles lih = new LoadIncrementalHFiles( - util.getConfiguration()) { - @Override - protected List<LoadQueueItem> 
tryAtomicRegionLoad( - ClientServiceCallable<byte[]> serverCallable, TableName tableName, - final byte[] first, Collection<LoadQueueItem> lqis) - throws IOException { - if (calls.getAndIncrement() < util.getConfiguration().getInt( - HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) { - ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>( - conn, tableName, first, new RpcControllerFactory( - util.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) { - @Override - public byte[] rpcCall() throws Exception { - throw new IOException("Error calling something on RegionServer"); - } - }; - return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis); - } else { - return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis); - } - } - }; - setupTable(conn, table, 10); - Path dir = buildBulkFiles(table, 1); - lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), - conn.getRegionLocator(table)); - util.getConfiguration().setBoolean( - LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false); - - } - - @SuppressWarnings("deprecation") - private ClusterConnection getMockedConnection(final Configuration conf) - throws IOException, org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException { - ClusterConnection c = Mockito.mock(ClusterConnection.class); - Mockito.when(c.getConfiguration()).thenReturn(conf); - Mockito.doNothing().when(c).close(); - // Make it so we return a particular location when asked. - final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, - ServerName.valueOf("example.org", 1234, 0)); - Mockito.when(c.getRegionLocation((TableName) Mockito.any(), - (byte[]) Mockito.any(), Mockito.anyBoolean())). - thenReturn(loc); - Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())). - thenReturn(loc); - ClientProtos.ClientService.BlockingInterface hri = - Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); - Mockito.when(hri.bulkLoadHFile((RpcController)Mockito.any(), (BulkLoadHFileRequest)Mockito.any())). - thenThrow(new ServiceException(new IOException("injecting bulk load error"))); - Mockito.when(c.getClient(Mockito.any(ServerName.class))). - thenReturn(hri); - return c; - } - - /** - * This test exercises the path where there is a split after initial - * validation but before the atomic bulk load call. We cannot use presplitting - * to test this path, so we actually inject a split just before the atomic - * region load. - */ - @Test (timeout=120000) - public void testSplitWhileBulkLoadPhase() throws Exception { - final TableName table = TableName.valueOf(name.getMethodName()); - try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { - setupTable(connection, table, 10); - populateTable(connection, table,1); - assertExpectedTable(table, ROWCOUNT, 1); - - // Now let's cause trouble. This will occur after checks and cause bulk - // files to fail when attempt to atomically import. This is recoverable. 
- final AtomicInteger attemptedCalls = new AtomicInteger(); - LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) { - @Override - protected void bulkLoadPhase(final Table htable, final Connection conn, - ExecutorService pool, Deque<LoadQueueItem> queue, - final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile, - Map<LoadQueueItem, ByteBuffer> item2RegionMap) - throws IOException { - int i = attemptedCalls.incrementAndGet(); - if (i == 1) { - // On first attempt force a split. - forceSplit(table); - } - super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap); - } - }; - - // create HFiles for different column families - try (Table t = connection.getTable(table); - RegionLocator locator = connection.getRegionLocator(table); - Admin admin = connection.getAdmin()) { - Path bulk = buildBulkFiles(table, 2); - lih2.doBulkLoad(bulk, admin, t, locator); - } - - // check that data was loaded - // The three expected attempts are 1) failure because need to split, 2) - // load of split top 3) load of split bottom - assertEquals(attemptedCalls.get(), 3); - assertExpectedTable(table, ROWCOUNT, 2); - } - } - - /** - * This test splits a table and attempts to bulk load. The bulk import files - * should be split before atomically importing. - */ - @Test (timeout=120000) - public void testGroupOrSplitPresplit() throws Exception { - final TableName table = TableName.valueOf(name.getMethodName()); - try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { - setupTable(connection, table, 10); - populateTable(connection, table, 1); - assertExpectedTable(connection, table, ROWCOUNT, 1); - forceSplit(table); - - final AtomicInteger countedLqis= new AtomicInteger(); - LoadIncrementalHFiles lih = new LoadIncrementalHFiles( - util.getConfiguration()) { - @Override - protected Pair<List<LoadQueueItem>, String> groupOrSplit( - Multimap<ByteBuffer, LoadQueueItem> regionGroups, - final LoadQueueItem item, final Table htable, - final Pair<byte[][], byte[][]> startEndKeys) throws IOException { - Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable, - startEndKeys); - if (lqis != null && lqis.getFirst() != null) { - countedLqis.addAndGet(lqis.getFirst().size()); - } - return lqis; - } - }; - - // create HFiles for different column families - Path bulk = buildBulkFiles(table, 2); - try (Table t = connection.getTable(table); - RegionLocator locator = connection.getRegionLocator(table); - Admin admin = connection.getAdmin()) { - lih.doBulkLoad(bulk, admin, t, locator); - } - assertExpectedTable(connection, table, ROWCOUNT, 2); - assertEquals(20, countedLqis.get()); - } - } - - /** - * This test creates a table with many small regions. The bulk load files - * would be splitted multiple times before all of them can be loaded successfully. 
- */ - @Test (timeout=120000) - public void testSplitTmpFileCleanUp() throws Exception { - final TableName table = TableName.valueOf(name.getMethodName()); - byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"), - Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), - Bytes.toBytes("row_00000040"), Bytes.toBytes("row_00000050")}; - try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { - setupTableWithSplitkeys(table, 10, SPLIT_KEYS); - - LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()); - - // create HFiles - Path bulk = buildBulkFiles(table, 2); - try (Table t = connection.getTable(table); - RegionLocator locator = connection.getRegionLocator(table); - Admin admin = connection.getAdmin()) { - lih.doBulkLoad(bulk, admin, t, locator); - } - // family path - Path tmpPath = new Path(bulk, family(0)); - // TMP_DIR under family path - tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR); - FileSystem fs = bulk.getFileSystem(util.getConfiguration()); - // HFiles have been splitted, there is TMP_DIR - assertTrue(fs.exists(tmpPath)); - // TMP_DIR should have been cleaned-up - assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.", - FSUtils.listStatus(fs, tmpPath)); - assertExpectedTable(connection, table, ROWCOUNT, 2); - } - } - - /** - * This simulates an remote exception which should cause LIHF to exit with an - * exception. - */ - @Test(expected = IOException.class, timeout=120000) - public void testGroupOrSplitFailure() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); - try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { - setupTable(connection, tableName, 10); - - LoadIncrementalHFiles lih = new LoadIncrementalHFiles( - util.getConfiguration()) { - int i = 0; - - @Override - protected Pair<List<LoadQueueItem>, String> groupOrSplit( - Multimap<ByteBuffer, LoadQueueItem> regionGroups, - final LoadQueueItem item, final Table table, - final Pair<byte[][], byte[][]> startEndKeys) throws IOException { - i++; - - if (i == 5) { - throw new IOException("failure"); - } - return super.groupOrSplit(regionGroups, item, table, startEndKeys); - } - }; - - // create HFiles for different column families - Path dir = buildBulkFiles(tableName,1); - try (Table t = connection.getTable(tableName); - RegionLocator locator = connection.getRegionLocator(tableName); - Admin admin = connection.getAdmin()) { - lih.doBulkLoad(dir, admin, t, locator); - } - } - - fail("doBulkLoad should have thrown an exception"); - } - - @Test (timeout=120000) - public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); - byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") }; - // Share connection. We were failing to find the table with our new reverse scan because it - // looks for first region, not any region -- that is how it works now. The below removes first - // region in test. Was reliant on the Connection caching having first region. 
- Connection connection = ConnectionFactory.createConnection(util.getConfiguration()); - Table table = connection.getTable(tableName); - - setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS); - Path dir = buildBulkFiles(tableName, 2); - - final AtomicInteger countedLqis = new AtomicInteger(); - LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) { - - @Override - protected Pair<List<LoadQueueItem>, String> groupOrSplit( - Multimap<ByteBuffer, LoadQueueItem> regionGroups, - final LoadQueueItem item, final Table htable, - final Pair<byte[][], byte[][]> startEndKeys) throws IOException { - Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable, - startEndKeys); - if (lqis != null && lqis.getFirst() != null) { - countedLqis.addAndGet(lqis.getFirst().size()); - } - return lqis; - } - }; - - // do bulkload when there is no region hole in hbase:meta. - try (Table t = connection.getTable(tableName); - RegionLocator locator = connection.getRegionLocator(tableName); - Admin admin = connection.getAdmin()) { - loader.doBulkLoad(dir, admin, t, locator); - } catch (Exception e) { - LOG.error("exeception=", e); - } - // check if all the data are loaded into the table. - this.assertExpectedTable(tableName, ROWCOUNT, 2); - - dir = buildBulkFiles(tableName, 3); - - // Mess it up by leaving a hole in the hbase:meta - List<HRegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName); - for (HRegionInfo regionInfo : regionInfos) { - if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { - MetaTableAccessor.deleteRegion(connection, regionInfo); - break; - } - } - - try (Table t = connection.getTable(tableName); - RegionLocator locator = connection.getRegionLocator(tableName); - Admin admin = connection.getAdmin()) { - loader.doBulkLoad(dir, admin, t, locator); - } catch (Exception e) { - LOG.error("exception=", e); - assertTrue("IOException expected", e instanceof IOException); - } - - table.close(); - - // Make sure at least the one region that still exists can be found. - regionInfos = MetaTableAccessor.getTableRegions(connection, tableName); - assertTrue(regionInfos.size() >= 1); - - this.assertExpectedTable(connection, tableName, ROWCOUNT, 2); - connection.close(); - } - - /** - * Checks that all columns have the expected value and that there is the - * expected number of rows. 
- * @throws IOException - */ - void assertExpectedTable(final Connection connection, TableName table, int count, int value) - throws IOException { - HTableDescriptor [] htds = util.getAdmin().listTables(table.getNameAsString()); - assertEquals(htds.length, 1); - Table t = null; - try { - t = connection.getTable(table); - Scan s = new Scan(); - ResultScanner sr = t.getScanner(s); - int i = 0; - for (Result r : sr) { - i++; - for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) { - for (byte[] val : nm.values()) { - assertTrue(Bytes.equals(val, value(value))); - } - } - } - assertEquals(count, i); - } catch (IOException e) { - fail("Failed due to exception"); - } finally { - if (t != null) t.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java deleted file mode 100644 index 0c5207b..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormat.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; -import org.apache.hadoop.mapreduce.Job; -import org.junit.BeforeClass; -import org.junit.experimental.categories.Category; - -import java.io.IOException; -import java.util.List; - -/** - * Tests various scan start and stop row scenarios. This is set in a scan and - * tested in a MapReduce job to see if that is handed over and done properly - * too. 
- */ -@Category({VerySlowMapReduceTests.class, LargeTests.class}) -public class TestMultiTableInputFormat extends MultiTableInputFormatTestBase { - - @BeforeClass - public static void setupLogging() { - TEST_UTIL.enableDebug(MultiTableInputFormat.class); - } - - @Override - protected void initJob(List<Scan> scans, Job job) throws IOException { - TableMapReduceUtil.initTableMapperJob(scans, ScanMapper.class, - ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java deleted file mode 100644 index 32f511b..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormat.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.hbase.shaded.com.google.common.base.Function; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimaps; -import edu.umd.cs.findbugs.annotations.Nullable; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.mapreduce.Job; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.experimental.categories.Category; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -@Category({ VerySlowMapReduceTests.class, LargeTests.class }) -public class TestMultiTableSnapshotInputFormat extends MultiTableInputFormatTestBase { - - protected Path restoreDir; - - @BeforeClass - public static void setUpSnapshots() throws Exception { - - TEST_UTIL.enableDebug(MultiTableSnapshotInputFormat.class); - TEST_UTIL.enableDebug(MultiTableSnapshotInputFormatImpl.class); - - // take a snapshot of every table we have. 
- for (String tableName : TABLES) { - SnapshotTestingUtils - .createSnapshotAndValidate(TEST_UTIL.getAdmin(), TableName.valueOf(tableName), - ImmutableList.of(MultiTableInputFormatTestBase.INPUT_FAMILY), null, - snapshotNameForTable(tableName), FSUtils.getRootDir(TEST_UTIL.getConfiguration()), - TEST_UTIL.getTestFileSystem(), true); - } - } - - @Before - public void setUp() throws Exception { - this.restoreDir = TEST_UTIL.getRandomDir(); - } - - @Override - protected void initJob(List<Scan> scans, Job job) throws IOException { - TableMapReduceUtil - .initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), ScanMapper.class, - ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir); - } - - protected Map<String, Collection<Scan>> getSnapshotScanMapping(final List<Scan> scans) { - return Multimaps.index(scans, new Function<Scan, String>() { - @Nullable - @Override - public String apply(Scan input) { - return snapshotNameForTable( - Bytes.toStringBinary(input.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME))); - } - }).asMap(); - } - - public static String snapshotNameForTable(String tableName) { - return tableName + "_snapshot"; - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java deleted file mode 100644 index 1c33848..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableSnapshotInputFormatImpl.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mockito.Mockito; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.verify; - -@Category({ SmallTests.class }) -public class TestMultiTableSnapshotInputFormatImpl { - - private MultiTableSnapshotInputFormatImpl subject; - private Map<String, Collection<Scan>> snapshotScans; - private Path restoreDir; - private Configuration conf; - private Path rootDir; - - @Before - public void setUp() throws Exception { - this.subject = Mockito.spy(new MultiTableSnapshotInputFormatImpl()); - - // mock out restoreSnapshot - // TODO: this is kind of meh; it'd be much nicer to just inject the RestoreSnapshotHelper - // dependency into the - // input format. However, we need a new RestoreSnapshotHelper per snapshot in the current - // design, and it *also* - // feels weird to introduce a RestoreSnapshotHelperFactory and inject that, which would - // probably be the more "pure" - // way of doing things. This is the lesser of two evils, perhaps? - doNothing().when(this.subject). - restoreSnapshot(any(Configuration.class), any(String.class), any(Path.class), - any(Path.class), any(FileSystem.class)); - - this.conf = new Configuration(); - this.rootDir = new Path("file:///test-root-dir"); - FSUtils.setRootDir(conf, rootDir); - this.snapshotScans = ImmutableMap.<String, Collection<Scan>>of("snapshot1", - ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))), "snapshot2", - ImmutableList.of(new Scan(Bytes.toBytes("3"), Bytes.toBytes("4")), - new Scan(Bytes.toBytes("5"), Bytes.toBytes("6")))); - - this.restoreDir = new Path(FSUtils.getRootDir(conf), "restore-dir"); - - } - - public void callSetInput() throws IOException { - subject.setInput(this.conf, snapshotScans, restoreDir); - } - - public Map<String, Collection<ScanWithEquals>> toScanWithEquals( - Map<String, Collection<Scan>> snapshotScans) throws IOException { - Map<String, Collection<ScanWithEquals>> rtn = Maps.newHashMap(); - - for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) { - List<ScanWithEquals> scans = Lists.newArrayList(); - - for (Scan scan : entry.getValue()) { - scans.add(new ScanWithEquals(scan)); - } - rtn.put(entry.getKey(), scans); - } - - return rtn; - } - - public static class ScanWithEquals { - - private final String startRow; - private final String stopRow; - - /** - * Creates a new instance of this class while copying all values. - * - * @param scan The scan instance to copy from. 
- * @throws java.io.IOException When copying the values fails. - */ - public ScanWithEquals(Scan scan) throws IOException { - this.startRow = Bytes.toStringBinary(scan.getStartRow()); - this.stopRow = Bytes.toStringBinary(scan.getStopRow()); - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof ScanWithEquals)) { - return false; - } - ScanWithEquals otherScan = (ScanWithEquals) obj; - return Objects.equals(this.startRow, otherScan.startRow) && Objects - .equals(this.stopRow, otherScan.stopRow); - } - - @Override - public String toString() { - return org.apache.hadoop.hbase.shaded.com.google.common.base.MoreObjects. - toStringHelper(this).add("startRow", startRow) - .add("stopRow", stopRow).toString(); - } - } - - @Test - public void testSetInputSetsSnapshotToScans() throws Exception { - - callSetInput(); - - Map<String, Collection<Scan>> actual = subject.getSnapshotsToScans(conf); - - // convert to scans we can use .equals on - Map<String, Collection<ScanWithEquals>> actualWithEquals = toScanWithEquals(actual); - Map<String, Collection<ScanWithEquals>> expectedWithEquals = toScanWithEquals(snapshotScans); - - assertEquals(expectedWithEquals, actualWithEquals); - } - - @Test - public void testSetInputPushesRestoreDirectories() throws Exception { - callSetInput(); - - Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf); - - assertEquals(this.snapshotScans.keySet(), restoreDirs.keySet()); - } - - @Test - public void testSetInputCreatesRestoreDirectoriesUnderRootRestoreDir() throws Exception { - callSetInput(); - - Map<String, Path> restoreDirs = subject.getSnapshotDirs(conf); - - for (Path snapshotDir : restoreDirs.values()) { - assertEquals("Expected " + snapshotDir + " to be a child of " + restoreDir, restoreDir, - snapshotDir.getParent()); - } - } - - @Test - public void testSetInputRestoresSnapshots() throws Exception { - callSetInput(); - - Map<String, Path> snapshotDirs = subject.getSnapshotDirs(conf); - - for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) { - verify(this.subject).restoreSnapshot(eq(this.conf), eq(entry.getKey()), eq(this.rootDir), - eq(entry.getValue()), any(FileSystem.class)); - } - } -}
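For context on what the removed snapshot tests were driving: the deleted TestMultiTableSnapshotInputFormat wires its job through TableMapReduceUtil.initMultiTableSnapshotMapperJob, and TestMultiTableSnapshotInputFormatImpl checks that the underlying setInput call records one scan collection per snapshot, creates a per-snapshot restore directory under the supplied root, and restores each snapshot before the job runs. A minimal stand-alone driver built on that same entry point might look like the sketch below; the class name MultiSnapshotScanDriver, the snapshot name table1_snapshot, the row-key bounds, and the /tmp/snapshot-restore path are illustrative placeholders, not values taken from the removed tests.

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class MultiSnapshotScanDriver {

  // Pass-through mapper; a real job would aggregate or transform the scanned cells.
  public static class PassThroughMapper
      extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws java.io.IOException, InterruptedException {
      context.write(key, key);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "scan-over-snapshots");
    job.setJarByClass(MultiSnapshotScanDriver.class);

    // Snapshot name, row-key bounds and restore directory are placeholders for illustration.
    Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
    snapshotScans.put("table1_snapshot",
        Arrays.asList(new Scan(Bytes.toBytes("aaa"), Bytes.toBytes("zzz"))));
    Path restoreDir = new Path("/tmp/snapshot-restore");

    // Stores the snapshot-to-scan mapping and a per-snapshot restore directory under
    // restoreDir in the job configuration, restoring each snapshot before the job starts.
    TableMapReduceUtil.initMultiTableSnapshotMapperJob(snapshotScans, PassThroughMapper.class,
        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);

    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The pass-through mapper and NullOutputFormat only keep the sketch self-contained; in practice the mapper output would feed a reducer or a real output format, and the restore directory would live on the cluster filesystem rather than /tmp.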