http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java new file mode 100644 index 0000000..13b6a96 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java @@ -0,0 +1,287 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; + + +/** + * <p> + * Tests various scan start and stop row scenarios. This is set in a scan and + * tested in a MapReduce job to see if that is handed over and done properly + * too. + * </p> + * <p> + * This test is broken into two parts in order to side-step the test timeout + * period of 900, as documented in HBASE-8326. 
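+ * (The 900 is presumably the 900-second surefire fork timeout; splitting the scenarios across two subclasses keeps each part within that limit.)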
+ * </p> + */ +public abstract class TestTableInputFormatScanBase { + + private static final Log LOG = LogFactory.getLog(TestTableInputFormatScanBase.class); + static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + static final TableName TABLE_NAME = TableName.valueOf("scantest"); + static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")}; + static final String KEY_STARTROW = "startRow"; + static final String KEY_LASTROW = "stpRow"; + + private static Table table = null; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // test intermittently fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on. + // this turns it off for this test. TODO: Figure out why scr breaks recovery. + System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); + + // switch TIF to log at DEBUG level + TEST_UTIL.enableDebug(TableInputFormat.class); + TEST_UTIL.enableDebug(TableInputFormatBase.class); + // start mini hbase cluster + TEST_UTIL.startMiniCluster(3); + // create and fill table + table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS); + TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Pass the key and value to reduce. + */ + public static class ScanMapper + extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> { + + /** + * Pass the key and value to reduce. + * + * @param key The key, here "aaa", "aab" etc. + * @param value The value is the same as the key. + * @param context The task context. + * @throws IOException When reading the rows fails. + */ + @Override + public void map(ImmutableBytesWritable key, Result value, + Context context) + throws IOException, InterruptedException { + if (value.size() != 2) { + throw new IOException("There should be two input columns"); + } + Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> + cfMap = value.getMap(); + + if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) { + throw new IOException("Wrong input columns. Missing: '" + + Bytes.toString(INPUT_FAMILYS[0]) + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'."); + } + + String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null)); + String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null)); + LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) + + ", value -> (" + val0 + ", " + val1 + ")"); + context.write(key, key); + } + } + + /** + * Checks the last and first key seen against the scanner boundaries. 
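+ * The expected boundaries arrive via the KEY_STARTROW and KEY_LASTROW configuration values that the driver methods below copy into the job configuration.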
+ */ + public static class ScanReducer + extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, + NullWritable, NullWritable> { + + private String first = null; + private String last = null; + + protected void reduce(ImmutableBytesWritable key, + Iterable<ImmutableBytesWritable> values, Context context) + throws IOException ,InterruptedException { + int count = 0; + for (ImmutableBytesWritable value : values) { + String val = Bytes.toStringBinary(value.get()); + LOG.info("reduce: key[" + count + "] -> " + + Bytes.toStringBinary(key.get()) + ", value -> " + val); + if (first == null) first = val; + last = val; + count++; + } + } + + protected void cleanup(Context context) + throws IOException, InterruptedException { + Configuration c = context.getConfiguration(); + String startRow = c.get(KEY_STARTROW); + String lastRow = c.get(KEY_LASTROW); + LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\""); + LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\""); + if (startRow != null && startRow.length() > 0) { + assertEquals(startRow, first); + } + if (lastRow != null && lastRow.length() > 0) { + assertEquals(lastRow, last); + } + } + + } + + /** + * Tests an MR Scan initialized from properties set in the Configuration. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + protected void testScanFromConfiguration(String start, String stop, String last) + throws IOException, InterruptedException, ClassNotFoundException { + String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + + "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty"); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString()); + c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILYS[0]) + ", " + + Bytes.toString(INPUT_FAMILYS[1])); + c.set(KEY_STARTROW, start != null ? start : ""); + c.set(KEY_LASTROW, last != null ? last : ""); + + if (start != null) { + c.set(TableInputFormat.SCAN_ROW_START, start); + } + + if (stop != null) { + c.set(TableInputFormat.SCAN_ROW_STOP, stop); + } + + Job job = new Job(c, jobName); + job.setMapperClass(ScanMapper.class); + job.setReducerClass(ScanReducer.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(ImmutableBytesWritable.class); + job.setInputFormatClass(TableInputFormat.class); + job.setNumReduceTasks(1); + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + TableMapReduceUtil.addDependencyJars(job); + assertTrue(job.waitForCompletion(true)); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + protected void testScan(String start, String stop, String last) + throws IOException, InterruptedException, ClassNotFoundException { + String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + + "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty"); + LOG.info("Before map/reduce startup - job " + jobName); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + Scan scan = new Scan(); + scan.addFamily(INPUT_FAMILYS[0]); + scan.addFamily(INPUT_FAMILYS[1]); + if (start != null) { + scan.setStartRow(Bytes.toBytes(start)); + } + c.set(KEY_STARTROW, start != null ? 
start : ""); + if (stop != null) { + scan.setStopRow(Bytes.toBytes(stop)); + } + c.set(KEY_LASTROW, last != null ? last : ""); + LOG.info("scan before: " + scan); + Job job = new Job(c, jobName); + TableMapReduceUtil.initTableMapperJob( + TABLE_NAME, scan, ScanMapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); + job.setReducerClass(ScanReducer.class); + job.setNumReduceTasks(1); // one to get final "first" and "last" key + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + LOG.info("Started " + job.getJobName()); + assertTrue(job.waitForCompletion(true)); + LOG.info("After map/reduce completion - job " + jobName); + } + + + /** + * Tests a MR scan using data skew auto-balance + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException, + InterruptedException, + ClassNotFoundException { + String jobName = "TestJobForNumOfSplits"; + LOG.info("Before map/reduce startup - job " + jobName); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + Scan scan = new Scan(); + scan.addFamily(INPUT_FAMILYS[0]); + scan.addFamily(INPUT_FAMILYS[1]); + c.set("hbase.mapreduce.input.autobalance", "true"); + c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio); + c.set(KEY_STARTROW, ""); + c.set(KEY_LASTROW, ""); + Job job = new Job(c, jobName); + TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job); + TableInputFormat tif = new TableInputFormat(); + tif.setConf(job.getConfiguration()); + Assert.assertEquals(TABLE_NAME, table.getName()); + List<InputSplit> splits = tif.getSplits(job); + Assert.assertEquals(expectedNumOfSplits, splits.size()); + } + + /** + * Tests for the getSplitKey() method in TableInputFormatBase.java + */ + public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) { + byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText); + Assert.assertArrayEquals(splitKey, result); + } +} +
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java new file mode 100644 index 0000000..d702e0d --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java @@ -0,0 +1,174 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotEnabledException; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test Map/Reduce job over HBase tables. The map/reduce process we're testing + * on our tables is simple - take every row in the table, reverse the value of + * a particular cell, and write it back to the table. 
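+ * For example, a stored value of "abc" is written back as "cba" under the output family.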
 + */ + +@Category({VerySlowMapReduceTests.class, LargeTests.class}) +public class TestTableMapReduce extends TestTableMapReduceBase { + private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class); + + @Override + protected Log getLog() { return LOG; } + + /** + * Pass the given key and processed record to reduce + */ + static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> { + + /** + * Pass the key and reversed value to reduce + * + * @param key + * @param value + * @param context + * @throws IOException + */ + @Override + public void map(ImmutableBytesWritable key, Result value, + Context context) + throws IOException, InterruptedException { + if (value.size() != 1) { + throw new IOException("There should only be one input column"); + } + Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> + cf = value.getMap(); + if(!cf.containsKey(INPUT_FAMILY)) { + throw new IOException("Wrong input columns. Missing: '" + + Bytes.toString(INPUT_FAMILY) + "'."); + } + + // Get the original value and reverse it + String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY)); + StringBuilder newValue = new StringBuilder(originalValue); + newValue.reverse(); + // Now set the value to be collected + Put outval = new Put(key.get()); + outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString())); + context.write(key, outval); + } + } + + @Override + protected void runTestOnTable(Table table) throws IOException { + Job job = null; + try { + LOG.info("Before map/reduce startup"); + job = new Job(table.getConfiguration(), "process column contents"); + job.setNumReduceTasks(1); + Scan scan = new Scan(); + scan.addFamily(INPUT_FAMILY); + TableMapReduceUtil.initTableMapperJob( + table.getName().getNameAsString(), scan, + ProcessContentsMapper.class, ImmutableBytesWritable.class, + Put.class, job); + TableMapReduceUtil.initTableReducerJob( + table.getName().getNameAsString(), + IdentityTableReducer.class, job); + FileOutputFormat.setOutputPath(job, new Path("test")); + LOG.info("Started " + table.getName().getNameAsString()); + assertTrue(job.waitForCompletion(true)); + LOG.info("After map/reduce completion"); + + // verify map-reduce results + verify(table.getName()); + + verifyJobCountersAreEmitted(job); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } finally { + table.close(); + if (job != null) { + FileUtil.fullyDelete( + new File(job.getConfiguration().get("hadoop.tmp.dir"))); + } + } + } + + /** + * Verify scan counters are emitted from the job + * @param job + * @throws IOException + */ + private void verifyJobCountersAreEmitted(Job job) throws IOException { + Counters counters = job.getCounters(); + Counter counter + = counters.findCounter(TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME, "RPC_CALLS"); + assertNotNull("Unable to find Job counter for HBase scan metrics, RPC_CALLS", counter); + assertTrue("Counter value for RPC_CALLS should be larger than 0", counter.getValue() > 0); + } + + @Test(expected = TableNotEnabledException.class) + public void testWritingToDisabledTable() throws IOException { + + try (Admin admin = UTIL.getConnection().getAdmin(); + Table table = UTIL.getConnection().getTable(TABLE_FOR_NEGATIVE_TESTS)) { + admin.disableTable(table.getName()); + runTestOnTable(table); + fail("Should not have reached here, should have thrown an exception"); + } + } + + @Test(expected = TableNotFoundException.class) + public void 
testWritingToNonExistentTable() throws IOException { + + try (Table table = UTIL.getConnection().getTable(TableName.valueOf("table-does-not-exist"))) { + runTestOnTable(table); + fail("Should not have reached here, should have thrown an exception"); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java new file mode 100644 index 0000000..27bf063 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java @@ -0,0 +1,233 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; + +/** + * A base class for a test Map/Reduce job over HBase tables. The map/reduce process we're testing + * on our tables is simple - take every row in the table, reverse the value of a particular cell, + * and write it back to the table. Implements common components between mapred and mapreduce + * implementations. + */ +public abstract class TestTableMapReduceBase { + @Rule public final TestRule timeout = CategoryBasedTimeout.builder(). 
+ withTimeout(this.getClass()).withLookingForStuckThread(true).build(); + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + protected static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest"); + protected static final TableName TABLE_FOR_NEGATIVE_TESTS = TableName.valueOf("testfailuretable"); + protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); + protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text"); + + protected static final byte[][] columns = new byte[][] { + INPUT_FAMILY, + OUTPUT_FAMILY + }; + + /** + * Retrieve my logger instance. + */ + protected abstract Log getLog(); + + /** + * Handles API-specifics for setting up and executing the job. + */ + protected abstract void runTestOnTable(Table table) throws IOException; + + @BeforeClass + public static void beforeClass() throws Exception { + UTIL.startMiniCluster(); + Table table = + UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, new byte[][] { INPUT_FAMILY, + OUTPUT_FAMILY }); + UTIL.loadTable(table, INPUT_FAMILY, false); + UTIL.createTable(TABLE_FOR_NEGATIVE_TESTS, new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY }); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.deleteTable(TABLE_FOR_NEGATIVE_TESTS); + UTIL.shutdownMiniCluster(); + } + + /** + * Test a map/reduce against a multi-region table + * @throws IOException + */ + @Test + public void testMultiRegionTable() throws IOException { + runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME)); + } + + @Test + public void testCombiner() throws IOException { + Configuration conf = new Configuration(UTIL.getConfiguration()); + // force use of combiner for testing purposes + conf.setInt("mapreduce.map.combine.minspills", 1); + runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME)); + } + + /** + * Implements mapper logic for use across APIs. + */ + protected static Put map(ImmutableBytesWritable key, Result value) throws IOException { + if (value.size() != 1) { + throw new IOException("There should only be one input column"); + } + Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> + cf = value.getMap(); + if(!cf.containsKey(INPUT_FAMILY)) { + throw new IOException("Wrong input columns. Missing: '" + + Bytes.toString(INPUT_FAMILY) + "'."); + } + + // Get the original value and reverse it + + String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY)); + StringBuilder newValue = new StringBuilder(originalValue); + newValue.reverse(); + + // Now set the value to be collected + + Put outval = new Put(key.get()); + outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString())); + return outval; + } + + protected void verify(TableName tableName) throws IOException { + Table table = UTIL.getConnection().getTable(tableName); + boolean verified = false; + long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000); + int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); + for (int i = 0; i < numRetries; i++) { + try { + getLog().info("Verification attempt #" + i); + verifyAttempt(table); + verified = true; + break; + } catch (NullPointerException e) { + // If here, a cell was empty. Presume its because updates came in + // after the scanner had been opened. Wait a while and retry. 
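+ // Pacing comes from hbase.client.pause and the attempt cap from hbase.client.retries.number, both read above.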
+ getLog().debug("Verification attempt failed: " + e.getMessage()); + } + try { + Thread.sleep(pause); + } catch (InterruptedException e) { + // continue + } + } + assertTrue(verified); + } + + /** + * Looks at every value of the mapreduce output and verifies that indeed + * the values have been reversed. + * @param table Table to scan. + * @throws IOException + * @throws NullPointerException if we failed to find a cell value + */ + private void verifyAttempt(final Table table) throws IOException, NullPointerException { + Scan scan = new Scan(); + TableInputFormat.addColumns(scan, columns); + ResultScanner scanner = table.getScanner(scan); + try { + Iterator<Result> itr = scanner.iterator(); + assertTrue(itr.hasNext()); + while(itr.hasNext()) { + Result r = itr.next(); + if (getLog().isDebugEnabled()) { + if (r.size() > 2 ) { + throw new IOException("Too many results, expected 2 got " + + r.size()); + } + } + byte[] firstValue = null; + byte[] secondValue = null; + int count = 0; + for(Cell kv : r.listCells()) { + if (count == 0) { + firstValue = CellUtil.cloneValue(kv); + } + if (count == 1) { + secondValue = CellUtil.cloneValue(kv); + } + count++; + if (count == 2) { + break; + } + } + + + if (firstValue == null) { + throw new NullPointerException(Bytes.toString(r.getRow()) + + ": first value is null"); + } + String first = Bytes.toString(firstValue); + + if (secondValue == null) { + throw new NullPointerException(Bytes.toString(r.getRow()) + + ": second value is null"); + } + byte[] secondReversed = new byte[secondValue.length]; + for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { + secondReversed[i] = secondValue[j]; + } + String second = Bytes.toString(secondReversed); + + if (first.compareTo(second) != 0) { + if (getLog().isDebugEnabled()) { + getLog().debug("second key is not the reverse of first. row=" + + Bytes.toStringBinary(r.getRow()) + ", first value=" + first + + ", second value=" + second); + } + fail(); + } + } + } finally { + scanner.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java new file mode 100644 index 0000000..506bf4f --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test different variants of initTableMapperJob method + */ +@Category({MapReduceTests.class, SmallTests.class}) +public class TestTableMapReduceUtil { + + /* + * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because + * the method depends on an online cluster. + */ + + @Test + public void testInitTableMapperJob1() throws Exception { + Configuration configuration = new Configuration(); + Job job = new Job(configuration, "tableName"); + // test + TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class, + Text.class, job, false, WALInputFormat.class); + assertEquals(WALInputFormat.class, job.getInputFormatClass()); + assertEquals(Import.Importer.class, job.getMapperClass()); + assertEquals(LongWritable.class, job.getOutputKeyClass()); + assertEquals(Text.class, job.getOutputValueClass()); + assertNull(job.getCombinerClass()); + assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); + } + + @Test + public void testInitTableMapperJob2() throws Exception { + Configuration configuration = new Configuration(); + Job job = new Job(configuration, "tableName"); + TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), + Import.Importer.class, Text.class, Text.class, job, false, WALInputFormat.class); + assertEquals(WALInputFormat.class, job.getInputFormatClass()); + assertEquals(Import.Importer.class, job.getMapperClass()); + assertEquals(LongWritable.class, job.getOutputKeyClass()); + assertEquals(Text.class, job.getOutputValueClass()); + assertNull(job.getCombinerClass()); + assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); + } + + @Test + public void testInitTableMapperJob3() throws Exception { + Configuration configuration = new Configuration(); + Job job = new Job(configuration, "tableName"); + TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), + Import.Importer.class, Text.class, Text.class, job); + assertEquals(TableInputFormat.class, job.getInputFormatClass()); + assertEquals(Import.Importer.class, job.getMapperClass()); + assertEquals(LongWritable.class, job.getOutputKeyClass()); + assertEquals(Text.class, job.getOutputValueClass()); + assertNull(job.getCombinerClass()); + assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); + } + + @Test + public void testInitTableMapperJob4() throws Exception { + Configuration configuration = new Configuration(); + Job job = new Job(configuration, "tableName"); + TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), + Import.Importer.class, Text.class, Text.class, job, false); + assertEquals(TableInputFormat.class, job.getInputFormatClass()); + assertEquals(Import.Importer.class, job.getMapperClass()); + assertEquals(LongWritable.class, job.getOutputKeyClass()); + assertEquals(Text.class, job.getOutputValueClass()); + 
assertNull(job.getCombinerClass()); + assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE)); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java new file mode 100644 index 0000000..028df98 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java @@ -0,0 +1,373 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TestTableSnapshotScanner; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.junit.After; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; + +import java.util.Arrays; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; +import 
org.apache.hadoop.hbase.util.FSUtils; + +@Category({VerySlowMapReduceTests.class, LargeTests.class}) +public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase { + private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class); + @Rule public final TestRule timeout = CategoryBasedTimeout.builder(). + withTimeout(this.getClass()).withLookingForStuckThread(true).build(); + + private static final byte[] bbb = Bytes.toBytes("bbb"); + private static final byte[] yyy = Bytes.toBytes("yyy"); + + @Rule + public TestName name = new TestName(); + + @Override + protected byte[] getStartRow() { + return bbb; + } + + @Override + protected byte[] getEndRow() { + return yyy; + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testGetBestLocations() throws IOException { + TableSnapshotInputFormatImpl tsif = new TableSnapshotInputFormatImpl(); + Configuration conf = UTIL.getConfiguration(); + + HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution(); + Assert.assertEquals(Lists.newArrayList(), + TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); + + blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1); + Assert.assertEquals(Lists.newArrayList("h1"), + TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); + + blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1); + Assert.assertEquals(Lists.newArrayList("h1"), + TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); + + blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 1); + Assert.assertEquals(Lists.newArrayList("h1"), + TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); + + blockDistribution = new HDFSBlocksDistribution(); + blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 10); + blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 7); + blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 5); + blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 1); + Assert.assertEquals(Lists.newArrayList("h1"), + TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); + + blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 2); + Assert.assertEquals(Lists.newArrayList("h1", "h2"), + TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); + + blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 3); + Assert.assertEquals(Lists.newArrayList("h2", "h1"), + TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); + + blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6); + blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9); + + Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4", "h1"), + TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution)); + } + + public static enum TestTableSnapshotCounters { + VALIDATION_ERROR + } + + public static class TestTableSnapshotMapper + extends TableMapper<ImmutableBytesWritable, NullWritable> { + @Override + protected void map(ImmutableBytesWritable key, Result value, + Context context) throws IOException, InterruptedException { + // Validate a single row coming from the snapshot, and emit the row key + verifyRowFromMap(key, value); + context.write(key, NullWritable.get()); + } + } + + public static class TestTableSnapshotReducer + extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> { + HBaseTestingUtility.SeenRowTracker rowTracker = + new 
HBaseTestingUtility.SeenRowTracker(bbb, yyy); + @Override + protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values, + Context context) throws IOException, InterruptedException { + rowTracker.addRow(key.get()); + } + + @Override + protected void cleanup(Context context) throws IOException, + InterruptedException { + rowTracker.validate(); + } + } + + @Test + public void testInitTableSnapshotMapperJobConfig() throws Exception { + setupCluster(); + final TableName tableName = TableName.valueOf(name.getMethodName()); + String snapshotName = "foo"; + + try { + createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1); + Job job = new Job(UTIL.getConfiguration()); + Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName); + + TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, + new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, job, false, tmpTableDir); + + // TODO: would be better to examine directly the cache instance that results from this + // config. Currently this is not possible because BlockCache initialization is static. + Assert.assertEquals( + "Snapshot job should be configured for default LruBlockCache.", + HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT, + job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01); + Assert.assertEquals( + "Snapshot job should not use BucketCache.", + 0, job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01); + } finally { + UTIL.getAdmin().deleteSnapshot(snapshotName); + UTIL.deleteTable(tableName); + tearDownCluster(); + } + } + + @Override + public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName, + String snapshotName, Path tmpTableDir) throws Exception { + Job job = new Job(UTIL.getConfiguration()); + TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, + new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, job, false, tmpTableDir); + } + + @Override + public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, + int numRegions, int expectedNumSplits) throws Exception { + setupCluster(); + final TableName tableName = TableName.valueOf(name.getMethodName()); + try { + createTableAndSnapshot( + util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions); + + Job job = new Job(util.getConfiguration()); + Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName); + Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan + + TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, + scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, job, false, tmpTableDir); + + verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow()); + + } finally { + util.getAdmin().deleteSnapshot(snapshotName); + util.deleteTable(tableName); + tearDownCluster(); + } + } + + @Test + public void testNoDuplicateResultsWhenSplitting() throws Exception { + setupCluster(); + TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting"); + String snapshotName = "testSnapshotBug"; + try { + if (UTIL.getAdmin().tableExists(tableName)) { + UTIL.deleteTable(tableName); + } + + UTIL.createTable(tableName, FAMILIES); + Admin admin = UTIL.getAdmin(); + + // put some stuff in the table + Table table = UTIL.getConnection().getTable(tableName); + UTIL.loadTable(table, FAMILIES); + + // split to 2 regions + admin.split(tableName, 
Bytes.toBytes("eee")); + TestTableSnapshotScanner.blockUntilSplitFinished(UTIL, tableName, 2); + + Path rootDir = FSUtils.getRootDir(UTIL.getConfiguration()); + FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration()); + + SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES), + null, snapshotName, rootDir, fs, true); + + // load different values + byte[] value = Bytes.toBytes("after_snapshot_value"); + UTIL.loadTable(table, FAMILIES, value); + + // cause flush to create new files in the region + admin.flush(tableName); + table.close(); + + Job job = new Job(UTIL.getConfiguration()); + Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName); + // limit the scan + Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow()); + + TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan, + TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false, + tmpTableDir); + + verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow()); + } finally { + UTIL.getAdmin().deleteSnapshot(snapshotName); + UTIL.deleteTable(tableName); + tearDownCluster(); + } + } + + private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits, + byte[] startRow, byte[] stopRow) + throws IOException, InterruptedException { + TableSnapshotInputFormat tsif = new TableSnapshotInputFormat(); + List<InputSplit> splits = tsif.getSplits(job); + + Assert.assertEquals(expectedNumSplits, splits.size()); + + HBaseTestingUtility.SeenRowTracker rowTracker = + new HBaseTestingUtility.SeenRowTracker(startRow, stopRow); + + for (int i = 0; i < splits.size(); i++) { + // validate input split + InputSplit split = splits.get(i); + Assert.assertTrue(split instanceof TableSnapshotRegionSplit); + + // validate record reader + TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class); + when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration()); + RecordReader<ImmutableBytesWritable, Result> rr = + tsif.createRecordReader(split, taskAttemptContext); + rr.initialize(split, taskAttemptContext); + + // validate we can read all the data back + while (rr.nextKeyValue()) { + byte[] row = rr.getCurrentKey().get(); + verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue()); + rowTracker.addRow(row); + } + + rr.close(); + } + + // validate all rows are seen + rowTracker.validate(); + } + + @Override + protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName, + String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, + boolean shutdownCluster) throws Exception { + doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir, + numRegions, expectedNumSplits, shutdownCluster); + } + + // this is also called by the IntegrationTestTableSnapshotInputFormat + public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName, + String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions, + int expectedNumSplits, boolean shutdownCluster) throws Exception { + + LOG.info("testing with MapReduce"); + + LOG.info("create the table and snapshot"); + createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions); + + if (shutdownCluster) { + LOG.info("shutting down hbase cluster."); + util.shutdownMiniHBaseCluster(); + } + + try { + // create the job + Job job = new Job(util.getConfiguration()); + Scan scan = new Scan(startRow, endRow); // limit the scan + + 
job.setJarByClass(util.getClass()); + TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), + TestTableSnapshotInputFormat.class); + + TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, + scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, job, true, tableDir); + + job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class); + job.setNumReduceTasks(1); + job.setOutputFormatClass(NullOutputFormat.class); + + Assert.assertTrue(job.waitForCompletion(true)); + } finally { + if (!shutdownCluster) { + util.getAdmin().deleteSnapshot(snapshotName); + util.deleteTable(tableName); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java new file mode 100644 index 0000000..4382c9c --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.util.ReflectionUtils; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.util.HashSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@Category({MapReduceTests.class, SmallTests.class}) +public class TestTableSplit { + @Rule + public TestName name = new TestName(); + + @Test + public void testHashCode() { + TableSplit split1 = new TableSplit(TableName.valueOf(name.getMethodName()), + "row-start".getBytes(), + "row-end".getBytes(), "location"); + TableSplit split2 = new TableSplit(TableName.valueOf(name.getMethodName()), + "row-start".getBytes(), + "row-end".getBytes(), "location"); + assertEquals (split1, split2); + assertTrue (split1.hashCode() == split2.hashCode()); + HashSet<TableSplit> set = new HashSet<>(2); + set.add(split1); + set.add(split2); + assertTrue(set.size() == 1); + } + + /** + * Length of a region should not influence the hash code. + * */ + @Test + public void testHashCode_length() { + TableSplit split1 = new TableSplit(TableName.valueOf(name.getMethodName()), + "row-start".getBytes(), + "row-end".getBytes(), "location", 1984); + TableSplit split2 = new TableSplit(TableName.valueOf(name.getMethodName()), + "row-start".getBytes(), + "row-end".getBytes(), "location", 1982); + + assertEquals (split1, split2); + assertTrue (split1.hashCode() == split2.hashCode()); + HashSet<TableSplit> set = new HashSet<>(2); + set.add(split1); + set.add(split2); + assertTrue(set.size() == 1); + } + + /** + * Length of a region needs to be properly serialized. 
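+ * The test below round-trips a split through Writable serialization via ReflectionUtils.copy and asserts that getLength() survives.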
+ * */ + @Test + public void testLengthIsSerialized() throws Exception { + TableSplit split1 = new TableSplit(TableName.valueOf(name.getMethodName()), + "row-start".getBytes(), + "row-end".getBytes(), "location", 666); + + TableSplit deserialized = new TableSplit(TableName.valueOf(name.getMethodName()), + "row-start2".getBytes(), + "row-end2".getBytes(), "location1"); + ReflectionUtils.copy(new Configuration(), split1, deserialized); + + Assert.assertEquals(666, deserialized.getLength()); + } + + @Test + public void testToString() { + TableSplit split = + new TableSplit(TableName.valueOf(name.getMethodName()), "row-start".getBytes(), "row-end".getBytes(), + "location"); + String str = + "HBase table split(table name: " + name.getMethodName() + ", scan: , start row: row-start, " + + "end row: row-end, region location: location, " + + "encoded region name: )"; + Assert.assertEquals(str, split.toString()); + + split = + new TableSplit(TableName.valueOf(name.getMethodName()), null, "row-start".getBytes(), + "row-end".getBytes(), "location", "encoded-region-name", 1000L); + str = + "HBase table split(table name: " + name.getMethodName() + ", scan: , start row: row-start, " + + "end row: row-end, region location: location, " + + "encoded region name: encoded-region-name)"; + Assert.assertEquals(str, split.toString()); + + split = new TableSplit((TableName) null, null, null, null); + str = + "HBase table split(table name: null, scan: , start row: null, " + + "end row: null, region location: null, " + + "encoded region name: )"; + Assert.assertEquals(str, split.toString()); + + split = new TableSplit((TableName) null, null, null, null, null, null, 1000L); + str = + "HBase table split(table name: null, scan: , start row: null, " + + "end row: null, region location: null, " + + "encoded region name: null)"; + Assert.assertEquals(str, split.toString()); + } +} + http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java new file mode 100644 index 0000000..6796c94 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java @@ -0,0 +1,211 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +@Category({MapReduceTests.class, LargeTests.class}) +public class TestTimeRangeMapRed { + private final static Log log = LogFactory.getLog(TestTimeRangeMapRed.class); + private static final HBaseTestingUtility UTIL = + new HBaseTestingUtility(); + private Admin admin; + + private static final byte [] KEY = Bytes.toBytes("row1"); + private static final NavigableMap<Long, Boolean> TIMESTAMP = new TreeMap<>(); + static { + TIMESTAMP.put((long)1245620000, false); + TIMESTAMP.put((long)1245620005, true); // include + TIMESTAMP.put((long)1245620010, true); // include + TIMESTAMP.put((long)1245620055, true); // include + TIMESTAMP.put((long)1245620100, true); // include + TIMESTAMP.put((long)1245620150, false); + TIMESTAMP.put((long)1245620250, false); + } + static final long MINSTAMP = 1245620005; + static final long MAXSTAMP = 1245620100 + 1; // maxStamp itself is excluded. so increment it. 
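+ // Scan.setTimeRange(minStamp, maxStamp) selects cells with minStamp <= ts < maxStamp, so the range [1245620005, 1245620101) covers exactly the four timestamps marked "include" above.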
+ + static final TableName TABLE_NAME = TableName.valueOf("table123"); + static final byte[] FAMILY_NAME = Bytes.toBytes("text"); + static final byte[] COLUMN_NAME = Bytes.toBytes("input"); + + @BeforeClass + public static void beforeClass() throws Exception { + UTIL.startMiniCluster(); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Before + public void before() throws Exception { + this.admin = UTIL.getAdmin(); + } + + private static class ProcessTimeRangeMapper + extends TableMapper<ImmutableBytesWritable, MapWritable> + implements Configurable { + + private Configuration conf = null; + private Table table = null; + + @Override + public void map(ImmutableBytesWritable key, Result result, + Context context) + throws IOException { + List<Long> tsList = new ArrayList<>(); + for (Cell kv : result.listCells()) { + tsList.add(kv.getTimestamp()); + } + + List<Put> puts = new ArrayList<>(); + for (Long ts : tsList) { + Put put = new Put(key.get()); + put.setDurability(Durability.SKIP_WAL); + put.addColumn(FAMILY_NAME, COLUMN_NAME, ts, Bytes.toBytes(true)); + puts.add(put); + } + table.put(puts); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration configuration) { + this.conf = configuration; + try { + Connection connection = ConnectionFactory.createConnection(conf); + table = connection.getTable(TABLE_NAME); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + @Test + public void testTimeRangeMapRed() + throws IOException, InterruptedException, ClassNotFoundException { + final HTableDescriptor desc = new HTableDescriptor(TABLE_NAME); + final HColumnDescriptor col = new HColumnDescriptor(FAMILY_NAME); + col.setMaxVersions(Integer.MAX_VALUE); + desc.addFamily(col); + admin.createTable(desc); + List<Put> puts = new ArrayList<>(); + for (Map.Entry<Long, Boolean> entry : TIMESTAMP.entrySet()) { + Put put = new Put(KEY); + put.setDurability(Durability.SKIP_WAL); + put.addColumn(FAMILY_NAME, COLUMN_NAME, entry.getKey(), Bytes.toBytes(false)); + puts.add(put); + } + Table table = UTIL.getConnection().getTable(desc.getTableName()); + table.put(puts); + runTestOnTable(); + verify(table); + table.close(); + } + + private void runTestOnTable() + throws IOException, InterruptedException, ClassNotFoundException { + Job job = null; + try { + job = new Job(UTIL.getConfiguration(), "test123"); + job.setOutputFormatClass(NullOutputFormat.class); + job.setNumReduceTasks(0); + Scan scan = new Scan(); + scan.addColumn(FAMILY_NAME, COLUMN_NAME); + scan.setTimeRange(MINSTAMP, MAXSTAMP); + scan.setMaxVersions(); + TableMapReduceUtil.initTableMapperJob(TABLE_NAME, + scan, ProcessTimeRangeMapper.class, Text.class, Text.class, job); + job.waitForCompletion(true); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } finally { + if (job != null) { + FileUtil.fullyDelete( + new File(job.getConfiguration().get("hadoop.tmp.dir"))); + } + } + } + + private void verify(final Table table) throws IOException { + Scan scan = new Scan(); + scan.addColumn(FAMILY_NAME, COLUMN_NAME); + scan.setMaxVersions(1); + ResultScanner scanner = table.getScanner(scan); + for (Result r: scanner) { + for (Cell kv : r.listCells()) { + log.debug(Bytes.toString(r.getRow()) + "\t" + Bytes.toString(CellUtil.cloneFamily(kv)) + + "\t" + Bytes.toString(CellUtil.cloneQualifier(kv)) + + "\t" + kv.getTimestamp() + "\t" + Bytes.toBoolean(CellUtil.cloneValue(kv))); + 
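+ // Whatever version the scan returns, its boolean value must agree with the TIMESTAMP fixture entry for that cell's timestamp.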
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
new file mode 100644
index 0000000..427c5cc
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.LauncherSecurityManager;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Basic test for the WALPlayer M/R tool.
+ */
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestWALPlayer {
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static MiniHBaseCluster cluster;
+  private static Path rootDir;
+  private static Path walRootDir;
+  private static FileSystem fs;
+  private static FileSystem logFs;
+  private static Configuration conf;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    rootDir = TEST_UTIL.createRootDir();
+    walRootDir = TEST_UTIL.createWALRootDir();
+    fs = FSUtils.getRootDirFileSystem(conf);
+    logFs = FSUtils.getWALFileSystem(conf);
+    cluster = TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+    fs.delete(rootDir, true);
+    logFs.delete(walRootDir, true);
+  }
+
+  /**
+   * Simple end-to-end test.
+   */
+  @Test
+  public void testWALPlayer() throws Exception {
+    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
+    final byte[] FAMILY = Bytes.toBytes("family");
+    final byte[] COLUMN1 = Bytes.toBytes("c1");
+    final byte[] COLUMN2 = Bytes.toBytes("c2");
+    final byte[] ROW = Bytes.toBytes("row");
+    Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
+    Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
+
+    // put a row into the first table
+    Put p = new Put(ROW);
+    p.addColumn(FAMILY, COLUMN1, COLUMN1);
+    p.addColumn(FAMILY, COLUMN2, COLUMN2);
+    t1.put(p);
+    // delete one column
+    Delete d = new Delete(ROW);
+    d.addColumns(FAMILY, COLUMN1);
+    t1.delete(d);
+
+    // replay the WAL, map table 1 to table 2
+    WAL log = cluster.getRegionServer(0).getWAL(null);
+    log.rollWriter();
+    String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
+        .getWALRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
+
+    Configuration configuration = TEST_UTIL.getConfiguration();
+    WALPlayer player = new WALPlayer(configuration);
+    String optionName = "_test_.name";
+    configuration.set(optionName, "1000");
+    player.setupTime(configuration, optionName);
+    assertEquals(1000, configuration.getLong(optionName, 0));
+    assertEquals(0, ToolRunner.run(configuration, player,
+        new String[] { walInputDir, tableName1.getNameAsString(),
+            tableName2.getNameAsString() }));
+
+    // verify the WAL was replayed into table 2
+    Get g = new Get(ROW);
+    Result r = t2.get(g);
+    assertEquals(1, r.size());
+    assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
+  }
+
+  /**
+   * Test WALKeyValueMapper setup and map.
+   */
+  @Test
+  public void testWALKeyValueMapper() throws Exception {
+    testWALKeyValueMapper(WALPlayer.TABLES_KEY);
+  }
+
+  @Test
+  public void testWALKeyValueMapperWithDeprecatedConfig() throws Exception {
+    testWALKeyValueMapper("hlog.input.tables");
+  }
+
+  private void testWALKeyValueMapper(final String tableConfigKey) throws Exception {
+    Configuration configuration = new Configuration();
+    configuration.set(tableConfigKey, "table");
+    WALKeyValueMapper mapper = new WALKeyValueMapper();
+    WALKey key = mock(WALKey.class);
+    when(key.getTablename()).thenReturn(TableName.valueOf("table"));
+    @SuppressWarnings("unchecked")
+    Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context =
+        mock(Context.class);
+    when(context.getConfiguration()).thenReturn(configuration);
+
+    WALEdit value = mock(WALEdit.class);
+    ArrayList<Cell> values = new ArrayList<>();
+    KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), null);
+
+    values.add(kv1);
+    when(value.getCells()).thenReturn(values);
+    mapper.setup(context);
+
+    doAnswer(new Answer<Void>() {
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
+        KeyValue key = (KeyValue) invocation.getArguments()[1];
+        assertEquals("row", Bytes.toString(writer.get()));
+        assertEquals("row", Bytes.toString(CellUtil.cloneRow(key)));
+        return null;
+      }
+    }).when(context).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
+
+    mapper.map(key, value, context);
+  }
+
+  /**
+   * Test main method.
+   */
+  @Test
+  public void testMainMethod() throws Exception {
+    PrintStream oldPrintStream = System.err;
+    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
+    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
+    System.setSecurityManager(newSecurityManager);
+    ByteArrayOutputStream data = new ByteArrayOutputStream();
+    String[] args = {};
+    System.setErr(new PrintStream(data));
+    try {
+      try {
+        WALPlayer.main(args);
+        fail("should be SecurityException");
+      } catch (SecurityException e) {
+        assertEquals(-1, newSecurityManager.getExitCode());
+        assertTrue(data.toString().contains("ERROR: Wrong number of arguments:"));
+        assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>"
+            + " <tables> [<tableMappings>]"));
+        assertTrue(data.toString().contains("-Dwal.bulk.output=/path/for/output"));
+      }
+    } finally {
+      System.setErr(oldPrintStream);
+      System.setSecurityManager(SECURITY_MANAGER);
+    }
+  }
+
+}
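TestWALPlayer drives the tool through ToolRunner exactly as the command line does: the positional arguments are the WAL input directory, the source tables, and an optional list of target tables to map them onto. A rough sketch of the same invocation outside the test harness (the path and table names are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.WALPlayer;
    import org.apache.hadoop.util.ToolRunner;

    final class WALPlayerSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Replay edits recorded for table1 into table2; 0 means the job succeeded.
        int ret = ToolRunner.run(conf, new WALPlayer(conf),
            new String[] { "/hbase/WALs", "table1", "table2" });
        System.exit(ret);
      }
    }

Passing -Dwal.bulk.output=<dir> instead produces HFiles for bulk loading, which is the option named in the usage text that testMainMethod asserts on.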
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
new file mode 100644
index 0000000..34725b4
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * JUnit tests for the WALRecordReader.
+ */
+@Category({MapReduceTests.class, MediumTests.class})
+public class TestWALRecordReader {
+  private static final Log LOG = LogFactory.getLog(TestWALRecordReader.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static Path hbaseDir;
+  private static FileSystem walFs;
+  private static Path walRootDir;
+  // visible for TestHLogRecordReader
+  static final TableName tableName = TableName.valueOf(getName());
+  private static final byte [] rowName = tableName.getName();
+  // visible for TestHLogRecordReader
+  static final HRegionInfo info = new HRegionInfo(tableName,
+      Bytes.toBytes(""), Bytes.toBytes(""), false);
+  private static final byte [] family = Bytes.toBytes("column");
+  private static final byte [] value = Bytes.toBytes("value");
+  private static HTableDescriptor htd;
+  private static Path logDir;
+  protected MultiVersionConcurrencyControl mvcc;
+  protected static NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+  private static String getName() {
+    return "TestWALRecordReader";
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    fs.delete(hbaseDir, true);
+    walFs.delete(walRootDir, true);
+    mvcc = new MultiVersionConcurrencyControl();
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Make block sizes small.
+    conf = TEST_UTIL.getConfiguration();
+    conf.setInt("dfs.blocksize", 1024 * 1024);
+    conf.setInt("dfs.replication", 1);
+    TEST_UTIL.startMiniDFSCluster(1);
+
+    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+
+    hbaseDir = TEST_UTIL.createRootDir();
+    walRootDir = TEST_UTIL.createWALRootDir();
+    walFs = FSUtils.getWALFileSystem(conf);
+    logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
+
+    htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor(family));
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    fs.delete(hbaseDir, true);
+    walFs.delete(walRootDir, true);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Test partial reads from the log based on passed time range.
+   */
+  @Test
+  public void testPartialRead() throws Exception {
+    final WALFactory walfactory = new WALFactory(conf, null, getName());
+    WAL log = walfactory.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
+    // This test depends on timestamp being millisecond based and the filename of the WAL also
+    // being millisecond based.
+    long ts = System.currentTimeMillis();
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
+    log.append(info, getWalKey(ts, scopes), edit, true);
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts + 1, value));
+    log.append(info, getWalKey(ts + 1, scopes), edit, true);
+    log.sync();
+    LOG.info("Before 1st WAL roll " + log.toString());
+    log.rollWriter();
+    LOG.info("Past 1st WAL roll " + log.toString());
+
+    Thread.sleep(1);
+    long ts1 = System.currentTimeMillis();
+
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1 + 1, value));
+    log.append(info, getWalKey(ts1 + 1, scopes), edit, true);
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1 + 2, value));
+    log.append(info, getWalKey(ts1 + 2, scopes), edit, true);
+    log.sync();
+    log.shutdown();
+    walfactory.shutdown();
+    LOG.info("Closed WAL " + log.toString());
+
+    WALInputFormat input = new WALInputFormat();
+    Configuration jobConf = new Configuration(conf);
+    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
+    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);
+
+    // only the 1st file is considered, and only its 1st entry is used
+    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+
+    assertEquals(1, splits.size());
+    testSplit(splits.get(0), Bytes.toBytes("1"));
+
+    jobConf.setLong(WALInputFormat.START_TIME_KEY, ts + 1);
+    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
+    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+    // both files need to be considered
+    assertEquals(2, splits.size());
+    // only the 2nd entry from the 1st file is used
+    testSplit(splits.get(0), Bytes.toBytes("2"));
+    // only the 1st entry from the 2nd file is used
+    testSplit(splits.get(1), Bytes.toBytes("3"));
+  }
+
+  /**
+   * Test basic functionality.
+   */
+  @Test
+  public void testWALRecordReader() throws Exception {
+    final WALFactory walfactory = new WALFactory(conf, null, getName());
+    WAL log = walfactory.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value));
+    long txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
+    log.sync(txid);
+
+    Thread.sleep(1); // make sure 2nd log gets a later timestamp
+    long secondTs = System.currentTimeMillis();
+    log.rollWriter();
+
+    edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value));
+    txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
+    log.sync(txid);
+    log.shutdown();
+    walfactory.shutdown();
+    long thirdTs = System.currentTimeMillis();
+
+    // should have 2 log files now
+    WALInputFormat input = new WALInputFormat();
+    Configuration jobConf = new Configuration(conf);
+    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
+
+    // make sure both logs are found
+    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+    assertEquals(2, splits.size());
+
+    // should return exactly one KV
+    testSplit(splits.get(0), Bytes.toBytes("1"));
+    // same for the 2nd split
+    testSplit(splits.get(1), Bytes.toBytes("2"));
+
+    // now test basic time ranges:
+
+    // with an end time set, the 2nd log file can be ignored completely.
+    jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs - 1);
+    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+    assertEquals(1, splits.size());
+    testSplit(splits.get(0), Bytes.toBytes("1"));
+
+    // now set a start time
+    jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
+    jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
+    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+    // both logs need to be considered
+    assertEquals(2, splits.size());
+    // but both readers skip all edits
+    testSplit(splits.get(0));
+    testSplit(splits.get(1));
+  }
+
+  protected WALKey getWalKey(final long time, NavigableMap<byte[], Integer> scopes) {
+    return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
+  }
+
+  protected WALRecordReader getReader() {
+    return new WALKeyRecordReader();
+  }
+
+  /**
+   * Create a new reader from the split, and match the edits against the passed columns.
+   */
+  private void testSplit(InputSplit split, byte[]... columns) throws Exception {
+    final WALRecordReader reader = getReader();
+    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
+
+    for (byte[] column : columns) {
+      assertTrue(reader.nextKeyValue());
+      Cell cell = reader.getCurrentValue().getCells().get(0);
+      // comparing the string forms yields a readable message on mismatch
+      assertEquals(Bytes.toString(column),
+        Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
+          cell.getQualifierLength()));
+    }
+    assertFalse(reader.nextKeyValue());
+    reader.close();
+  }
+
+}
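Both record-reader tests lean on WALInputFormat's time-window filtering: WAL file names are millisecond-timestamp based (a property testPartialRead explicitly depends on), so files lying entirely outside the window can be dropped when splits are computed, and the reader then skips any remaining entries whose write time falls outside it. A sketch of wiring such a window into a job configuration, following the tests' pattern (START_TIME_KEY and END_TIME_KEY are constants the tests can reference because they live in the same package as WALInputFormat; the bounds here are illustrative):

    // Restrict a WALInputFormat job to entries written between startTs and endTs.
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
    jobConf.setLong(WALInputFormat.START_TIME_KEY, startTs); // older entries are skipped
    jobConf.setLong(WALInputFormat.END_TIME_KEY, endTs);     // newer entries and files are skipped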
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java
new file mode 100644
index 0000000..aea5036
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Dummy mapper used for unit tests to verify that the mapper can be injected.
+ * This approach would be used if a custom transformation needed to be done after
+ * reading the input data before writing it to HFiles.
+ */
+public class TsvImporterCustomTestMapper extends TsvImporterMapper {
+
+  @Override
+  protected void setup(Context context) {
+    doSetup(context);
+  }
+
+  /**
+   * Convert a line of TSV text into an HBase table row after transforming the
+   * values by multiplying them by 3.
+   */
+  @Override
+  public void map(LongWritable offset, Text value, Context context)
+      throws IOException {
+    byte[] family = Bytes.toBytes("FAM");
+    final byte[][] qualifiers = { Bytes.toBytes("A"), Bytes.toBytes("B") };
+
+    // do some basic line parsing; use getLength() because the backing array
+    // of a Text may be longer than its actual content
+    byte[] lineBytes = value.getBytes();
+    String[] valueTokens = new String(lineBytes, 0, value.getLength(), "UTF-8").split("\u001b");
+
+    // create the rowKey and Put
+    ImmutableBytesWritable rowKey =
+        new ImmutableBytesWritable(Bytes.toBytes(valueTokens[0]));
+    Put put = new Put(rowKey.copyBytes());
+    put.setDurability(Durability.SKIP_WAL);
+
+    // The value should look like this: VALUE1 or VALUE2. Multiply the integer by 3.
+    for (int i = 1; i < valueTokens.length; i++) {
+      String prefix = valueTokens[i].substring(0, "VALUE".length());
+      String suffix = valueTokens[i].substring("VALUE".length());
+      String newValue = prefix + Integer.parseInt(suffix) * 3;
+
+      KeyValue kv = new KeyValue(rowKey.copyBytes(), family,
+          qualifiers[i - 1], Bytes.toBytes(newValue));
+      put.add(kv);
+    }
+
+    try {
+      context.write(rowKey, put);
+    } catch (InterruptedException e) {
+      // preserve the interrupt instead of swallowing it
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+  }
+}
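TsvImporterCustomTestMapper exists to prove that ImportTsv will run a user-supplied mapper in place of TsvImporterMapper. The hook is ImportTsv's importtsv.mapper.class configuration option; a rough sketch of an invocation that injects it (the table name, input path, and column spec are placeholders, not taken from this commit, and the input data would need to use the \u001b delimiter this mapper splits on):

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.ImportTsv;
    import org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper;
    import org.apache.hadoop.util.ToolRunner;

    final class ImportTsvSketch {
      public static void main(String[] args) throws Exception {
        String[] toolArgs = {
            "-Dimporttsv.mapper.class=" + TsvImporterCustomTestMapper.class.getName(),
            "-Dimporttsv.columns=HBASE_ROW_KEY,FAM:A,FAM:B",
            "targetTable", "/path/to/input"
        };
        // ImportTsv instantiates the configured mapper for the job it submits.
        int ret = ToolRunner.run(HBaseConfiguration.create(), new ImportTsv(), toolArgs);
        System.exit(ret);
      }
    }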