http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java deleted file mode 100644 index 0f49333..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java +++ /dev/null @@ -1,287 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;

/**
 * <p>
 * Tests various scan start and stop row scenarios. This is set in a scan and
 * tested in a MapReduce job to see if that is handed over and done properly
 * too.
 * </p>
 * <p>
 * This test is broken into two parts in order to side-step the test timeout
 * period of 900, as documented in HBASE-8326.
 * </p>
 */
public abstract class TestTableInputFormatScanBase {

  private static final Log LOG = LogFactory.getLog(TestTableInputFormatScanBase.class);
  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  static final TableName TABLE_NAME = TableName.valueOf("scantest");
  static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")};
  /** Configuration key holding the row the reducer must see first ("" disables the check). */
  static final String KEY_STARTROW = "startRow";
  /** Configuration key holding the row the reducer must see last ("" disables the check). */
  static final String KEY_LASTROW = "stpRow";

  /** Table shared by all tests; opened in setUpBeforeClass and closed in tearDownAfterClass. */
  private static Table table = null;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // test intermittently fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on.
    // this turns it off for this test. TODO: Figure out why scr breaks recovery.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

    // switch TIF to log at DEBUG level
    TEST_UTIL.enableDebug(TableInputFormat.class);
    TEST_UTIL.enableDebug(TableInputFormatBase.class);
    // start mini hbase cluster
    TEST_UTIL.startMiniCluster(3);
    // create and fill table
    table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS);
    TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    try {
      // Release the shared table handle before the cluster it talks to is shut down.
      if (table != null) {
        table.close();
        table = null;
      }
    } finally {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Pass the key and value to reduce.
   */
  public static class ScanMapper
      extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {

    /**
     * Pass the key and value to reduce.
     *
     * @param key  The key, here "aaa", "aab" etc.
     * @param value  The value is the same as the key.
     * @param context  The task context.
     * @throws IOException When the row does not carry exactly the two input families.
     * @throws InterruptedException When writing to the context is interrupted.
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value,
        Context context)
        throws IOException, InterruptedException {
      if (value.size() != 2) {
        throw new IOException("There should be two input columns");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
          cfMap = value.getMap();

      if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) {
        throw new IOException("Wrong input columns. Missing: '" +
            Bytes.toString(INPUT_FAMILYS[0]) + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'.");
      }

      String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null));
      String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null));
      LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
          ", value -> (" + val0 + ", " + val1 + ")");
      context.write(key, key);
    }
  }

  /**
   * Checks the last and first key seen against the scanner boundaries.
   */
  public static class ScanReducer
      extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
      NullWritable, NullWritable> {

    private String first = null;
    private String last = null;

    /**
     * Records the first value ever seen and keeps overwriting the last one, so that after all
     * groups are reduced {@code first}/{@code last} hold the smallest and largest rows.
     */
    @Override
    protected void reduce(ImmutableBytesWritable key,
        Iterable<ImmutableBytesWritable> values, Context context)
        throws IOException, InterruptedException {
      int count = 0;
      for (ImmutableBytesWritable value : values) {
        String val = Bytes.toStringBinary(value.get());
        LOG.info("reduce: key[" + count + "] -> " +
            Bytes.toStringBinary(key.get()) + ", value -> " + val);
        if (first == null) first = val;
        last = val;
        count++;
      }
    }

    /**
     * Compares the observed first/last rows against the expected ones from the configuration.
     * An empty expected value skips the corresponding check.
     */
    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      Configuration c = context.getConfiguration();
      String startRow = c.get(KEY_STARTROW);
      String lastRow = c.get(KEY_LASTROW);
      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
      if (startRow != null && startRow.length() > 0) {
        assertEquals(startRow, first);
      }
      if (lastRow != null && lastRow.length() > 0) {
        assertEquals(lastRow, last);
      }
    }

  }

  /**
   * Tests an MR Scan initialized from properties set in the Configuration.
   *
   * @param start  first row of the scan, or {@code null} to scan from the beginning
   * @param stop  exclusive stop row of the scan, or {@code null} to scan to the end
   * @param last  row the reducer is expected to see last, or {@code null} to skip the check
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  protected void testScanFromConfiguration(String start, String stop, String last)
      throws IOException, InterruptedException, ClassNotFoundException {
    String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") +
        "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
    c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString());
    c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILYS[0]) + ", "
        + Bytes.toString(INPUT_FAMILYS[1]));
    c.set(KEY_STARTROW, start != null ? start : "");
    c.set(KEY_LASTROW, last != null ? last : "");

    if (start != null) {
      c.set(TableInputFormat.SCAN_ROW_START, start);
    }

    if (stop != null) {
      c.set(TableInputFormat.SCAN_ROW_STOP, stop);
    }

    Job job = Job.getInstance(c, jobName);
    job.setMapperClass(ScanMapper.class);
    job.setReducerClass(ScanReducer.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(ImmutableBytesWritable.class);
    job.setInputFormatClass(TableInputFormat.class);
    job.setNumReduceTasks(1);
    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
    TableMapReduceUtil.addDependencyJars(job);
    assertTrue(job.waitForCompletion(true));
  }

  /**
   * Tests a MR scan using specific start and stop rows.
   *
   * @param start  first row of the scan, or {@code null} to scan from the beginning
   * @param stop  exclusive stop row of the scan, or {@code null} to scan to the end
   * @param last  row the reducer is expected to see last, or {@code null} to skip the check
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  protected void testScan(String start, String stop, String last)
      throws IOException, InterruptedException, ClassNotFoundException {
    String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") +
        "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
    LOG.info("Before map/reduce startup - job " + jobName);
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILYS[0]);
    scan.addFamily(INPUT_FAMILYS[1]);
    if (start != null) {
      scan.setStartRow(Bytes.toBytes(start));
    }
    c.set(KEY_STARTROW, start != null ? start : "");
    if (stop != null) {
      scan.setStopRow(Bytes.toBytes(stop));
    }
    c.set(KEY_LASTROW, last != null ? last : "");
    LOG.info("scan before: " + scan);
    Job job = Job.getInstance(c, jobName);
    TableMapReduceUtil.initTableMapperJob(
        TABLE_NAME, scan, ScanMapper.class,
        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
    job.setReducerClass(ScanReducer.class);
    job.setNumReduceTasks(1); // one to get final "first" and "last" key
    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
    LOG.info("Started " + job.getJobName());
    assertTrue(job.waitForCompletion(true));
    LOG.info("After map/reduce completion - job " + jobName);
  }


  /**
   * Tests a MR scan using data skew auto-balance.
   *
   * @param ratio  value for {@code hbase.mapreduce.input.autobalance.maxskewratio}
   * @param expectedNumOfSplits  number of splits TableInputFormat is expected to produce
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException,
      InterruptedException,
      ClassNotFoundException {
    String jobName = "TestJobForNumOfSplits";
    LOG.info("Before map/reduce startup - job " + jobName);
    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILYS[0]);
    scan.addFamily(INPUT_FAMILYS[1]);
    c.set("hbase.mapreduce.input.autobalance", "true");
    c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
    c.set(KEY_STARTROW, "");
    c.set(KEY_LASTROW, "");
    Job job = Job.getInstance(c, jobName);
    TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
    TableInputFormat tif = new TableInputFormat();
    tif.setConf(job.getConfiguration());
    Assert.assertEquals(TABLE_NAME, table.getName());
    List<InputSplit> splits = tif.getSplits(job);
    Assert.assertEquals(expectedNumOfSplits, splits.size());
  }

  /**
   * Tests for the getSplitKey() method in TableInputFormatBase.java
   *
   * @param startKey  region start key passed to getSplitKey
   * @param endKey  region end key passed to getSplitKey
   * @param splitKey  expected result
   * @param isText  whether the keys are treated as text
   */
  public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) {
    byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText);
    Assert.assertArrayEquals(splitKey, result);
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java deleted file mode 100644 index d702e0d..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
 * on our tables is simple - take every row in the table, reverse the value of
 * a particular cell, and write it back to the table.
 */

@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestTableMapReduce extends TestTableMapReduceBase {
  private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);

  @Override
  protected Log getLog() { return LOG; }

  /**
   * Pass the given key and processed record reduce
   */
  static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {

    /**
     * Pass the key, and reversed value to reduce.
     *
     * @param key  row key of the input row
     * @param value  input row; must contain exactly one cell, in {@code INPUT_FAMILY}
     * @param context  task context the resulting {@link Put} is written to
     * @throws IOException  if the row does not have the expected single input column
     * @throws InterruptedException  if writing to the context is interrupted
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value,
        Context context)
        throws IOException, InterruptedException {
      if (value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
          cf = value.getMap();
      if (!cf.containsKey(INPUT_FAMILY)) {
        throw new IOException("Wrong input columns. Missing: '" +
            Bytes.toString(INPUT_FAMILY) + "'.");
      }

      // Get the original value and reverse it
      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
      StringBuilder newValue = new StringBuilder(originalValue);
      newValue.reverse();
      // Now set the value to be collected
      Put outval = new Put(key.get());
      outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
      context.write(key, outval);
    }
  }

  /**
   * Runs the reverse-the-cell job against {@code table}, verifies the output and the emitted
   * scan counters. The table is closed and the Hadoop temp dir removed when done.
   */
  @Override
  protected void runTestOnTable(Table table) throws IOException {
    Job job = null;
    try {
      LOG.info("Before map/reduce startup");
      job = Job.getInstance(table.getConfiguration(), "process column contents");
      job.setNumReduceTasks(1);
      Scan scan = new Scan();
      scan.addFamily(INPUT_FAMILY);
      TableMapReduceUtil.initTableMapperJob(
          table.getName().getNameAsString(), scan,
          ProcessContentsMapper.class, ImmutableBytesWritable.class,
          Put.class, job);
      TableMapReduceUtil.initTableReducerJob(
          table.getName().getNameAsString(),
          IdentityTableReducer.class, job);
      FileOutputFormat.setOutputPath(job, new Path("test"));
      LOG.info("Started " + table.getName().getNameAsString());
      assertTrue(job.waitForCompletion(true));
      LOG.info("After map/reduce completion");

      // verify map-reduce results
      verify(table.getName());

      verifyJobCountersAreEmitted(job);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the interruption is not lost by the rethrow.
      Thread.currentThread().interrupt();
      throw new IOException(e);
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    } finally {
      table.close();
      if (job != null) {
        FileUtil.fullyDelete(
            new File(job.getConfiguration().get("hadoop.tmp.dir")));
      }
    }
  }

  /**
   * Verify scan counters are emitted from the job
   * @param job  completed job whose counters are inspected
   * @throws IOException  if the counters cannot be retrieved
   */
  private void verifyJobCountersAreEmitted(Job job) throws IOException {
    Counters counters = job.getCounters();
    Counter counter
        = counters.findCounter(TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME, "RPC_CALLS");
    assertNotNull("Unable to find Job counter for HBase scan metrics, RPC_CALLS", counter);
    assertTrue("Counter value for RPC_CALLS should be larger than 0", counter.getValue() > 0);
  }

  @Test(expected = TableNotEnabledException.class)
  public void testWritingToDisabledTable() throws IOException {

    try (Admin admin = UTIL.getConnection().getAdmin();
        Table table = UTIL.getConnection().getTable(TABLE_FOR_NEGATIVE_TESTS)) {
      admin.disableTable(table.getName());
      runTestOnTable(table);
      fail("Should not have reached here, should have thrown an exception");
    }
  }

  @Test(expected = TableNotFoundException.class)
  public void testWritingToNonExistentTable() throws IOException {

    try (Table table = UTIL.getConnection().getTable(TableName.valueOf("table-does-not-exist"))) {
      runTestOnTable(table);
      fail("Should not have reached here, should have thrown an exception");
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java deleted file mode 100644 index 27bf063..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java +++ /dev/null @@ -1,233 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/**
 * A base class for a test Map/Reduce job over HBase tables. The map/reduce process we're testing
 * on our tables is simple - take every row in the table, reverse the value of a particular cell,
 * and write it back to the table. Implements common components between mapred and mapreduce
 * implementations.
 */
public abstract class TestTableMapReduceBase {
  @Rule public final TestRule timeout = CategoryBasedTimeout.builder()
      .withTimeout(this.getClass()).withLookingForStuckThread(true).build();
  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  protected static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
  protected static final TableName TABLE_FOR_NEGATIVE_TESTS = TableName.valueOf("testfailuretable");
  protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
  protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");

  protected static final byte[][] columns = new byte[][] {
    INPUT_FAMILY,
    OUTPUT_FAMILY
  };

  /**
   * Retrieve my logger instance.
   */
  protected abstract Log getLog();

  /**
   * Handles API-specifics for setting up and executing the job.
   */
  protected abstract void runTestOnTable(Table table) throws IOException;

  @BeforeClass
  public static void beforeClass() throws Exception {
    UTIL.startMiniCluster();
    // The handle is only needed to load the data; close it afterwards instead of leaking it.
    try (Table table = UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME,
        new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY })) {
      UTIL.loadTable(table, INPUT_FAMILY, false);
    }
    UTIL.createTable(TABLE_FOR_NEGATIVE_TESTS, new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY });
  }

  @AfterClass
  public static void afterClass() throws Exception {
    UTIL.deleteTable(TABLE_FOR_NEGATIVE_TESTS);
    UTIL.shutdownMiniCluster();
  }

  /**
   * Test a map/reduce against a multi-region table
   * @throws IOException
   */
  @Test
  public void testMultiRegionTable() throws IOException {
    runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
  }

  /**
   * Runs the job with a configuration that forces the combiner to be used. The configuration
   * reaches the job through the table's connection, so it takes effect for implementations that
   * build their job from {@link Table#getConfiguration()}.
   */
  @Test
  public void testCombiner() throws IOException {
    Configuration conf = new Configuration(UTIL.getConfiguration());
    // force use of combiner for testing purposes
    conf.setInt("mapreduce.map.combine.minspills", 1);
    // Previously conf was built and then ignored; open the table through a connection that
    // actually carries it.
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      runTestOnTable(connection.getTable(MULTI_REGION_TABLE_NAME));
    }
  }

  /**
   * Implements mapper logic for use across APIs.
   *
   * @param key  row key of the input row
   * @param value  input row; must contain exactly one cell, in {@code INPUT_FAMILY}
   * @return a Put writing the reversed input value into {@code OUTPUT_FAMILY}
   * @throws IOException  if the row does not have the expected single input column
   */
  protected static Put map(ImmutableBytesWritable key, Result value) throws IOException {
    if (value.size() != 1) {
      throw new IOException("There should only be one input column");
    }
    Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
        cf = value.getMap();
    if (!cf.containsKey(INPUT_FAMILY)) {
      throw new IOException("Wrong input columns. Missing: '" +
          Bytes.toString(INPUT_FAMILY) + "'.");
    }

    // Get the original value and reverse it

    String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
    StringBuilder newValue = new StringBuilder(originalValue);
    newValue.reverse();

    // Now set the value to be collected

    Put outval = new Put(key.get());
    outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
    return outval;
  }

  /**
   * Repeatedly runs {@link #verifyAttempt(Table)} (up to the client retry count, pausing for the
   * client pause between attempts) until it succeeds, then asserts that it did.
   *
   * @param tableName  table whose contents are verified
   * @throws IOException  if scanning the table fails
   */
  protected void verify(TableName tableName) throws IOException {
    long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
    int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
    boolean verified = false;
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int i = 0; i < numRetries; i++) {
        try {
          getLog().info("Verification attempt #" + i);
          verifyAttempt(table);
          verified = true;
          break;
        } catch (NullPointerException e) {
          // If here, a cell was empty. Presume its because updates came in
          // after the scanner had been opened. Wait a while and retry.
          getLog().debug("Verification attempt failed: " + e.getMessage());
        }
        try {
          Thread.sleep(pause);
        } catch (InterruptedException e) {
          // Keep the interrupt visible and stop retrying; the assertion below reports failure.
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    assertTrue(verified);
  }

  /**
   * Looks at every value of the mapreduce output and verifies that indeed
   * the values have been reversed.
   * @param table Table to scan.
   * @throws IOException
   * @throws NullPointerException if we failed to find a cell value
   */
  private void verifyAttempt(final Table table) throws IOException, NullPointerException {
    Scan scan = new Scan();
    TableInputFormat.addColumns(scan, columns);
    try (ResultScanner scanner = table.getScanner(scan)) {
      Iterator<Result> itr = scanner.iterator();
      assertTrue(itr.hasNext());
      while (itr.hasNext()) {
        Result r = itr.next();
        if (getLog().isDebugEnabled()) {
          if (r.size() > 2) {
            throw new IOException("Too many results, expected 2 got " +
                r.size());
          }
        }
        byte[] firstValue = null;
        byte[] secondValue = null;
        int count = 0;
        for (Cell kv : r.listCells()) {
          if (count == 0) {
            firstValue = CellUtil.cloneValue(kv);
          }
          if (count == 1) {
            secondValue = CellUtil.cloneValue(kv);
          }
          count++;
          if (count == 2) {
            break;
          }
        }


        if (firstValue == null) {
          throw new NullPointerException(Bytes.toString(r.getRow()) +
              ": first value is null");
        }
        String first = Bytes.toString(firstValue);

        if (secondValue == null) {
          throw new NullPointerException(Bytes.toString(r.getRow()) +
              ": second value is null");
        }
        byte[] secondReversed = new byte[secondValue.length];
        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
          secondReversed[i] = secondValue[j];
        }
        String second = Bytes.toString(secondReversed);

        if (first.compareTo(second) != 0) {
          if (getLog().isDebugEnabled()) {
            getLog().debug("second key is not the reverse of first. row=" +
                Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
                ", second value=" + second);
          }
          fail();
        }
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java deleted file mode 100644 index 303a144..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with the License. You may - * obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions and - * limitations under the License.
*/

package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Test different variants of initTableMapperJob method
 */
@Category({MapReduceTests.class, SmallTests.class})
public class TestTableMapReduceUtil {

  /*
   * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because
   * the method depends on an online cluster.
   */

  @Test
  public void testInitTableMapperJob1() throws Exception {
    Job job = newJob();
    // test
    TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
        Text.class, job, false, WALInputFormat.class);
    assertMapperJobConfigured(job, WALInputFormat.class);
  }

  @Test
  public void testInitTableMapperJob2() throws Exception {
    Job job = newJob();
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
        Import.Importer.class, Text.class, Text.class, job, false, WALInputFormat.class);
    assertMapperJobConfigured(job, WALInputFormat.class);
  }

  @Test
  public void testInitTableMapperJob3() throws Exception {
    Job job = newJob();
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
        Import.Importer.class, Text.class, Text.class, job);
    assertMapperJobConfigured(job, TableInputFormat.class);
  }

  @Test
  public void testInitTableMapperJob4() throws Exception {
    Job job = newJob();
    TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
        Import.Importer.class, Text.class, Text.class, job, false);
    assertMapperJobConfigured(job, TableInputFormat.class);
  }

  /** Creates a fresh job named "tableName" over an empty default configuration. */
  private static Job newJob() throws IOException {
    return Job.getInstance(new Configuration(), "tableName");
  }

  /**
   * Asserts the job state shared by every initTableMapperJob variant above: the given input
   * format, the Importer mapper, default job output classes, no combiner, and the input table.
   *
   * @param job  job configured by one of the initTableMapperJob overloads
   * @param expectedInputFormat  input format class the overload is expected to install
   */
  private static void assertMapperJobConfigured(Job job, Class<?> expectedInputFormat)
      throws ClassNotFoundException {
    assertEquals(expectedInputFormat, job.getInputFormatClass());
    assertEquals(Import.Importer.class, job.getMapperClass());
    assertEquals(LongWritable.class, job.getOutputKeyClass());
    assertEquals(Text.class, job.getOutputValueClass());
    assertNull(job.getCombinerClass());
    assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java ---------------------------------------------------------------------- diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java deleted file mode 100644 index 5e63082..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java +++ /dev/null @@ -1,384 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.mapreduce; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CategoryBasedTimeout; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HDFSBlocksDistribution; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.junit.After; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.junit.rules.TestRule; - -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; - -import java.util.Arrays; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; -import org.apache.hadoop.hbase.util.FSUtils; - 
-@Category({VerySlowMapReduceTests.class, LargeTests.class}) -public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase { - private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class); - @Rule public final TestRule timeout = CategoryBasedTimeout.builder(). - withTimeout(this.getClass()).withLookingForStuckThread(true).build(); - - private static final byte[] bbb = Bytes.toBytes("bbb"); - private static final byte[] yyy = Bytes.toBytes("yyy"); - - @Rule - public TestName name = new TestName(); - - @Override - protected byte[] getStartRow() { - return bbb; - } - - @Override - protected byte[] getEndRow() { - return yyy; - } - - @After - public void tearDown() throws Exception { - } - - @Test - public void testGetBestLocations() throws IOException { - TableSnapshotInputFormatImpl tsif = new TableSnapshotInputFormatImpl(); - Configuration conf = UTIL.getConfiguration(); - - HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution(); - Assert.assertEquals(Lists.newArrayList(), - TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); - - blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1); - Assert.assertEquals(Lists.newArrayList("h1"), - TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); - - blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1); - Assert.assertEquals(Lists.newArrayList("h1"), - TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); - - blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 1); - Assert.assertEquals(Lists.newArrayList("h1"), - TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); - - blockDistribution = new HDFSBlocksDistribution(); - blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 10); - blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 7); - blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 5); - 
blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 1); - Assert.assertEquals(Lists.newArrayList("h1"), - TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); - - blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 2); - Assert.assertEquals(Lists.newArrayList("h1", "h2"), - TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); - - blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 3); - Assert.assertEquals(Lists.newArrayList("h2", "h1"), - TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); - - blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6); - blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9); - - Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4", "h1"), - TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); - } - - public static enum TestTableSnapshotCounters { - VALIDATION_ERROR - } - - public static class TestTableSnapshotMapper - extends TableMapper<ImmutableBytesWritable, NullWritable> { - @Override - protected void map(ImmutableBytesWritable key, Result value, - Context context) throws IOException, InterruptedException { - // Validate a single row coming from the snapshot, and emit the row key - verifyRowFromMap(key, value); - context.write(key, NullWritable.get()); - } - } - - public static class TestTableSnapshotReducer - extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> { - HBaseTestingUtility.SeenRowTracker rowTracker = - new HBaseTestingUtility.SeenRowTracker(bbb, yyy); - @Override - protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values, - Context context) throws IOException, InterruptedException { - rowTracker.addRow(key.get()); - } - - @Override - protected void cleanup(Context context) throws IOException, - InterruptedException { - rowTracker.validate(); - } - } - - @Test - public void testInitTableSnapshotMapperJobConfig() throws Exception { - 
setupCluster(); - final TableName tableName = TableName.valueOf(name.getMethodName()); - String snapshotName = "foo"; - - try { - createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1); - Job job = new Job(UTIL.getConfiguration()); - Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName); - - TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, - new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class, - NullWritable.class, job, false, tmpTableDir); - - // TODO: would be better to examine directly the cache instance that results from this - // config. Currently this is not possible because BlockCache initialization is static. - Assert.assertEquals( - "Snapshot job should be configured for default LruBlockCache.", - HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT, - job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01); - Assert.assertEquals( - "Snapshot job should not use BucketCache.", - 0, job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01); - } finally { - UTIL.getAdmin().deleteSnapshot(snapshotName); - UTIL.deleteTable(tableName); - tearDownCluster(); - } - } - - @Override - public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName, - String snapshotName, Path tmpTableDir) throws Exception { - Job job = new Job(UTIL.getConfiguration()); - TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, - new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class, - NullWritable.class, job, false, tmpTableDir); - } - - @Override - public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, - int numRegions, int expectedNumSplits) throws Exception { - setupCluster(); - final TableName tableName = TableName.valueOf(name.getMethodName()); - try { - createTableAndSnapshot( - util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions); - - Job job = new Job(util.getConfiguration()); - Path tmpTableDir = 
util.getDataTestDirOnTestFS(snapshotName); - Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan - - TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, - scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, - NullWritable.class, job, false, tmpTableDir); - - verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow()); - - } finally { - util.getAdmin().deleteSnapshot(snapshotName); - util.deleteTable(tableName); - tearDownCluster(); - } - } - - public static void blockUntilSplitFinished(HBaseTestingUtility util, TableName tableName, - int expectedRegionSize) throws Exception { - for (int i = 0; i < 100; i++) { - List<HRegionInfo> hRegionInfoList = util.getAdmin().getTableRegions(tableName); - if (hRegionInfoList.size() >= expectedRegionSize) { - break; - } - Thread.sleep(1000); - } - } - - @Test - public void testNoDuplicateResultsWhenSplitting() throws Exception { - setupCluster(); - TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting"); - String snapshotName = "testSnapshotBug"; - try { - if (UTIL.getAdmin().tableExists(tableName)) { - UTIL.deleteTable(tableName); - } - - UTIL.createTable(tableName, FAMILIES); - Admin admin = UTIL.getAdmin(); - - // put some stuff in the table - Table table = UTIL.getConnection().getTable(tableName); - UTIL.loadTable(table, FAMILIES); - - // split to 2 regions - admin.split(tableName, Bytes.toBytes("eee")); - blockUntilSplitFinished(UTIL, tableName, 2); - - Path rootDir = FSUtils.getRootDir(UTIL.getConfiguration()); - FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration()); - - SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES), - null, snapshotName, rootDir, fs, true); - - // load different values - byte[] value = Bytes.toBytes("after_snapshot_value"); - UTIL.loadTable(table, FAMILIES, value); - - // cause flush to create new files in the region - admin.flush(tableName); - table.close(); - - 
Job job = new Job(UTIL.getConfiguration()); - Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName); - // limit the scan - Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow()); - - TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan, - TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false, - tmpTableDir); - - verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow()); - } finally { - UTIL.getAdmin().deleteSnapshot(snapshotName); - UTIL.deleteTable(tableName); - tearDownCluster(); - } - } - - private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits, - byte[] startRow, byte[] stopRow) - throws IOException, InterruptedException { - TableSnapshotInputFormat tsif = new TableSnapshotInputFormat(); - List<InputSplit> splits = tsif.getSplits(job); - - Assert.assertEquals(expectedNumSplits, splits.size()); - - HBaseTestingUtility.SeenRowTracker rowTracker = - new HBaseTestingUtility.SeenRowTracker(startRow, stopRow); - - for (int i = 0; i < splits.size(); i++) { - // validate input split - InputSplit split = splits.get(i); - Assert.assertTrue(split instanceof TableSnapshotRegionSplit); - - // validate record reader - TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class); - when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration()); - RecordReader<ImmutableBytesWritable, Result> rr = - tsif.createRecordReader(split, taskAttemptContext); - rr.initialize(split, taskAttemptContext); - - // validate we can read all the data back - while (rr.nextKeyValue()) { - byte[] row = rr.getCurrentKey().get(); - verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue()); - rowTracker.addRow(row); - } - - rr.close(); - } - - // validate all rows are seen - rowTracker.validate(); - } - - @Override - protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName, - String snapshotName, Path tableDir, int numRegions, int 
expectedNumSplits, - boolean shutdownCluster) throws Exception { - doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir, - numRegions, expectedNumSplits, shutdownCluster); - } - - // this is also called by the IntegrationTestTableSnapshotInputFormat - public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName, - String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions, - int expectedNumSplits, boolean shutdownCluster) throws Exception { - - LOG.info("testing with MapReduce"); - - LOG.info("create the table and snapshot"); - createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions); - - if (shutdownCluster) { - LOG.info("shutting down hbase cluster."); - util.shutdownMiniHBaseCluster(); - } - - try { - // create the job - Job job = new Job(util.getConfiguration()); - Scan scan = new Scan(startRow, endRow); // limit the scan - - job.setJarByClass(util.getClass()); - TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), - TestTableSnapshotInputFormat.class); - - TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, - scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, - NullWritable.class, job, true, tableDir); - - job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class); - job.setNumReduceTasks(1); - job.setOutputFormatClass(NullOutputFormat.class); - - Assert.assertTrue(job.waitForCompletion(true)); - } finally { - if (!shutdownCluster) { - util.getAdmin().deleteSnapshot(snapshotName); - util.deleteTable(tableName); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java deleted file mode 100644 index 4382c9c..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.testclassification.MapReduceTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.util.ReflectionUtils; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; - -import java.util.HashSet; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -@Category({MapReduceTests.class, SmallTests.class}) -public class TestTableSplit { - @Rule - public TestName name = new TestName(); - - @Test - public void testHashCode() { - TableSplit split1 = new TableSplit(TableName.valueOf(name.getMethodName()), - "row-start".getBytes(), - "row-end".getBytes(), "location"); - TableSplit split2 = new TableSplit(TableName.valueOf(name.getMethodName()), - "row-start".getBytes(), - "row-end".getBytes(), "location"); - assertEquals (split1, split2); - assertTrue (split1.hashCode() == split2.hashCode()); - HashSet<TableSplit> set = new HashSet<>(2); - set.add(split1); - set.add(split2); - assertTrue(set.size() == 1); - } - - /** - * length of region should not influence hashcode - * */ - @Test - public void testHashCode_length() { - TableSplit split1 = new TableSplit(TableName.valueOf(name.getMethodName()), - "row-start".getBytes(), - "row-end".getBytes(), "location", 1984); - TableSplit split2 = new TableSplit(TableName.valueOf(name.getMethodName()), - "row-start".getBytes(), - "row-end".getBytes(), "location", 1982); - - assertEquals (split1, split2); - assertTrue (split1.hashCode() == split2.hashCode()); - HashSet<TableSplit> set = new HashSet<>(2); - set.add(split1); - set.add(split2); - assertTrue(set.size() == 1); - } - - /** - * Length of region need to be properly serialized. 
- * */ - @Test - public void testLengthIsSerialized() throws Exception { - TableSplit split1 = new TableSplit(TableName.valueOf(name.getMethodName()), - "row-start".getBytes(), - "row-end".getBytes(), "location", 666); - - TableSplit deserialized = new TableSplit(TableName.valueOf(name.getMethodName()), - "row-start2".getBytes(), - "row-end2".getBytes(), "location1"); - ReflectionUtils.copy(new Configuration(), split1, deserialized); - - Assert.assertEquals(666, deserialized.getLength()); - } - - @Test - public void testToString() { - TableSplit split = - new TableSplit(TableName.valueOf(name.getMethodName()), "row-start".getBytes(), "row-end".getBytes(), - "location"); - String str = - "HBase table split(table name: " + name.getMethodName() + ", scan: , start row: row-start, " - + "end row: row-end, region location: location, " - + "encoded region name: )"; - Assert.assertEquals(str, split.toString()); - - split = - new TableSplit(TableName.valueOf(name.getMethodName()), null, "row-start".getBytes(), - "row-end".getBytes(), "location", "encoded-region-name", 1000L); - str = - "HBase table split(table name: " + name.getMethodName() + ", scan: , start row: row-start, " - + "end row: row-end, region location: location, " - + "encoded region name: encoded-region-name)"; - Assert.assertEquals(str, split.toString()); - - split = new TableSplit((TableName) null, null, null, null); - str = - "HBase table split(table name: null, scan: , start row: null, " - + "end row: null, region location: null, " - + "encoded region name: )"; - Assert.assertEquals(str, split.toString()); - - split = new TableSplit((TableName) null, null, null, null, null, null, 1000L); - str = - "HBase table split(table name: null, scan: , start row: null, " - + "end row: null, region location: null, " - + "encoded region name: null)"; - Assert.assertEquals(str, split.toString()); - } -} - 
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java deleted file mode 100644 index 6796c94..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java +++ /dev/null @@ -1,211 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.MapReduceTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; - -@Category({MapReduceTests.class, LargeTests.class}) -public class TestTimeRangeMapRed { - private final static Log log = LogFactory.getLog(TestTimeRangeMapRed.class); - private static final HBaseTestingUtility UTIL = - new 
HBaseTestingUtility(); - private Admin admin; - - private static final byte [] KEY = Bytes.toBytes("row1"); - private static final NavigableMap<Long, Boolean> TIMESTAMP = new TreeMap<>(); - static { - TIMESTAMP.put((long)1245620000, false); - TIMESTAMP.put((long)1245620005, true); // include - TIMESTAMP.put((long)1245620010, true); // include - TIMESTAMP.put((long)1245620055, true); // include - TIMESTAMP.put((long)1245620100, true); // include - TIMESTAMP.put((long)1245620150, false); - TIMESTAMP.put((long)1245620250, false); - } - static final long MINSTAMP = 1245620005; - static final long MAXSTAMP = 1245620100 + 1; // maxStamp itself is excluded. so increment it. - - static final TableName TABLE_NAME = TableName.valueOf("table123"); - static final byte[] FAMILY_NAME = Bytes.toBytes("text"); - static final byte[] COLUMN_NAME = Bytes.toBytes("input"); - - @BeforeClass - public static void beforeClass() throws Exception { - UTIL.startMiniCluster(); - } - - @AfterClass - public static void afterClass() throws Exception { - UTIL.shutdownMiniCluster(); - } - - @Before - public void before() throws Exception { - this.admin = UTIL.getAdmin(); - } - - private static class ProcessTimeRangeMapper - extends TableMapper<ImmutableBytesWritable, MapWritable> - implements Configurable { - - private Configuration conf = null; - private Table table = null; - - @Override - public void map(ImmutableBytesWritable key, Result result, - Context context) - throws IOException { - List<Long> tsList = new ArrayList<>(); - for (Cell kv : result.listCells()) { - tsList.add(kv.getTimestamp()); - } - - List<Put> puts = new ArrayList<>(); - for (Long ts : tsList) { - Put put = new Put(key.get()); - put.setDurability(Durability.SKIP_WAL); - put.addColumn(FAMILY_NAME, COLUMN_NAME, ts, Bytes.toBytes(true)); - puts.add(put); - } - table.put(puts); - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration configuration) { - this.conf 
= configuration; - try { - Connection connection = ConnectionFactory.createConnection(conf); - table = connection.getTable(TABLE_NAME); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - @Test - public void testTimeRangeMapRed() - throws IOException, InterruptedException, ClassNotFoundException { - final HTableDescriptor desc = new HTableDescriptor(TABLE_NAME); - final HColumnDescriptor col = new HColumnDescriptor(FAMILY_NAME); - col.setMaxVersions(Integer.MAX_VALUE); - desc.addFamily(col); - admin.createTable(desc); - List<Put> puts = new ArrayList<>(); - for (Map.Entry<Long, Boolean> entry : TIMESTAMP.entrySet()) { - Put put = new Put(KEY); - put.setDurability(Durability.SKIP_WAL); - put.addColumn(FAMILY_NAME, COLUMN_NAME, entry.getKey(), Bytes.toBytes(false)); - puts.add(put); - } - Table table = UTIL.getConnection().getTable(desc.getTableName()); - table.put(puts); - runTestOnTable(); - verify(table); - table.close(); - } - - private void runTestOnTable() - throws IOException, InterruptedException, ClassNotFoundException { - Job job = null; - try { - job = new Job(UTIL.getConfiguration(), "test123"); - job.setOutputFormatClass(NullOutputFormat.class); - job.setNumReduceTasks(0); - Scan scan = new Scan(); - scan.addColumn(FAMILY_NAME, COLUMN_NAME); - scan.setTimeRange(MINSTAMP, MAXSTAMP); - scan.setMaxVersions(); - TableMapReduceUtil.initTableMapperJob(TABLE_NAME, - scan, ProcessTimeRangeMapper.class, Text.class, Text.class, job); - job.waitForCompletion(true); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } finally { - if (job != null) { - FileUtil.fullyDelete( - new File(job.getConfiguration().get("hadoop.tmp.dir"))); - } - } - } - - private void verify(final Table table) throws IOException { - Scan scan = new Scan(); - scan.addColumn(FAMILY_NAME, COLUMN_NAME); - scan.setMaxVersions(1); - ResultScanner scanner = table.getScanner(scan); - for (Result r: scanner) { - for (Cell kv : r.listCells()) { - 
log.debug(Bytes.toString(r.getRow()) + "\t" + Bytes.toString(CellUtil.cloneFamily(kv)) - + "\t" + Bytes.toString(CellUtil.cloneQualifier(kv)) - + "\t" + kv.getTimestamp() + "\t" + Bytes.toBoolean(CellUtil.cloneValue(kv))); - org.junit.Assert.assertEquals(TIMESTAMP.get(kv.getTimestamp()), - Bytes.toBoolean(CellUtil.cloneValue(kv))); - } - } - scanner.close(); - } - -} - http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java deleted file mode 100644 index 427c5cc..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ /dev/null @@ -1,231 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.util.ArrayList; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.MapReduceTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.LauncherSecurityManager; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Mapper.Context; -import org.apache.hadoop.util.ToolRunner; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; 
-import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -/** - * Basic test for the WALPlayer M/R tool - */ -@Category({MapReduceTests.class, LargeTests.class}) -public class TestWALPlayer { - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static MiniHBaseCluster cluster; - private static Path rootDir; - private static Path walRootDir; - private static FileSystem fs; - private static FileSystem logFs; - private static Configuration conf; - - @Rule - public TestName name = new TestName(); - - @BeforeClass - public static void beforeClass() throws Exception { - conf= TEST_UTIL.getConfiguration(); - rootDir = TEST_UTIL.createRootDir(); - walRootDir = TEST_UTIL.createWALRootDir(); - fs = FSUtils.getRootDirFileSystem(conf); - logFs = FSUtils.getWALFileSystem(conf); - cluster = TEST_UTIL.startMiniCluster(); - } - - @AfterClass - public static void afterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - fs.delete(rootDir, true); - logFs.delete(walRootDir, true); - } - - /** - * Simple end-to-end test - * @throws Exception - */ - @Test - public void testWALPlayer() throws Exception { - final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); - final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); - final byte[] FAMILY = Bytes.toBytes("family"); - final byte[] COLUMN1 = Bytes.toBytes("c1"); - final byte[] COLUMN2 = Bytes.toBytes("c2"); - final byte[] ROW = Bytes.toBytes("row"); - Table t1 = TEST_UTIL.createTable(tableName1, FAMILY); - Table t2 = TEST_UTIL.createTable(tableName2, FAMILY); - - // put a row into the first table - Put p = new Put(ROW); - p.addColumn(FAMILY, COLUMN1, COLUMN1); - p.addColumn(FAMILY, COLUMN2, COLUMN2); - t1.put(p); - // delete one column - Delete d = new Delete(ROW); - d.addColumns(FAMILY, COLUMN1); - t1.delete(d); - - // replay the WAL, map table 1 to table 2 - WAL log = cluster.getRegionServer(0).getWAL(null); - 
log.rollWriter(); - String walInputDir = new Path(cluster.getMaster().getMasterFileSystem() - .getWALRootDir(), HConstants.HREGION_LOGDIR_NAME).toString(); - - Configuration configuration= TEST_UTIL.getConfiguration(); - WALPlayer player = new WALPlayer(configuration); - String optionName="_test_.name"; - configuration.set(optionName, "1000"); - player.setupTime(configuration, optionName); - assertEquals(1000,configuration.getLong(optionName,0)); - assertEquals(0, ToolRunner.run(configuration, player, - new String[] {walInputDir, tableName1.getNameAsString(), - tableName2.getNameAsString() })); - - - // verify the WAL was player into table 2 - Get g = new Get(ROW); - Result r = t2.get(g); - assertEquals(1, r.size()); - assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2)); - } - - /** - * Test WALKeyValueMapper setup and map - */ - @Test - public void testWALKeyValueMapper() throws Exception { - testWALKeyValueMapper(WALPlayer.TABLES_KEY); - } - - @Test - public void testWALKeyValueMapperWithDeprecatedConfig() throws Exception { - testWALKeyValueMapper("hlog.input.tables"); - } - - private void testWALKeyValueMapper(final String tableConfigKey) throws Exception { - Configuration configuration = new Configuration(); - configuration.set(tableConfigKey, "table"); - WALKeyValueMapper mapper = new WALKeyValueMapper(); - WALKey key = mock(WALKey.class); - when(key.getTablename()).thenReturn(TableName.valueOf("table")); - @SuppressWarnings("unchecked") - Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context = mock(Context.class); - when(context.getConfiguration()).thenReturn(configuration); - - WALEdit value = mock(WALEdit.class); - ArrayList<Cell> values = new ArrayList<>(); - KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), null); - - values.add(kv1); - when(value.getCells()).thenReturn(values); - mapper.setup(context); - - doAnswer(new Answer<Void>() { - - @Override - public Void answer(InvocationOnMock 
invocation) throws Throwable { - ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0]; - KeyValue key = (KeyValue) invocation.getArguments()[1]; - assertEquals("row", Bytes.toString(writer.get())); - assertEquals("row", Bytes.toString(CellUtil.cloneRow(key))); - return null; - } - }).when(context).write(any(ImmutableBytesWritable.class), any(KeyValue.class)); - - mapper.map(key, value, context); - - } - - /** - * Test main method - */ - @Test - public void testMainMethod() throws Exception { - - PrintStream oldPrintStream = System.err; - SecurityManager SECURITY_MANAGER = System.getSecurityManager(); - LauncherSecurityManager newSecurityManager= new LauncherSecurityManager(); - System.setSecurityManager(newSecurityManager); - ByteArrayOutputStream data = new ByteArrayOutputStream(); - String[] args = {}; - System.setErr(new PrintStream(data)); - try { - System.setErr(new PrintStream(data)); - try { - WALPlayer.main(args); - fail("should be SecurityException"); - } catch (SecurityException e) { - assertEquals(-1, newSecurityManager.getExitCode()); - assertTrue(data.toString().contains("ERROR: Wrong number of arguments:")); - assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>" + - " <tables> [<tableMappings>]")); - assertTrue(data.toString().contains("-Dwal.bulk.output=/path/for/output")); - } - - } finally { - System.setErr(oldPrintStream); - System.setSecurityManager(SECURITY_MANAGER); - } - - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java deleted file mode 100644 index 34725b4..0000000 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java +++ /dev/null @@ -1,276 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.List; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader; -import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader; -import 
org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.testclassification.MapReduceTests; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.MapReduceTestUtil; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -/** - * JUnit tests for the WALRecordReader - */ -@Category({MapReduceTests.class, MediumTests.class}) -public class TestWALRecordReader { - private static final Log LOG = LogFactory.getLog(TestWALRecordReader.class); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static Configuration conf; - private static FileSystem fs; - private static Path hbaseDir; - private static FileSystem walFs; - private static Path walRootDir; - // visible for TestHLogRecordReader - static final TableName tableName = TableName.valueOf(getName()); - private static final byte [] rowName = tableName.getName(); - // visible for TestHLogRecordReader - static final HRegionInfo info = new HRegionInfo(tableName, - Bytes.toBytes(""), Bytes.toBytes(""), false); - private static final byte [] family = Bytes.toBytes("column"); - private static final byte [] value = Bytes.toBytes("value"); - private static HTableDescriptor htd; - private static Path logDir; - protected MultiVersionConcurrencyControl mvcc; - protected static NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); - - private static String getName() { - return "TestWALRecordReader"; - } - - @Before - public void setUp() throws 
Exception { - fs.delete(hbaseDir, true); - walFs.delete(walRootDir, true); - mvcc = new MultiVersionConcurrencyControl(); - } - @BeforeClass - public static void setUpBeforeClass() throws Exception { - // Make block sizes small. - conf = TEST_UTIL.getConfiguration(); - conf.setInt("dfs.blocksize", 1024 * 1024); - conf.setInt("dfs.replication", 1); - TEST_UTIL.startMiniDFSCluster(1); - - conf = TEST_UTIL.getConfiguration(); - fs = TEST_UTIL.getDFSCluster().getFileSystem(); - - hbaseDir = TEST_UTIL.createRootDir(); - walRootDir = TEST_UTIL.createWALRootDir(); - walFs = FSUtils.getWALFileSystem(conf); - logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME); - - htd = new HTableDescriptor(tableName); - htd.addFamily(new HColumnDescriptor(family)); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - fs.delete(hbaseDir, true); - walFs.delete(walRootDir, true); - TEST_UTIL.shutdownMiniCluster(); - } - - /** - * Test partial reads from the log based on passed time range - * @throws Exception - */ - @Test - public void testPartialRead() throws Exception { - final WALFactory walfactory = new WALFactory(conf, null, getName()); - WAL log = walfactory.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()); - // This test depends on timestamp being millisecond based and the filename of the WAL also - // being millisecond based. 
- long ts = System.currentTimeMillis(); - WALEdit edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value)); - log.append(info, getWalKey(ts, scopes), edit, true); - edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value)); - log.append(info, getWalKey(ts+1, scopes), edit, true); - log.sync(); - LOG.info("Before 1st WAL roll " + log.toString()); - log.rollWriter(); - LOG.info("Past 1st WAL roll " + log.toString()); - - Thread.sleep(1); - long ts1 = System.currentTimeMillis(); - - edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value)); - log.append(info, getWalKey(ts1+1, scopes), edit, true); - edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value)); - log.append(info, getWalKey(ts1+2, scopes), edit, true); - log.sync(); - log.shutdown(); - walfactory.shutdown(); - LOG.info("Closed WAL " + log.toString()); - - - WALInputFormat input = new WALInputFormat(); - Configuration jobConf = new Configuration(conf); - jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString()); - jobConf.setLong(WALInputFormat.END_TIME_KEY, ts); - - // only 1st file is considered, and only its 1st entry is used - List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); - - assertEquals(1, splits.size()); - testSplit(splits.get(0), Bytes.toBytes("1")); - - jobConf.setLong(WALInputFormat.START_TIME_KEY, ts+1); - jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1+1); - splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); - // both files need to be considered - assertEquals(2, splits.size()); - // only the 2nd entry from the 1st file is used - testSplit(splits.get(0), Bytes.toBytes("2")); - // only the 1nd entry from the 2nd file is used - testSplit(splits.get(1), Bytes.toBytes("3")); - } - - /** - * Test basic functionality - * @throws Exception - */ - @Test - public 
void testWALRecordReader() throws Exception { - final WALFactory walfactory = new WALFactory(conf, null, getName()); - WAL log = walfactory.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()); - byte [] value = Bytes.toBytes("value"); - final AtomicLong sequenceId = new AtomicLong(0); - WALEdit edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), - System.currentTimeMillis(), value)); - long txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true); - log.sync(txid); - - Thread.sleep(1); // make sure 2nd log gets a later timestamp - long secondTs = System.currentTimeMillis(); - log.rollWriter(); - - edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), - System.currentTimeMillis(), value)); - txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true); - log.sync(txid); - log.shutdown(); - walfactory.shutdown(); - long thirdTs = System.currentTimeMillis(); - - // should have 2 log files now - WALInputFormat input = new WALInputFormat(); - Configuration jobConf = new Configuration(conf); - jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString()); - - // make sure both logs are found - List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); - assertEquals(2, splits.size()); - - // should return exactly one KV - testSplit(splits.get(0), Bytes.toBytes("1")); - // same for the 2nd split - testSplit(splits.get(1), Bytes.toBytes("2")); - - // now test basic time ranges: - - // set an endtime, the 2nd log file can be ignored completely. 
- jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs-1); - splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); - assertEquals(1, splits.size()); - testSplit(splits.get(0), Bytes.toBytes("1")); - - // now set a start time - jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE); - jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs); - splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); - // both logs need to be considered - assertEquals(2, splits.size()); - // but both readers skip all edits - testSplit(splits.get(0)); - testSplit(splits.get(1)); - } - - protected WALKey getWalKey(final long time, NavigableMap<byte[], Integer> scopes) { - return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes); - } - - protected WALRecordReader getReader() { - return new WALKeyRecordReader(); - } - - /** - * Create a new reader from the split, and match the edits against the passed columns. - */ - private void testSplit(InputSplit split, byte[]... 
columns) throws Exception { - final WALRecordReader reader = getReader(); - reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf)); - - for (byte[] column : columns) { - assertTrue(reader.nextKeyValue()); - Cell cell = reader.getCurrentValue().getCells().get(0); - if (!Bytes.equals(column, 0, column.length, cell.getQualifierArray(), - cell.getQualifierOffset(), cell.getQualifierLength())) { - assertTrue( - "expected [" - + Bytes.toString(column) - + "], actual [" - + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), - cell.getQualifierLength()) + "]", false); - } - } - assertFalse(reader.nextKeyValue()); - reader.close(); - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java deleted file mode 100644 index aea5036..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.KeyValue; - -import java.io.IOException; - -/** - * Dummy mapper used for unit tests to verify that the mapper can be injected. - * This approach would be used if a custom transformation needed to be done after - * reading the input data before writing it to HFiles. - */ -public class TsvImporterCustomTestMapper extends TsvImporterMapper { - - @Override - protected void setup(Context context) { - doSetup(context); - } - - /** - * Convert a line of TSV text into an HBase table row after transforming the - * values by multiplying them by 3. - */ - @Override - public void map(LongWritable offset, Text value, Context context) - throws IOException { - byte[] family = Bytes.toBytes("FAM"); - final byte[][] qualifiers = { Bytes.toBytes("A"), Bytes.toBytes("B") }; - - // do some basic line parsing - byte[] lineBytes = value.getBytes(); - String[] valueTokens = new String(lineBytes, "UTF-8").split("\u001b"); - - // create the rowKey and Put - ImmutableBytesWritable rowKey = - new ImmutableBytesWritable(Bytes.toBytes(valueTokens[0])); - Put put = new Put(rowKey.copyBytes()); - put.setDurability(Durability.SKIP_WAL); - - //The value should look like this: VALUE1 or VALUE2. 
Let's multiply - //the integer by 3 - for(int i = 1; i < valueTokens.length; i++) { - String prefix = valueTokens[i].substring(0, "VALUE".length()); - String suffix = valueTokens[i].substring("VALUE".length()); - String newValue = prefix + Integer.parseInt(suffix) * 3; - - KeyValue kv = new KeyValue(rowKey.copyBytes(), family, - qualifiers[i-1], Bytes.toBytes(newValue)); - put.add(kv); - } - - try { - context.write(rowKey, put); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } -}
