http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java new file mode 100644 index 0000000..694a359 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test Map/Reduce job over HBase tables. The map/reduce process we're testing + * on our tables is simple - take every row in the table, reverse the value of + * a particular cell, and write it back to the table. 
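+ * For example, a row whose cell in the input family holds "abc" should, once the job completes, hold "cba" in the output family.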
+ */ +@Category({MapReduceTests.class, LargeTests.class}) +public class TestMultithreadedTableMapper { + private static final Log LOG = LogFactory.getLog(TestMultithreadedTableMapper.class); + private static final HBaseTestingUtility UTIL = + new HBaseTestingUtility(); + static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest"); + static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); + static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text"); + static final int NUMBER_OF_THREADS = 10; + + @BeforeClass + public static void beforeClass() throws Exception { + // Up the handlers; this test needs more than usual. + UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); + UTIL.startMiniCluster(); + Table table = + UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, new byte[][] { INPUT_FAMILY, + OUTPUT_FAMILY }); + UTIL.loadTable(table, INPUT_FAMILY, false); + UTIL.waitUntilAllRegionsAssigned(MULTI_REGION_TABLE_NAME); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + /** + * Pass the given key and processed record to reduce + */ + public static class ProcessContentsMapper + extends TableMapper<ImmutableBytesWritable, Put> { + + /** + * Pass the key and reversed value to reduce + * + * @param key + * @param value + * @param context + * @throws IOException + */ + @Override + public void map(ImmutableBytesWritable key, Result value, + Context context) + throws IOException, InterruptedException { + if (value.size() != 1) { + throw new IOException("There should only be one input column"); + } + Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> + cf = value.getMap(); + if(!cf.containsKey(INPUT_FAMILY)) { + throw new IOException("Wrong input columns. 
Missing: '" + + Bytes.toString(INPUT_FAMILY) + "'."); + } + // Get the original value and reverse it + String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY)); + StringBuilder newValue = new StringBuilder(originalValue); + newValue.reverse(); + // Now set the value to be collected + Put outval = new Put(key.get()); + outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString())); + context.write(key, outval); + } + } + + /** + * Test multithreadedTableMappper map/reduce against a multi-region table + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testMultithreadedTableMapper() + throws IOException, InterruptedException, ClassNotFoundException { + runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME)); + } + + private void runTestOnTable(Table table) + throws IOException, InterruptedException, ClassNotFoundException { + Job job = null; + try { + LOG.info("Before map/reduce startup"); + job = new Job(table.getConfiguration(), "process column contents"); + job.setNumReduceTasks(1); + Scan scan = new Scan(); + scan.addFamily(INPUT_FAMILY); + TableMapReduceUtil.initTableMapperJob( + table.getName(), scan, + MultithreadedTableMapper.class, ImmutableBytesWritable.class, + Put.class, job); + MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class); + MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS); + TableMapReduceUtil.initTableReducerJob( + table.getName().getNameAsString(), + IdentityTableReducer.class, job); + FileOutputFormat.setOutputPath(job, new Path("test")); + LOG.info("Started " + table.getName()); + assertTrue(job.waitForCompletion(true)); + LOG.info("After map/reduce completion"); + // verify map-reduce results + verify(table.getName()); + } finally { + table.close(); + if (job != null) { + FileUtil.fullyDelete( + new File(job.getConfiguration().get("hadoop.tmp.dir"))); + } + } + } + + private void verify(TableName tableName) throws IOException { + Table table = UTIL.getConnection().getTable(tableName); + boolean verified = false; + long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000); + int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); + for (int i = 0; i < numRetries; i++) { + try { + LOG.info("Verification attempt #" + i); + verifyAttempt(table); + verified = true; + break; + } catch (NullPointerException e) { + // If here, a cell was empty. Presume its because updates came in + // after the scanner had been opened. Wait a while and retry. + LOG.debug("Verification attempt failed: " + e.getMessage()); + } + try { + Thread.sleep(pause); + } catch (InterruptedException e) { + // continue + } + } + assertTrue(verified); + table.close(); + } + + /** + * Looks at every value of the mapreduce output and verifies that indeed + * the values have been reversed. + * + * @param table Table to scan. 
+ * @throws IOException + * @throws NullPointerException if we failed to find a cell value + */ + private void verifyAttempt(final Table table) + throws IOException, NullPointerException { + Scan scan = new Scan(); + scan.addFamily(INPUT_FAMILY); + scan.addFamily(OUTPUT_FAMILY); + ResultScanner scanner = table.getScanner(scan); + try { + Iterator<Result> itr = scanner.iterator(); + assertTrue(itr.hasNext()); + while(itr.hasNext()) { + Result r = itr.next(); + if (LOG.isDebugEnabled()) { + if (r.size() > 2 ) { + throw new IOException("Too many results, expected 2 got " + + r.size()); + } + } + byte[] firstValue = null; + byte[] secondValue = null; + int count = 0; + for(Cell kv : r.listCells()) { + if (count == 0) { + firstValue = CellUtil.cloneValue(kv); + }else if (count == 1) { + secondValue = CellUtil.cloneValue(kv); + }else if (count == 2) { + break; + } + count++; + } + String first = ""; + if (firstValue == null) { + throw new NullPointerException(Bytes.toString(r.getRow()) + + ": first value is null"); + } + first = Bytes.toString(firstValue); + String second = ""; + if (secondValue == null) { + throw new NullPointerException(Bytes.toString(r.getRow()) + + ": second value is null"); + } + byte[] secondReversed = new byte[secondValue.length]; + for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { + secondReversed[i] = secondValue[j]; + } + second = Bytes.toString(secondReversed); + if (first.compareTo(second) != 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("second key is not the reverse of first. row=" + + Bytes.toStringBinary(r.getRow()) + ", first value=" + first + + ", second value=" + second); + } + fail(); + } + } + } finally { + scanner.close(); + } + } + +} +
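The MultithreadedTableMapper wiring exercised above follows a fixed pattern: the job's mapper is set to MultithreadedTableMapper itself, which is then told which real mapper to fan out over a thread pool and how many threads to run. A minimal sketch of that wiring, assuming an already configured Job, Scan and TableName (the initTableMapperJob, setMapperClass and setNumberOfThreads calls are the ones shown in the diff above; the thread count of 10 mirrors NUMBER_OF_THREADS):

    // Read the table with the shell mapper, which fans work out to
    // N copies of the real mapper inside each map task.
    TableMapReduceUtil.initTableMapperJob(
        tableName, scan,
        MultithreadedTableMapper.class, ImmutableBytesWritable.class,
        Put.class, job);
    MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class);
    MultithreadedTableMapper.setNumberOfThreads(job, 10);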
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRegionSizeCalculator.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRegionSizeCalculator.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRegionSizeCalculator.java new file mode 100644 index 0000000..301cfef --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRegionSizeCalculator.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.hadoop.hbase.HConstants.DEFAULT_REGIONSERVER_PORT; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + +@Category({MiscTests.class, SmallTests.class}) +public class TestRegionSizeCalculator { + + private Configuration configuration = new Configuration(); + private final long megabyte = 1024L * 1024L; + private final ServerName sn = ServerName.valueOf("local-rs", DEFAULT_REGIONSERVER_PORT, + ServerName.NON_STARTCODE); + + @Test + public void testSimpleTestCase() throws Exception { + + RegionLocator regionLocator = mockRegionLocator("region1", "region2", "region3"); + + Admin admin = mockAdmin( + mockRegion("region1", 123), + mockRegion("region3", 1232), + mockRegion("region2", 54321) + ); + + RegionSizeCalculator calculator = new RegionSizeCalculator(regionLocator, admin); + + assertEquals(123 * megabyte, calculator.getRegionSize("region1".getBytes())); + assertEquals(54321 * megabyte, calculator.getRegionSize("region2".getBytes())); + assertEquals(1232 * megabyte, calculator.getRegionSize("region3".getBytes())); + // if regionCalculator does not know about a region, it should return 0 + assertEquals(0 * megabyte, calculator.getRegionSize("otherTableRegion".getBytes())); + + assertEquals(3, 
calculator.getRegionSizeMap().size()); + } + + + /** + * When the size of a region in megabytes is larger than the largest possible integer, + * precision could be lost. + * */ + @Test + public void testLargeRegion() throws Exception { + + RegionLocator regionLocator = mockRegionLocator("largeRegion"); + + Admin admin = mockAdmin( + mockRegion("largeRegion", Integer.MAX_VALUE) + ); + + RegionSizeCalculator calculator = new RegionSizeCalculator(regionLocator, admin); + + assertEquals(((long) Integer.MAX_VALUE) * megabyte, calculator.getRegionSize("largeRegion".getBytes())); + } + + /** When calculator is disabled, it should return 0 for each request.*/ + @Test + public void testDisabled() throws Exception { + String regionName = "cz.goout:/index.html"; + RegionLocator table = mockRegionLocator(regionName); + + Admin admin = mockAdmin( + mockRegion(regionName, 999) + ); + + //first request on enabled calculator + RegionSizeCalculator calculator = new RegionSizeCalculator(table, admin); + assertEquals(999 * megabyte, calculator.getRegionSize(regionName.getBytes())); + + //then disabled calculator. + configuration.setBoolean(RegionSizeCalculator.ENABLE_REGIONSIZECALCULATOR, false); + RegionSizeCalculator disabledCalculator = new RegionSizeCalculator(table, admin); + assertEquals(0 * megabyte, disabledCalculator.getRegionSize(regionName.getBytes())); + + assertEquals(0, disabledCalculator.getRegionSizeMap().size()); + } + + /** + * Makes a mock table with the given region names. + * */ + private RegionLocator mockRegionLocator(String... regionNames) throws IOException { + RegionLocator mockedTable = Mockito.mock(RegionLocator.class); + when(mockedTable.getName()).thenReturn(TableName.valueOf("sizeTestTable")); + List<HRegionLocation> regionLocations = new ArrayList<>(regionNames.length); + when(mockedTable.getAllRegionLocations()).thenReturn(regionLocations); + + for (String regionName : regionNames) { + HRegionInfo info = Mockito.mock(HRegionInfo.class); + when(info.getRegionName()).thenReturn(regionName.getBytes()); + regionLocations.add(new HRegionLocation(info, sn)); + } + + return mockedTable; + } + + /** + * Creates a mock Admin returning RegionLoad info for the given regions. + */ + private Admin mockAdmin(RegionLoad... regionLoadArray) throws Exception { + Admin mockAdmin = Mockito.mock(Admin.class); + Map<byte[], RegionLoad> regionLoads = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (RegionLoad regionLoad : regionLoadArray) { + regionLoads.put(regionLoad.getName(), regionLoad); + } + when(mockAdmin.getConfiguration()).thenReturn(configuration); + when(mockAdmin.getRegionLoad(sn, TableName.valueOf("sizeTestTable"))).thenReturn(regionLoads); + return mockAdmin; + } + + /** + * Creates a mock region with the given name and size. 
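+ * For example, mockRegion("region1", 123) above stands in for a region whose store files occupy 123 MB.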
+ * + * @param fileSizeMb number of megabytes occupied by the region's store files + * */ + private RegionLoad mockRegion(String regionName, int fileSizeMb) { + RegionLoad region = Mockito.mock(RegionLoad.class); + when(region.getName()).thenReturn(regionName.getBytes()); + when(region.getNameAsString()).thenReturn(regionName); + when(region.getStorefileSizeMB()).thenReturn(fileSizeMb); + return region; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java new file mode 100644 index 0000000..3b84e2d --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java @@ -0,0 +1,400 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.LauncherSecurityManager; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Job; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestRule; + +/** + * Test the rowcounter map reduce job. + */ +@Category({MapReduceTests.class, LargeTests.class}) +public class TestRowCounter { + @Rule public final TestRule timeout = CategoryBasedTimeout.builder(). 
+ withTimeout(this.getClass()).withLookingForStuckThread(true).build(); + private static final Log LOG = LogFactory.getLog(TestRowCounter.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static String TABLE_NAME = "testRowCounter"; + private final static String TABLE_NAME_TS_RANGE = "testRowCounter_ts_range"; + private final static String COL_FAM = "col_fam"; + private final static String COL1 = "c1"; + private final static String COL2 = "c2"; + private final static String COMPOSITE_COLUMN = "C:A:A"; + private final static int TOTAL_ROWS = 10; + private final static int ROWS_WITH_ONE_COL = 2; + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(); + Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes(COL_FAM)); + writeRows(table, TOTAL_ROWS, ROWS_WITH_ONE_COL); + table.close(); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Test a case when no column was specified in command line arguments. + * + * @throws Exception + */ + @Test + public void testRowCounterNoColumn() throws Exception { + String[] args = new String[] { + TABLE_NAME + }; + runRowCount(args, 10); + } + + /** + * Test a case when the column specified in command line arguments is + * present in only some of the rows. + * + * @throws Exception + */ + @Test + public void testRowCounterExclusiveColumn() throws Exception { + String[] args = new String[] { + TABLE_NAME, COL_FAM + ":" + COL1 + }; + runRowCount(args, 8); + } + + /** + * Test a case when the column specified in command line arguments is + * one whose qualifier contains colons. + * + * @throws Exception + */ + @Test + public void testRowCounterColumnWithColonInQualifier() throws Exception { + String[] args = new String[] { + TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN + }; + runRowCount(args, 8); + } + + /** + * Test a case when the column specified in command line arguments is not part + * of the first KV for a row. 
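+ * Every row written by writeRows() carries COL2, so all TOTAL_ROWS rows should still be counted even though COL2 is never the leading cell of a multi-column row.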
+ * + * @throws Exception + */ + @Test + public void testRowCounterHiddenColumn() throws Exception { + String[] args = new String[] { + TABLE_NAME, COL_FAM + ":" + COL2 + }; + runRowCount(args, 10); + } + + + /** + * Test a case when the column specified in command line arguments is + * present in only some of the rows, and a row range filter is also specified. + * + * @throws Exception + */ + @Test + public void testRowCounterColumnAndRowRange() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=\\x00rov,\\x00rox", COL_FAM + ":" + COL1 + }; + runRowCount(args, 8); + } + + /** + * Test a case when a range is specified with a single range of start-end keys. + * @throws Exception + */ + @Test + public void testRowCounterRowSingleRange() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=\\x00row1,\\x00row3" + }; + runRowCount(args, 2); + } + + /** + * Test a case when a range is specified with a single range with only an end key. + * @throws Exception + */ + @Test + public void testRowCounterRowSingleRangeUpperBound() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=,\\x00row3" + }; + runRowCount(args, 3); + } + + /** + * Test a case when a range is specified with two ranges, one of which has only an end key. + * @throws Exception + */ + @Test + public void testRowCounterRowMultiRangeUpperBound() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=,\\x00row3;\\x00row5,\\x00row7" + }; + runRowCount(args, 5); + } + + /** + * Test a case when a range is specified with multiple ranges of start-end keys. + * @throws Exception + */ + @Test + public void testRowCounterRowMultiRange() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=\\x00row1,\\x00row3;\\x00row5,\\x00row8" + }; + runRowCount(args, 5); + } + + /** + * Test a case when a range is specified with multiple ranges of start-end keys; + * one range is filled, the other two are empty. + * @throws Exception + */ + @Test + public void testRowCounterRowMultiEmptyRange() throws Exception { + String[] args = new String[] { + TABLE_NAME, "--range=\\x00row1,\\x00row3;;" + }; + runRowCount(args, 2); + } + + @Test + public void testRowCounter10kRowRange() throws Exception { + String tableName = TABLE_NAME + "10k"; + + try (Table table = TEST_UTIL.createTable( + TableName.valueOf(tableName), Bytes.toBytes(COL_FAM))) { + writeRows(table, 10000, 0); + } + String[] args = new String[] { + tableName, "--range=\\x00row9872,\\x00row9875" + }; + runRowCount(args, 3); + } + + /** + * Test a case when the timerange is specified with --starttime and --endtime options. + * + * @throws Exception + */ + @Test + public void testRowCounterTimeRange() throws Exception { + final byte[] family = Bytes.toBytes(COL_FAM); + final byte[] col1 = Bytes.toBytes(COL1); + Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1)); + Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2)); + Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3)); + + long ts; + + // write to a separate table for the timestamp-range test + Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME_TS_RANGE), Bytes.toBytes(COL_FAM)); + + ts = System.currentTimeMillis(); + put1.addColumn(family, col1, ts, Bytes.toBytes("val1")); + table.put(put1); + Thread.sleep(100); + + ts = System.currentTimeMillis(); + put2.addColumn(family, col1, ts, Bytes.toBytes("val2")); + put3.addColumn(family, col1, ts, Bytes.toBytes("val3")); + table.put(put2); + table.put(put3); + table.close(); + + String[] args = new String[] { + 
TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1, + "--starttime=" + 0, + "--endtime=" + ts + }; + runRowCount(args, 1); + + args = new String[] { + TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1, + "--starttime=" + 0, + "--endtime=" + (ts - 10) + }; + runRowCount(args, 1); + + args = new String[] { + TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1, + "--starttime=" + ts, + "--endtime=" + (ts + 1000) + }; + runRowCount(args, 2); + + args = new String[] { + TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1, + "--starttime=" + (ts - 30 * 1000), + "--endtime=" + (ts + 30 * 1000), + }; + runRowCount(args, 3); + } + + /** + * Run the RowCounter map reduce job and verify the row count. + * + * @param args the command line arguments to be used for the rowcounter job. + * @param expectedCount the expected row count (result of map reduce job). + * @throws Exception + */ + private void runRowCount(String[] args, int expectedCount) throws Exception { + Job job = RowCounter.createSubmittableJob(TEST_UTIL.getConfiguration(), args); + long start = System.currentTimeMillis(); + job.waitForCompletion(true); + long duration = System.currentTimeMillis() - start; + LOG.debug("row count duration (ms): " + duration); + assertTrue(job.isSuccessful()); + Counter counter = job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS); + assertEquals(expectedCount, counter.getValue()); + } + + /** + * Writes TOTAL_ROWS distinct rows into the table. Most rows have + * three columns; the rest have one. + * + * @param table + * @throws IOException + */ + private static void writeRows(Table table, int totalRows, int rowsWithOneCol) throws IOException { + final byte[] family = Bytes.toBytes(COL_FAM); + final byte[] value = Bytes.toBytes("abcd"); + final byte[] col1 = Bytes.toBytes(COL1); + final byte[] col2 = Bytes.toBytes(COL2); + final byte[] col3 = Bytes.toBytes(COMPOSITE_COLUMN); + ArrayList<Put> rowsUpdate = new ArrayList<>(); + // write most rows with three columns + int i = 0; + for (; i < totalRows - rowsWithOneCol; i++) { + // Use binary row values to test for HBASE-15287. + byte[] row = Bytes.toBytesBinary("\\x00row" + i); + Put put = new Put(row); + put.addColumn(family, col1, value); + put.addColumn(family, col2, value); + put.addColumn(family, col3, value); + rowsUpdate.add(put); + } + + // write the remaining rows with only one column + for (; i < totalRows; i++) { + byte[] row = Bytes.toBytes("row" + i); + Put put = new Put(row); + put.addColumn(family, col2, value); + rowsUpdate.add(put); + } + table.put(rowsUpdate); + } + + /** + * test main method. 
RowCounter should print help and call System.exit + */ + @Test + public void testImportMain() throws Exception { + PrintStream oldPrintStream = System.err; + SecurityManager SECURITY_MANAGER = System.getSecurityManager(); + LauncherSecurityManager newSecurityManager = new LauncherSecurityManager(); + System.setSecurityManager(newSecurityManager); + ByteArrayOutputStream data = new ByteArrayOutputStream(); + String[] args = {}; + System.setErr(new PrintStream(data)); + try { + System.setErr(new PrintStream(data)); + + try { + RowCounter.main(args); + fail("should be SecurityException"); + } catch (SecurityException e) { + assertEquals(-1, newSecurityManager.getExitCode()); + assertTrue(data.toString().contains("Wrong number of parameters:")); + assertTrue(data.toString().contains( + "Usage: RowCounter [options] <tablename> " + + "[--starttime=[start] --endtime=[end] " + + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] " + + "[<column1> <column2>...]")); + assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100")); + assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false")); + } + data.reset(); + try { + args = new String[2]; + args[0] = "table"; + args[1] = "--range=1"; + RowCounter.main(args); + fail("should be SecurityException"); + } catch (SecurityException e) { + assertEquals(-1, newSecurityManager.getExitCode()); + assertTrue(data.toString().contains( + "Please specify range in such format as \"--range=a,b\" or, with only one boundary," + + " \"--range=,b\" or \"--range=a,\"")); + assertTrue(data.toString().contains( + "Usage: RowCounter [options] <tablename> " + + "[--starttime=[start] --endtime=[end] " + + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] " + + "[<column1> <column2>...]")); + } + + } finally { + System.setErr(oldPrintStream); + System.setSecurityManager(SECURITY_MANAGER); + } + + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java new file mode 100644 index 0000000..78fddbc --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java @@ -0,0 +1,70 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.access.AccessControlLists; +import org.apache.hadoop.hbase.security.access.SecureTestUtil; + +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +/** + * Reruns TestLoadIncrementalHFiles using LoadIncrementalHFiles in secure mode. + * This suite is unable to verify the security handoff/turnover + * as miniCluster is running as system user thus has root privileges + * and delegation tokens don't seem to work on miniDFS. + * + * Thus SecureBulkload can only be completely verified by running + * integration tests against a secure cluster. This suite is still + * invaluable as it verifies the other mechanisms that need to be + * supported as part of a LoadIncrementalFiles call. + */ +@Category({MapReduceTests.class, LargeTests.class}) +public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles{ + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // set the always on security provider + UserProvider.setUserProviderForTesting(util.getConfiguration(), + HadoopSecurityEnabledUserProviderForTesting.class); + // setup configuration + SecureTestUtil.enableSecurity(util.getConfiguration()); + util.getConfiguration().setInt( + LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, + MAX_FILES_PER_REGION_PER_FAMILY); + // change default behavior so that tag values are returned with normal rpcs + util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY, + KeyValueCodecWithTags.class.getCanonicalName()); + + util.startMiniCluster(); + + // Wait for the ACL table to become available + util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME); + + setupNamespace(); + } + +} + http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFilesSplitRecovery.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFilesSplitRecovery.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFilesSplitRecovery.java new file mode 100644 index 0000000..0e877ad --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFilesSplitRecovery.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.access.AccessControlLists; +import org.apache.hadoop.hbase.security.access.SecureTestUtil; + +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + + +/** + * Reruns TestLoadIncrementalHFilesSplitRecovery + * using LoadIncrementalHFiles in secure mode. + * This suite is unable to verify the security handoff/turnover + * as miniCluster is running as system user thus has root privileges + * and delegation tokens don't seem to work on miniDFS. + * + * Thus SecureBulkload can only be completely verified by running + * integration tests against a secure cluster. This suite is still + * invaluable as it verifies the other mechanisms that need to be + * supported as part of a LoadIncrementalFiles call. + */ +@Category({MapReduceTests.class, LargeTests.class}) +public class TestSecureLoadIncrementalHFilesSplitRecovery extends TestLoadIncrementalHFilesSplitRecovery { + + //This "overrides" the parent static method + //make sure they are in sync + @BeforeClass + public static void setupCluster() throws Exception { + util = new HBaseTestingUtility(); + // set the always on security provider + UserProvider.setUserProviderForTesting(util.getConfiguration(), + HadoopSecurityEnabledUserProviderForTesting.class); + // setup configuration + SecureTestUtil.enableSecurity(util.getConfiguration()); + + util.startMiniCluster(); + + // Wait for the ACL table to become available + util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME); + } + + //Disabling this test as it does not work in secure mode + @Test (timeout=180000) + @Override + public void testBulkLoadPhaseFailure() { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSimpleTotalOrderPartitioner.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSimpleTotalOrderPartitioner.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSimpleTotalOrderPartitioner.java new file mode 100644 index 0000000..5629cb4 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSimpleTotalOrderPartitioner.java @@ -0,0 +1,81 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.experimental.categories.Category; + +import org.junit.Test; + +/** + * Test of simple partitioner. + */ +@Category({MapReduceTests.class, SmallTests.class}) +public class TestSimpleTotalOrderPartitioner { + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + Configuration conf = TEST_UTIL.getConfiguration(); + + @Test + public void testSplit() throws Exception { + String start = "a"; + String end = "{"; + SimpleTotalOrderPartitioner<byte []> p = new SimpleTotalOrderPartitioner<>(); + + this.conf.set(SimpleTotalOrderPartitioner.START, start); + this.conf.set(SimpleTotalOrderPartitioner.END, end); + p.setConf(this.conf); + ImmutableBytesWritable c = new ImmutableBytesWritable(Bytes.toBytes("c")); + // If one reduce, partition should be 0. + int partition = p.getPartition(c, HConstants.EMPTY_BYTE_ARRAY, 1); + assertEquals(0, partition); + // If two reduces, partition should be 0. + partition = p.getPartition(c, HConstants.EMPTY_BYTE_ARRAY, 2); + assertEquals(0, partition); + // Divide in 3. + partition = p.getPartition(c, HConstants.EMPTY_BYTE_ARRAY, 3); + assertEquals(0, partition); + ImmutableBytesWritable q = new ImmutableBytesWritable(Bytes.toBytes("q")); + partition = p.getPartition(q, HConstants.EMPTY_BYTE_ARRAY, 2); + assertEquals(1, partition); + partition = p.getPartition(q, HConstants.EMPTY_BYTE_ARRAY, 3); + assertEquals(2, partition); + // What about end and start keys. + ImmutableBytesWritable startBytes = + new ImmutableBytesWritable(Bytes.toBytes(start)); + partition = p.getPartition(startBytes, HConstants.EMPTY_BYTE_ARRAY, 2); + assertEquals(0, partition); + partition = p.getPartition(startBytes, HConstants.EMPTY_BYTE_ARRAY, 3); + assertEquals(0, partition); + ImmutableBytesWritable endBytes = + new ImmutableBytesWritable(Bytes.toBytes("z")); + partition = p.getPartition(endBytes, HConstants.EMPTY_BYTE_ARRAY, 2); + assertEquals(1, partition); + partition = p.getPartition(endBytes, HConstants.EMPTY_BYTE_ARRAY, 3); + assertEquals(2, partition); + } + +} + http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java new file mode 100644 index 0000000..9a0c160 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Counters; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; + +import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; + +/** + * Basic test for the SyncTable M/R tool + */ +@Category(LargeTests.class) +public class TestSyncTable { + @Rule public final TestRule timeout = CategoryBasedTimeout.builder(). 
+ withTimeout(this.getClass()).withLookingForStuckThread(true).build(); + private static final Log LOG = LogFactory.getLog(TestSyncTable.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void beforeClass() throws Exception { + TEST_UTIL.startMiniCluster(3); + } + + @AfterClass + public static void afterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + private static byte[][] generateSplits(int numRows, int numRegions) { + byte[][] splitRows = new byte[numRegions-1][]; + for (int i = 1; i < numRegions; i++) { + splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions); + } + return splitRows; + } + + @Test + public void testSyncTable() throws Exception { + final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); + final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); + Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable"); + + writeTestData(sourceTableName, targetTableName); + hashSourceTable(sourceTableName, testDir); + Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir); + assertEqualTables(90, sourceTableName, targetTableName); + + assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); + assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); + assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); + assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); + assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); + assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); + + TEST_UTIL.deleteTable(sourceTableName); + TEST_UTIL.deleteTable(targetTableName); + TEST_UTIL.cleanupDataTestDirOnTestFS(); + } + + private void assertEqualTables(int expectedRows, TableName sourceTableName, + TableName targetTableName) throws Exception { + Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName); + Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName); + + ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); + ResultScanner targetScanner = targetTable.getScanner(new Scan()); + + for (int i = 0; i < expectedRows; i++) { + Result sourceRow = sourceScanner.next(); + Result targetRow = targetScanner.next(); + + LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow())) + + " cells:" + sourceRow); + LOG.debug("TARGET row: " + (targetRow == null ? 
"null" : Bytes.toInt(targetRow.getRow())) + + " cells:" + targetRow); + + if (sourceRow == null) { + Assert.fail("Expected " + expectedRows + + " source rows but only found " + i); + } + if (targetRow == null) { + Assert.fail("Expected " + expectedRows + + " target rows but only found " + i); + } + Cell[] sourceCells = sourceRow.rawCells(); + Cell[] targetCells = targetRow.rawCells(); + if (sourceCells.length != targetCells.length) { + LOG.debug("Source cells: " + Arrays.toString(sourceCells)); + LOG.debug("Target cells: " + Arrays.toString(targetCells)); + Assert.fail("Row " + Bytes.toInt(sourceRow.getRow()) + + " has " + sourceCells.length + + " cells in source table but " + targetCells.length + + " cells in target table"); + } + for (int j = 0; j < sourceCells.length; j++) { + Cell sourceCell = sourceCells[j]; + Cell targetCell = targetCells[j]; + try { + if (!CellUtil.matchingRow(sourceCell, targetCell)) { + Assert.fail("Rows don't match"); + } + if (!CellUtil.matchingFamily(sourceCell, targetCell)) { + Assert.fail("Families don't match"); + } + if (!CellUtil.matchingQualifier(sourceCell, targetCell)) { + Assert.fail("Qualifiers don't match"); + } + if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) { + Assert.fail("Timestamps don't match"); + } + if (!CellUtil.matchingValue(sourceCell, targetCell)) { + Assert.fail("Values don't match"); + } + } catch (Throwable t) { + LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell); + Throwables.propagate(t); + } + } + } + Result sourceRow = sourceScanner.next(); + if (sourceRow != null) { + Assert.fail("Source table has more than " + expectedRows + + " rows. Next row: " + Bytes.toInt(sourceRow.getRow())); + } + Result targetRow = targetScanner.next(); + if (targetRow != null) { + Assert.fail("Target table has more than " + expectedRows + + " rows. 
Next row: " + Bytes.toInt(targetRow.getRow())); + } + sourceScanner.close(); + targetScanner.close(); + sourceTable.close(); + targetTable.close(); + } + + private Counters syncTables(TableName sourceTableName, TableName targetTableName, + Path testDir) throws Exception { + SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration()); + int code = syncTable.run(new String[] { + testDir.toString(), + sourceTableName.getNameAsString(), + targetTableName.getNameAsString() + }); + assertEquals("sync table job failed", 0, code); + + LOG.info("Sync tables completed"); + return syncTable.counters; + } + + private void hashSourceTable(TableName sourceTableName, Path testDir) + throws Exception, IOException { + int numHashFiles = 3; + long batchSize = 100; // should be 2 batches per region + int scanBatch = 1; + HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration()); + int code = hashTable.run(new String[] { + "--batchsize=" + batchSize, + "--numhashfiles=" + numHashFiles, + "--scanbatch=" + scanBatch, + sourceTableName.getNameAsString(), + testDir.toString()}); + assertEquals("hash table job failed", 0, code); + + FileSystem fs = TEST_UTIL.getTestFileSystem(); + + HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir); + assertEquals(sourceTableName.getNameAsString(), tableHash.tableName); + assertEquals(batchSize, tableHash.batchSize); + assertEquals(numHashFiles, tableHash.numHashFiles); + assertEquals(numHashFiles - 1, tableHash.partitions.size()); + + LOG.info("Hash table completed"); + } + + private void writeTestData(TableName sourceTableName, TableName targetTableName) + throws Exception { + final byte[] family = Bytes.toBytes("family"); + final byte[] column1 = Bytes.toBytes("c1"); + final byte[] column2 = Bytes.toBytes("c2"); + final byte[] value1 = Bytes.toBytes("val1"); + final byte[] value2 = Bytes.toBytes("val2"); + final byte[] value3 = Bytes.toBytes("val3"); + + int numRows = 100; + int sourceRegions = 10; + int targetRegions = 6; + + Table sourceTable = TEST_UTIL.createTable(sourceTableName, + family, generateSplits(numRows, sourceRegions)); + + Table targetTable = TEST_UTIL.createTable(targetTableName, + family, generateSplits(numRows, targetRegions)); + + long timestamp = 1430764183454L; + + int rowIndex = 0; + // a bunch of identical rows + for (; rowIndex < 40; rowIndex++) { + Put sourcePut = new Put(Bytes.toBytes(rowIndex)); + sourcePut.addColumn(family, column1, timestamp, value1); + sourcePut.addColumn(family, column2, timestamp, value2); + sourceTable.put(sourcePut); + + Put targetPut = new Put(Bytes.toBytes(rowIndex)); + targetPut.addColumn(family, column1, timestamp, value1); + targetPut.addColumn(family, column2, timestamp, value2); + targetTable.put(targetPut); + } + // some rows only in the source table + // ROWSWITHDIFFS: 10 + // TARGETMISSINGROWS: 10 + // TARGETMISSINGCELLS: 20 + for (; rowIndex < 50; rowIndex++) { + Put put = new Put(Bytes.toBytes(rowIndex)); + put.addColumn(family, column1, timestamp, value1); + put.addColumn(family, column2, timestamp, value2); + sourceTable.put(put); + } + // some rows only in the target table + // ROWSWITHDIFFS: 10 + // SOURCEMISSINGROWS: 10 + // SOURCEMISSINGCELLS: 20 + for (; rowIndex < 60; rowIndex++) { + Put put = new Put(Bytes.toBytes(rowIndex)); + put.addColumn(family, column1, timestamp, value1); + put.addColumn(family, column2, timestamp, value2); + targetTable.put(put); + } + // some rows with 1 missing cell in target table + // ROWSWITHDIFFS: 10 + // TARGETMISSINGCELLS: 10 + for 
(; rowIndex < 70; rowIndex++) { + Put sourcePut = new Put(Bytes.toBytes(rowIndex)); + sourcePut.addColumn(family, column1, timestamp, value1); + sourcePut.addColumn(family, column2, timestamp, value2); + sourceTable.put(sourcePut); + + Put targetPut = new Put(Bytes.toBytes(rowIndex)); + targetPut.addColumn(family, column1, timestamp, value1); + targetTable.put(targetPut); + } + // some rows with 1 missing cell in source table + // ROWSWITHDIFFS: 10 + // SOURCEMISSINGCELLS: 10 + for (; rowIndex < 80; rowIndex++) { + Put sourcePut = new Put(Bytes.toBytes(rowIndex)); + sourcePut.addColumn(family, column1, timestamp, value1); + sourceTable.put(sourcePut); + + Put targetPut = new Put(Bytes.toBytes(rowIndex)); + targetPut.addColumn(family, column1, timestamp, value1); + targetPut.addColumn(family, column2, timestamp, value2); + targetTable.put(targetPut); + } + // some rows differing only in timestamp + // ROWSWITHDIFFS: 10 + // SOURCEMISSINGCELLS: 20 + // TARGETMISSINGCELLS: 20 + for (; rowIndex < 90; rowIndex++) { + Put sourcePut = new Put(Bytes.toBytes(rowIndex)); + sourcePut.addColumn(family, column1, timestamp, column1); + sourcePut.addColumn(family, column2, timestamp, value2); + sourceTable.put(sourcePut); + + Put targetPut = new Put(Bytes.toBytes(rowIndex)); + targetPut.addColumn(family, column1, timestamp+1, column1); + targetPut.addColumn(family, column2, timestamp-1, value2); + targetTable.put(targetPut); + } + // some rows with different values + // ROWSWITHDIFFS: 10 + // DIFFERENTCELLVALUES: 20 + for (; rowIndex < numRows; rowIndex++) { + Put sourcePut = new Put(Bytes.toBytes(rowIndex)); + sourcePut.addColumn(family, column1, timestamp, value1); + sourcePut.addColumn(family, column2, timestamp, value2); + sourceTable.put(sourcePut); + + Put targetPut = new Put(Bytes.toBytes(rowIndex)); + targetPut.addColumn(family, column1, timestamp, value3); + targetPut.addColumn(family, column2, timestamp, value3); + targetTable.put(targetPut); + } + + sourceTable.close(); + targetTable.close(); + } + + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java new file mode 100644 index 0000000..b4c6ab9 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java @@ -0,0 +1,481 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.RegexStringComparator; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * This tests the TableInputFormat and its recovery semantics + * + */ +@Category(LargeTests.class) +public class TestTableInputFormat { + + private static final Log LOG = LogFactory.getLog(TestTableInputFormat.class); + + private final static HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static MiniMRCluster mrCluster; + static final byte[] FAMILY = Bytes.toBytes("family"); + + private static final byte[][] columns = new byte[][] { FAMILY }; + + @BeforeClass + public static void beforeClass() throws Exception { + UTIL.startMiniCluster(); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Before + public void before() throws IOException { + LOG.info("before"); + UTIL.ensureSomeRegionServersAvailable(1); + LOG.info("before done"); + } + + /** + * Setup a table with two rows and values. + * + * @param tableName + * @return + * @throws IOException + */ + public static Table createTable(byte[] tableName) throws IOException { + return createTable(tableName, new byte[][] { FAMILY }); + } + + /** + * Setup a table with two rows and values per column family. 
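+ * The two rows are keyed "aaa" and "bbb" and hold "value aaa" and "value bbb" respectively in each supplied family.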
+ * + * @param tableName + * @param families + * @return + * @throws IOException + */ + public static Table createTable(byte[] tableName, byte[][] families) throws IOException { + Table table = UTIL.createTable(TableName.valueOf(tableName), families); + Put p = new Put("aaa".getBytes()); + for (byte[] family : families) { + p.addColumn(family, null, "value aaa".getBytes()); + } + table.put(p); + p = new Put("bbb".getBytes()); + for (byte[] family : families) { + p.addColumn(family, null, "value bbb".getBytes()); + } + table.put(p); + return table; + } + + /** + * Verify that the result and key have expected values. + * + * @param r + * @param key + * @param expectedKey + * @param expectedValue + * @return + */ + static boolean checkResult(Result r, ImmutableBytesWritable key, + byte[] expectedKey, byte[] expectedValue) { + assertEquals(0, key.compareTo(expectedKey)); + Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY); + byte[] value = vals.values().iterator().next(); + assertTrue(Arrays.equals(value, expectedValue)); + return true; // success + } + + /** + * Create table data and run tests on the specified table using the + * o.a.h.hbase.mapreduce API. + * + * @param table + * @throws IOException + * @throws InterruptedException + */ + static void runTestMapreduce(Table table) throws IOException, + InterruptedException { + org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr = + new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl(); + Scan s = new Scan(); + s.setStartRow("aaa".getBytes()); + s.setStopRow("zzz".getBytes()); + s.addFamily(FAMILY); + trr.setScan(s); + trr.setHTable(table); + + trr.initialize(null, null); + Result r = new Result(); + ImmutableBytesWritable key = new ImmutableBytesWritable(); + + boolean more = trr.nextKeyValue(); + assertTrue(more); + key = trr.getCurrentKey(); + r = trr.getCurrentValue(); + checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes()); + + more = trr.nextKeyValue(); + assertTrue(more); + key = trr.getCurrentKey(); + r = trr.getCurrentValue(); + checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes()); + + // no more data + more = trr.nextKeyValue(); + assertFalse(more); + } + + /** + * Create a table that throws an IOException on the first scanner next() call + * + * @throws IOException + */ + static Table createIOEScannerTable(byte[] name, final int failCnt) + throws IOException { + // build up a mock scanner that fails the first failCnt invocations + Answer<ResultScanner> a = new Answer<ResultScanner>() { + int cnt = 0; + + @Override + public ResultScanner answer(InvocationOnMock invocation) throws Throwable { + // for the first failCnt invocations, return the busted mock scanner + if (cnt++ < failCnt) { + // create mock ResultScanner that always fails. + Scan scan = mock(Scan.class); + doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe + ResultScanner scanner = mock(ResultScanner.class); + // simulate TimeoutException / IOException + doThrow(new IOException("Injected exception")).when(scanner).next(); + return scanner; + } + + // otherwise return the real scanner.
return (ResultScanner) invocation.callRealMethod(); + } + }; + + Table htable = spy(createTable(name)); + doAnswer(a).when(htable).getScanner((Scan) anyObject()); + return htable; + } + + /** + * Create a table that throws a NotServingRegionException on the first scanner + * next() call + * + * @throws IOException + */ + static Table createDNRIOEScannerTable(byte[] name, final int failCnt) + throws IOException { + // build up a mock scanner that fails the first failCnt invocations + Answer<ResultScanner> a = new Answer<ResultScanner>() { + int cnt = 0; + + @Override + public ResultScanner answer(InvocationOnMock invocation) throws Throwable { + // for the first failCnt invocations, return the busted mock scanner + if (cnt++ < failCnt) { + // create mock ResultScanner that always fails. + Scan scan = mock(Scan.class); + doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe + ResultScanner scanner = mock(ResultScanner.class); + + invocation.callRealMethod(); // simulate NotServingRegionException + doThrow( + new NotServingRegionException("Injected simulated TimeoutException")) + .when(scanner).next(); + return scanner; + } + + // otherwise return the real scanner. + return (ResultScanner) invocation.callRealMethod(); + } + }; + + Table htable = spy(createTable(name)); + doAnswer(a).when(htable).getScanner((Scan) anyObject()); + return htable; + } + + /** + * Run test assuming no errors using newer mapreduce api + * + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testTableRecordReaderMapreduce() throws IOException, + InterruptedException { + Table table = createTable("table1-mr".getBytes()); + runTestMapreduce(table); + } + + /** + * Run test assuming Scanner IOException failure using newer mapreduce api + * + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testTableRecordReaderScannerFailMapreduce() throws IOException, + InterruptedException { + Table htable = createIOEScannerTable("table2-mr".getBytes(), 1); + runTestMapreduce(htable); + } + + /** + * Run test assuming Scanner IOException failure using newer mapreduce api + * + * @throws IOException + * @throws InterruptedException + */ + @Test(expected = IOException.class) + public void testTableRecordReaderScannerFailMapreduceTwice() throws IOException, + InterruptedException { + Table htable = createIOEScannerTable("table3-mr".getBytes(), 2); + runTestMapreduce(htable); + } + + /** + * Run test assuming NotServingRegionException using newer mapreduce api + * + * @throws InterruptedException + * @throws org.apache.hadoop.hbase.DoNotRetryIOException + */ + @Test + public void testTableRecordReaderScannerTimeoutMapreduce() + throws IOException, InterruptedException { + Table htable = createDNRIOEScannerTable("table4-mr".getBytes(), 1); + runTestMapreduce(htable); + } + + /** + * Run test assuming NotServingRegionException using newer mapreduce api + * + * @throws InterruptedException + * @throws org.apache.hadoop.hbase.NotServingRegionException + */ + @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class) + public void testTableRecordReaderScannerTimeoutMapreduceTwice() + throws IOException, InterruptedException { + Table htable = createDNRIOEScannerTable("table5-mr".getBytes(), 2); + runTestMapreduce(htable); + } + + /** + * Verify the example we present in javadocs on TableInputFormatBase + */ + @Test + public void testExtensionOfTableInputFormatBase() + throws IOException, InterruptedException, ClassNotFoundException { + LOG.info("testing use of an InputFormat that
extends InputFormatBase"); + final Table htable = createTable(Bytes.toBytes("exampleTable"), + new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); + testInputFormat(ExampleTIF.class); + } + + @Test + public void testJobConfigurableExtensionOfTableInputFormatBase() + throws IOException, InterruptedException, ClassNotFoundException { + LOG.info("testing use of an InputFormat that extends InputFormatBase, " + + "using JobConfigurable."); + final Table htable = createTable(Bytes.toBytes("exampleJobConfigurableTable"), + new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); + testInputFormat(ExampleJobConfigurableTIF.class); + } + + @Test + public void testDeprecatedExtensionOfTableInputFormatBase() + throws IOException, InterruptedException, ClassNotFoundException { + LOG.info("testing use of an InputFormat that extends InputFormatBase, " + + "using the approach documented in 0.98."); + final Table htable = createTable(Bytes.toBytes("exampleDeprecatedTable"), + new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); + testInputFormat(ExampleDeprecatedTIF.class); + } + + void testInputFormat(Class<? extends InputFormat> clazz) + throws IOException, InterruptedException, ClassNotFoundException { + final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration()); + job.setInputFormatClass(clazz); + job.setOutputFormatClass(NullOutputFormat.class); + job.setMapperClass(ExampleVerifier.class); + job.setNumReduceTasks(0); + + LOG.debug("submitting job."); + assertTrue("job failed!", job.waitForCompletion(true)); + assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, job.getCounters() + .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue()); + assertEquals("Saw instances of the filtered-out row.", 0, job.getCounters() + .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue()); + assertEquals("Saw the wrong number of instances of columnA.", 1, job.getCounters() + .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue()); + assertEquals("Saw the wrong number of instances of columnB.", 1, job.getCounters() + .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue()); + assertEquals("Saw the wrong count of values for the filtered-for row.", 2, job.getCounters() + .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue()); + assertEquals("Saw the wrong count of values for the filtered-out row.", 0, job.getCounters() + .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue()); + } + + public static class ExampleVerifier extends TableMapper<NullWritable, NullWritable> { + + @Override + public void map(ImmutableBytesWritable key, Result value, Context context) + throws IOException { + for (Cell cell : value.listCells()) { + context.getCounter(TestTableInputFormat.class.getName() + ":row", + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())) + .increment(1L); + context.getCounter(TestTableInputFormat.class.getName() + ":family", + Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) + .increment(1L); + context.getCounter(TestTableInputFormat.class.getName() + ":value", + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())) + .increment(1L); + } + } + + } + + public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable { + + @Override + public
void configure(JobConf job) { + try { + Connection connection = ConnectionFactory.createConnection(job); + Table exampleTable = connection.getTable(TableName.valueOf(("exampleDeprecatedTable"))); + // mandatory + initializeTable(connection, exampleTable.getName()); + byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), + Bytes.toBytes("columnB") }; + // optional + Scan scan = new Scan(); + for (byte[] family : inputColumns) { + scan.addFamily(family); + } + Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); + scan.setFilter(exampleFilter); + setScan(scan); + } catch (IOException exception) { + throw new RuntimeException("Failed to configure for job.", exception); + } + } + + } + + + public static class ExampleJobConfigurableTIF extends TableInputFormatBase + implements JobConfigurable { + + @Override + public void configure(JobConf job) { + try { + Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job)); + TableName tableName = TableName.valueOf("exampleJobConfigurableTable"); + // mandatory + initializeTable(connection, tableName); + byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), + Bytes.toBytes("columnB") }; + //optional + Scan scan = new Scan(); + for (byte[] family : inputColumns) { + scan.addFamily(family); + } + Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); + scan.setFilter(exampleFilter); + setScan(scan); + } catch (IOException exception) { + throw new RuntimeException("Failed to initialize.", exception); + } + } + } + + + public static class ExampleTIF extends TableInputFormatBase { + + @Override + protected void initialize(JobContext job) throws IOException { + Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create( + job.getConfiguration())); + TableName tableName = TableName.valueOf("exampleTable"); + // mandatory + initializeTable(connection, tableName); + byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), + Bytes.toBytes("columnB") }; + //optional + Scan scan = new Scan(); + for (byte[] family : inputColumns) { + scan.addFamily(family); + } + Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); + scan.setFilter(exampleFilter); + setScan(scan); + } + + } +} + http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java new file mode 100644 index 0000000..699e773 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java @@ -0,0 +1,53 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.*; + +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.UnknownHostException; + +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({SmallTests.class}) +public class TestTableInputFormatBase { + @Test + public void testTableInputFormatBaseReverseDNSForIPv6() + throws UnknownHostException { + String address = "ipv6.google.com"; + String localhost = null; + InetAddress addr = null; + TableInputFormat inputFormat = new TableInputFormat(); + try { + localhost = InetAddress.getByName(address).getCanonicalHostName(); + addr = Inet6Address.getByName(address); + } catch (UnknownHostException e) { + // ipv6.google.com could not be resolved (e.g. no network); skip rather than fail. + return; + } + System.out.println("Should return the hostname for this host " + + localhost + " addr : " + addr); + String actualHostName = inputFormat.reverseDNS(addr); + assertEquals("Should return the hostname for this host. Expected : " + + localhost + " Actual : " + actualHostName, localhost, actualHostName); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java new file mode 100644 index 0000000..99b40b9 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java @@ -0,0 +1,200 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * TestTableInputFormatScan part 1.
+ * @see TestTableInputFormatScanBase + */ +@Category({VerySlowMapReduceTests.class, LargeTests.class}) +public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase { + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToEmpty() + throws IOException, InterruptedException, ClassNotFoundException { + testScan(null, null, null); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToAPP() + throws IOException, InterruptedException, ClassNotFoundException { + testScan(null, "app", "apo"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToBBA() + throws IOException, InterruptedException, ClassNotFoundException { + testScan(null, "bba", "baz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToBBB() + throws IOException, InterruptedException, ClassNotFoundException { + testScan(null, "bbb", "bba"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanEmptyToOPP() + throws IOException, InterruptedException, ClassNotFoundException { + testScan(null, "opp", "opo"); + } + + /** + * Tests a MR scan using a specific number of mappers. The test table has 25 regions, + * and all region sizes default to 0, so the average region size is 1 (the smallest + * positive value). When hbase.mapreduce.input.ratio is set to -1, every region is cut into + * two MapReduce input splits; when it is set to 100, the sum of all region sizes is less + * than the average region size, so all regions are combined into a single MapReduce input split.
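+ * For example, a job could tune this before submission (a hedged sketch; the property + * name is taken from this comment and from testNumOfSplits below, not verified against + * TableInputFormatBase): + * <pre> + * job.getConfiguration().set("hbase.mapreduce.input.ratio", "100"); // combine small regions + * </pre>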
+ * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testGetSplits() throws IOException, InterruptedException, ClassNotFoundException { + testNumOfSplits("-1", 52); + testNumOfSplits("100", 1); + } + + /** + * Tests the getSplitKey() method in TableInputFormatBase.java + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testGetSplitsPoint() throws IOException, InterruptedException, + ClassNotFoundException { + byte[] start1 = { 'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f' }; + byte[] end1 = { 'a', 'a', 'a', 'f', 'f' }; + byte[] splitPoint1 = { 'a', 'a', 'a', 'd', 'd', -78, 50, -77 }; + testGetSplitKey(start1, end1, splitPoint1, true); + + byte[] start2 = { '1', '1', '1', '0', '0', '0' }; + byte[] end2 = { '1', '1', '2', '5', '7', '9', '0' }; + byte[] splitPoint2 = { '1', '1', '1', -78, -77, -76, -104 }; + testGetSplitKey(start2, end2, splitPoint2, true); + + byte[] start3 = { 'a', 'a', 'a', 'a', 'a', 'a' }; + byte[] end3 = { 'a', 'a', 'b' }; + byte[] splitPoint3 = { 'a', 'a', 'a', -80, -80, -80 }; + testGetSplitKey(start3, end3, splitPoint3, true); + + byte[] start4 = { 'a', 'a', 'a' }; + byte[] end4 = { 'a', 'a', 'a', 'z' }; + byte[] splitPoint4 = { 'a', 'a', 'a', '=' }; + testGetSplitKey(start4, end4, splitPoint4, true); + + byte[] start5 = { 'a', 'a', 'a' }; + byte[] end5 = { 'a', 'a', 'b', 'a' }; + byte[] splitPoint5 = { 'a', 'a', 'a', -80 }; + testGetSplitKey(start5, end5, splitPoint5, true); + + // Test Case 6: empty key and "hhhqqqwww", split point is "h" + byte[] start6 = {}; + byte[] end6 = { 'h', 'h', 'h', 'q', 'q', 'q', 'w', 'w' }; + byte[] splitPointText6 = { 'h' }; + byte[] splitPointBinary6 = { 104 }; + testGetSplitKey(start6, end6, splitPointText6, true); + testGetSplitKey(start6, end6, splitPointBinary6, false); + + // Test Case 7: "ffffaaa" and empty key, split point depends on the mode we choose(text key or + // binary key). + byte[] start7 = { 'f', 'f', 'f', 'f', 'a', 'a', 'a' }; + byte[] end7 = {}; + byte[] splitPointText7 = { 'f', '~', '~', '~', '~', '~', '~' }; + byte[] splitPointBinary7 = { 'f', -1, -1, -1, -1, -1, -1 }; + testGetSplitKey(start7, end7, splitPointText7, true); + testGetSplitKey(start7, end7, splitPointBinary7, false); + + // Test Case 8: both start key and end key are empty. Split point depends on the mode we + // choose (text key or binary key). 
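+ // (A hedged note, inferred from cases 6 and 7 above rather than from getSplitKey() itself: + // in text mode, keys appear to range over the printable ASCII span ' '..'~', whose midpoint + // is 'O' (0x4F), which is why the text split point below is 'O'.)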
+ byte[] start8 = {}; + byte[] end8 = {}; + byte[] splitPointText8 = { 'O' }; + byte[] splitPointBinary8 = { 0 }; + testGetSplitKey(start8, end8, splitPointText8, true); + testGetSplitKey(start8, end8, splitPointBinary8, false); + + // Test Case 9: Binary Key example + byte[] start9 = { 13, -19, 126, 127 }; + byte[] end9 = { 13, -19, 127, 0 }; + byte[] splitPoint9 = { 13, -19, 126, -65 }; + testGetSplitKey(start9, end9, splitPoint9, false); + + // Test Case 10: Binary key split when the start key is a positive (sign-bit clear) byte and + // the end key is a negative (sign-bit set) byte + byte[] start10 = { 'x' }; + byte[] end10 = { -128 }; + byte[] splitPoint10 = { '|' }; + testGetSplitKey(start10, end10, splitPoint10, false); + + // Test Case 11: Binary key split when both the start key and the end key are negative + // (sign-bit set) bytes + byte[] start11 = { -100 }; + byte[] end11 = { -90 }; + byte[] splitPoint11 = { -95 }; + testGetSplitKey(start11, end11, splitPoint11, false); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java new file mode 100644 index 0000000..02f893f --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java @@ -0,0 +1,118 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * TestTableInputFormatScan part 2. + * @see TestTableInputFormatScanBase + */ +@Category({VerySlowMapReduceTests.class, LargeTests.class}) +public class TestTableInputFormatScan2 extends TestTableInputFormatScanBase { + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanOBBToOPP() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("obb", "opp", "opo"); + } + + /** + * Tests a MR scan using specific start and stop rows.
+ * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanOBBToQPP() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("obb", "qpp", "qpo"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanOPPToEmpty() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("opp", null, "zzz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanYYXToEmpty() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("yyx", null, "zzz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanYYYToEmpty() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("yyy", null, "zzz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + @Test + public void testScanYZYToEmpty() + throws IOException, InterruptedException, ClassNotFoundException { + testScan("yzy", null, "zzz"); + } + + @Test + public void testScanFromConfiguration() + throws IOException, InterruptedException, ClassNotFoundException { + testScanFromConfiguration("bba", "bbd", "bbc"); + } +}
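The scan tests above drive TableInputFormat through the MapreduceTestingShim test harness. Outside a test, the same wiring is normally done through TableMapReduceUtil; the following is a minimal sketch, where the driver class, table name, and scan bounds are illustrative and not part of this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class ExampleScanDriver { // hypothetical driver, not from the commit
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Same start/stop convention as testScan above: scan ["obb", "opp").
    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes("obb"));
    scan.setStopRow(Bytes.toBytes("opp"));
    Job job = Job.getInstance(conf, "scan obb to opp");
    // Sets TableInputFormat as the input format, serializes the Scan into the
    // job configuration, and registers the mapper with its output types.
    TableMapReduceUtil.initTableMapperJob("exampleTable", scan,
        IdentityTableMapper.class, ImmutableBytesWritable.class, Result.class, job);
    job.setOutputFormatClass(NullOutputFormat.class); // map-only, discard output
    job.setNumReduceTasks(0);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

As in the tests, the job is map-only with NullOutputFormat; a real job would substitute its own mapper and output format.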