http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableOutputFormatConnectionExhaust.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableOutputFormatConnectionExhaust.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableOutputFormatConnectionExhaust.java new file mode 100644 index 0000000..835117c --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableOutputFormatConnectionExhaust.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordWriter; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; + +import static org.junit.Assert.fail; + +/** + * Spark creates many instances of TableOutputFormat within a single process. We need to make + * sure we can have many instances and not leak connections. + * + * This test creates a few TableOutputFormats and shouldn't fail due to ZK connection exhaustion. + */ +@Category(MediumTests.class) +public class TestTableOutputFormatConnectionExhaust { + + private static final Log LOG = + LogFactory.getLog(TestTableOutputFormatConnectionExhaust.class); + + private final static HBaseTestingUtility UTIL = new HBaseTestingUtility(); + static final String TABLE = "TestTableOutputFormatConnectionExhaust"; + static final String FAMILY = "family"; + + @BeforeClass + public static void beforeClass() throws Exception { + // Default in ZookeeperMiniCluster is 1000, setting artificially low to trigger exhaustion. + // need min of 7 to properly start the default mini HBase cluster + UTIL.getConfiguration().setInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 10); + UTIL.startMiniCluster(); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Before + public void before() throws IOException { + LOG.info("before"); + UTIL.ensureSomeRegionServersAvailable(1); + LOG.info("before done"); + } + + /** + * Open and close a TableOutputFormat. 
Closing the RecordWriter should release HBase
+ * Connection (ZK) resources, and will throw an exception if they are exhausted.
+ */
+  static void openCloseTableOutputFormat(int iter) throws IOException {
+    LOG.info("Instantiating TableOutputFormat connection " + iter);
+    JobConf conf = new JobConf();
+    conf.addResource(UTIL.getConfiguration());
+    conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE);
+    TableMapReduceUtil.initTableMapJob(TABLE, FAMILY, TableMap.class,
+        ImmutableBytesWritable.class, ImmutableBytesWritable.class, conf);
+    TableOutputFormat tof = new TableOutputFormat();
+    RecordWriter rw = tof.getRecordWriter(null, conf, TABLE, null);
+    rw.close(null);
+  }
+
+  @Test
+  public void testConnectionExhaustion() throws IOException {
+    int MAX_INSTANCES = 5; // fails on iteration 3 if zk connections leak
+    for (int i = 0; i < MAX_INSTANCES; i++) {
+      final int iter = i;
+      try {
+        openCloseTableOutputFormat(iter);
+      } catch (Exception e) {
+        LOG.error("Exception encountered", e);
+        fail("Failed on iteration " + i);
+      }
+    }
+  }
+
+}
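For context, the usage pattern this new test guards is roughly the following: a long-lived JVM (a Spark executor, for example) creates many TableOutputFormat instances, and each RecordWriter it opens must be closed so that the underlying Connection, and with it the ZooKeeper client, is released. The sketch below is illustrative only and not part of this commit; the table name "someTable" and the loop bound are hypothetical.

  import java.io.IOException;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.mapred.TableOutputFormat;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.RecordWriter;

  public class TableOutputFormatReuseSketch {
    public static void main(String[] args) throws IOException {
      JobConf conf = new JobConf(HBaseConfiguration.create());
      conf.set(TableOutputFormat.OUTPUT_TABLE, "someTable"); // hypothetical table name
      // Many instances in one JVM, as a Spark executor would create them.
      for (int i = 0; i < 100; i++) {
        TableOutputFormat tof = new TableOutputFormat();
        RecordWriter<ImmutableBytesWritable, Put> rw =
            tof.getRecordWriter(null, conf, "someTable", null);
        // ... write Puts with rw.write(key, put) as needed ...
        rw.close(null); // must release the cached Connection and its ZK resources
      }
    }
  }

If close() leaks the Connection, then with HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS lowered to 10 as in beforeClass() above, the mini cluster starts refusing ZooKeeper connections after a few iterations; testConnectionExhaustion() asserts that this does not happen.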
http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java new file mode 100644 index 0000000..1c72f2a --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableSnapshotInputFormat.java @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapred; + +import static org.mockito.Mockito.mock; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatTestBase; +import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.util.Iterator; + +@Category({VerySlowMapReduceTests.class, LargeTests.class}) +public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase { + + private static final byte[] aaa = Bytes.toBytes("aaa"); + private static final byte[] after_zzz = Bytes.toBytes("zz{"); // 'z' + 1 => '{' + private static final String COLUMNS = + Bytes.toString(FAMILIES[0]) + " " + Bytes.toString(FAMILIES[1]); + + @Rule + public TestName name = new TestName(); + + @Override + protected byte[] getStartRow() { + return aaa; + } + + @Override + protected byte[] getEndRow() { + return after_zzz; + } + + static class TestTableSnapshotMapper extends MapReduceBase + implements TableMap<ImmutableBytesWritable, NullWritable> { + @Override + public void map(ImmutableBytesWritable key, Result 
value,
+      OutputCollector<ImmutableBytesWritable, NullWritable> collector, Reporter reporter)
+      throws IOException {
+      verifyRowFromMap(key, value);
+      collector.collect(key, NullWritable.get());
+    }
+  }
+
+  public static class TestTableSnapshotReducer extends MapReduceBase
+      implements Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
+    HBaseTestingUtility.SeenRowTracker rowTracker =
+        new HBaseTestingUtility.SeenRowTracker(aaa, after_zzz);
+
+    @Override
+    public void reduce(ImmutableBytesWritable key, Iterator<NullWritable> values,
+        OutputCollector<NullWritable, NullWritable> collector, Reporter reporter)
+        throws IOException {
+      rowTracker.addRow(key.get());
+    }
+
+    @Override
+    public void close() {
+      rowTracker.validate();
+    }
+  }
+
+  @Test
+  public void testInitTableSnapshotMapperJobConfig() throws Exception {
+    setupCluster();
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    String snapshotName = "foo";
+
+    try {
+      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
+      JobConf job = new JobConf(UTIL.getConfiguration());
+      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+
+      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName,
+          COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+          NullWritable.class, job, false, tmpTableDir);
+
+      // TODO: would be better to examine directly the cache instance that results from this
+      // config. Currently this is not possible because BlockCache initialization is static.
+      Assert.assertEquals(
+          "Snapshot job should be configured for default LruBlockCache.",
+          HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
+          job.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
+      Assert.assertEquals(
+          "Snapshot job should not use BucketCache.",
+          0, job.getFloat("hbase.bucketcache.size", -1), 0.01);
+    } finally {
+      UTIL.getAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
+  // TODO: mapred does not support limiting input range by startrow, endrow.
+  // Thus the following tests must override parameter verification.
+ + @Test + @Override + public void testWithMockedMapReduceMultiRegion() throws Exception { + testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 10); + } + + @Test + @Override + public void testWithMapReduceMultiRegion() throws Exception { + testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 10, false); + } + + @Test + @Override + // run the MR job while HBase is offline + public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception { + testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 10, true); + } + + @Override + public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName, + String snapshotName, Path tmpTableDir) throws Exception { + JobConf job = new JobConf(UTIL.getConfiguration()); + TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, + COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, job, false, tmpTableDir); + } + + @Override + protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, + int numRegions, int expectedNumSplits) throws Exception { + setupCluster(); + final TableName tableName = TableName.valueOf(name.getMethodName()); + try { + createTableAndSnapshot( + util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions); + + JobConf job = new JobConf(util.getConfiguration()); + Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName); + + TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, + COLUMNS, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, job, false, tmpTableDir); + + // mapred doesn't support start and end keys? o.O + verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow()); + + } finally { + util.getAdmin().deleteSnapshot(snapshotName); + util.deleteTable(tableName); + tearDownCluster(); + } + } + + private void verifyWithMockedMapReduce(JobConf job, int numRegions, int expectedNumSplits, + byte[] startRow, byte[] stopRow) throws IOException, InterruptedException { + TableSnapshotInputFormat tsif = new TableSnapshotInputFormat(); + InputSplit[] splits = tsif.getSplits(job, 0); + + Assert.assertEquals(expectedNumSplits, splits.length); + + HBaseTestingUtility.SeenRowTracker rowTracker = + new HBaseTestingUtility.SeenRowTracker(startRow, stopRow); + + for (int i = 0; i < splits.length; i++) { + // validate input split + InputSplit split = splits[i]; + Assert.assertTrue(split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit); + + // validate record reader + OutputCollector collector = mock(OutputCollector.class); + Reporter reporter = mock(Reporter.class); + RecordReader<ImmutableBytesWritable, Result> rr = tsif.getRecordReader(split, job, reporter); + + // validate we can read all the data back + ImmutableBytesWritable key = rr.createKey(); + Result value = rr.createValue(); + while (rr.next(key, value)) { + verifyRowFromMap(key, value); + rowTracker.addRow(key.copyBytes()); + } + + rr.close(); + } + + // validate all rows are seen + rowTracker.validate(); + } + + @Override + protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName, + String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, + boolean shutdownCluster) throws Exception { + doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir, + numRegions, expectedNumSplits, shutdownCluster); + } + + // this is also called by the IntegrationTestTableSnapshotInputFormat + public 
static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName, + String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions, + int expectedNumSplits, boolean shutdownCluster) throws Exception { + + //create the table and snapshot + createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions); + + if (shutdownCluster) { + util.shutdownMiniHBaseCluster(); + } + + try { + // create the job + JobConf jobConf = new JobConf(util.getConfiguration()); + + jobConf.setJarByClass(util.getClass()); + org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJarsForClasses(jobConf, + TestTableSnapshotInputFormat.class); + + TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS, + TestTableSnapshotMapper.class, ImmutableBytesWritable.class, + NullWritable.class, jobConf, true, tableDir); + + jobConf.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class); + jobConf.setNumReduceTasks(1); + jobConf.setOutputFormat(NullOutputFormat.class); + + RunningJob job = JobClient.runJob(jobConf); + Assert.assertTrue(job.isSuccessful()); + } finally { + if (!shutdownCluster) { + util.getAdmin().deleteSnapshot(snapshotName); + util.deleteTable(tableName); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/HadoopSecurityEnabledUserProviderForTesting.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/HadoopSecurityEnabledUserProviderForTesting.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/HadoopSecurityEnabledUserProviderForTesting.java new file mode 100644 index 0000000..b342f64 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/HadoopSecurityEnabledUserProviderForTesting.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.security.UserProvider; + +/** + * A {@link UserProvider} that always says hadoop security is enabled, regardless of the underlying + * configuration. HBase security is <i>not enabled</i> as this is used to determine if SASL is used + * to do the authentication, which requires a Kerberos ticket (which we currently don't have in + * tests). + * <p> + * This should only be used for <b>TESTING</b>. 
+ */ +public class HadoopSecurityEnabledUserProviderForTesting extends UserProvider { + + @Override + public boolean isHBaseSecurityEnabled() { + return false; + } + + @Override + public boolean isHadoopSecurityEnabled() { + return true; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java new file mode 100644 index 0000000..c717fa9 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatTestBase.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.NavigableMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Base set of tests and setup for input formats touching multiple tables. + */ +public abstract class MultiTableInputFormatTestBase { + @Rule public final TestRule timeout = CategoryBasedTimeout.builder(). 
+ withTimeout(this.getClass()).withLookingForStuckThread(true).build(); + static final Log LOG = LogFactory.getLog(TestMultiTableInputFormat.class); + public static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + static final String TABLE_NAME = "scantest"; + static final byte[] INPUT_FAMILY = Bytes.toBytes("contents"); + static final String KEY_STARTROW = "startRow"; + static final String KEY_LASTROW = "stpRow"; + + static List<String> TABLES = Lists.newArrayList(); + + static { + for (int i = 0; i < 3; i++) { + TABLES.add(TABLE_NAME + String.valueOf(i)); + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // switch TIF to log at DEBUG level + TEST_UTIL.enableDebug(MultiTableInputFormatBase.class); + // start mini hbase cluster + TEST_UTIL.startMiniCluster(3); + // create and fill table + for (String tableName : TABLES) { + try (Table table = + TEST_UTIL.createMultiRegionTable(TableName.valueOf(tableName), + INPUT_FAMILY, 4)) { + TEST_UTIL.loadTable(table, INPUT_FAMILY, false); + } + } + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @After + public void tearDown() throws Exception { + Configuration c = TEST_UTIL.getConfiguration(); + FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir"))); + } + + /** + * Pass the key and value to reducer. + */ + public static class ScanMapper extends + TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> { + /** + * Pass the key and value to reduce. + * + * @param key The key, here "aaa", "aab" etc. + * @param value The value is the same as the key. + * @param context The task context. + * @throws IOException When reading the rows fails. + */ + @Override + public void map(ImmutableBytesWritable key, Result value, Context context) + throws IOException, InterruptedException { + makeAssertions(key, value); + context.write(key, key); + } + + public void makeAssertions(ImmutableBytesWritable key, Result value) throws IOException { + if (value.size() != 1) { + throw new IOException("There should only be one input column"); + } + Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> cf = + value.getMap(); + if (!cf.containsKey(INPUT_FAMILY)) { + throw new IOException("Wrong input columns. Missing: '" + + Bytes.toString(INPUT_FAMILY) + "'."); + } + String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null)); + LOG.debug("map: key -> " + Bytes.toStringBinary(key.get()) + + ", value -> " + val); + } + } + + /** + * Checks the last and first keys seen against the scanner boundaries. 
+ */ + public static class ScanReducer + extends + Reducer<ImmutableBytesWritable, ImmutableBytesWritable, + NullWritable, NullWritable> { + private String first = null; + private String last = null; + + @Override + protected void reduce(ImmutableBytesWritable key, + Iterable<ImmutableBytesWritable> values, Context context) + throws IOException, InterruptedException { + makeAssertions(key, values); + } + + protected void makeAssertions(ImmutableBytesWritable key, + Iterable<ImmutableBytesWritable> values) { + int count = 0; + for (ImmutableBytesWritable value : values) { + String val = Bytes.toStringBinary(value.get()); + LOG.debug("reduce: key[" + count + "] -> " + + Bytes.toStringBinary(key.get()) + ", value -> " + val); + if (first == null) first = val; + last = val; + count++; + } + assertEquals(3, count); + } + + @Override + protected void cleanup(Context context) throws IOException, + InterruptedException { + Configuration c = context.getConfiguration(); + cleanup(c); + } + + protected void cleanup(Configuration c) { + String startRow = c.get(KEY_STARTROW); + String lastRow = c.get(KEY_LASTROW); + LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + + startRow + "\""); + LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + + "\""); + if (startRow != null && startRow.length() > 0) { + assertEquals(startRow, first); + } + if (lastRow != null && lastRow.length() > 0) { + assertEquals(lastRow, last); + } + } + } + + @Test + public void testScanEmptyToEmpty() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, null, null); + } + + @Test + public void testScanEmptyToAPP() throws IOException, InterruptedException, + ClassNotFoundException { + testScan(null, "app", "apo"); + } + + @Test + public void testScanOBBToOPP() throws IOException, InterruptedException, + ClassNotFoundException { + testScan("obb", "opp", "opo"); + } + + @Test + public void testScanYZYToEmpty() throws IOException, InterruptedException, + ClassNotFoundException { + testScan("yzy", null, "zzz"); + } + + /** + * Tests a MR scan using specific start and stop rows. + * + * @throws IOException + * @throws ClassNotFoundException + * @throws InterruptedException + */ + private void testScan(String start, String stop, String last) + throws IOException, InterruptedException, ClassNotFoundException { + String jobName = + "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") + "To" + + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty"); + LOG.info("Before map/reduce startup - job " + jobName); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + c.set(KEY_STARTROW, start != null ? start : ""); + c.set(KEY_LASTROW, last != null ? 
last : ""); + + List<Scan> scans = new ArrayList<>(); + + for (String tableName : TABLES) { + Scan scan = new Scan(); + + scan.addFamily(INPUT_FAMILY); + scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName)); + + if (start != null) { + scan.setStartRow(Bytes.toBytes(start)); + } + if (stop != null) { + scan.setStopRow(Bytes.toBytes(stop)); + } + + scans.add(scan); + + LOG.info("scan before: " + scan); + } + + runJob(jobName, c, scans); + } + + protected void runJob(String jobName, Configuration c, List<Scan> scans) + throws IOException, InterruptedException, ClassNotFoundException { + Job job = new Job(c, jobName); + + initJob(scans, job); + job.setReducerClass(ScanReducer.class); + job.setNumReduceTasks(1); // one to get final "first" and "last" key + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + LOG.info("Started " + job.getJobName()); + job.waitForCompletion(true); + assertTrue(job.isSuccessful()); + LOG.info("After map/reduce completion - job " + jobName); + } + + protected abstract void initJob(List<Scan> scans, Job job) throws IOException; + + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/NMapInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/NMapInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/NMapInputFormat.java new file mode 100644 index 0000000..3203f0c --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/NMapInputFormat.java @@ -0,0 +1,134 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Input format that creates a configurable number of map tasks + * each provided with a single row of NullWritables. 
This can be + * useful when trying to write mappers which don't have any real + * input (eg when the mapper is simply producing random data as output) + */ +public class NMapInputFormat extends InputFormat<NullWritable, NullWritable> { + private static final String NMAPS_KEY = "nmapinputformat.num.maps"; + + @Override + public RecordReader<NullWritable, NullWritable> createRecordReader( + InputSplit split, + TaskAttemptContext tac) throws IOException, InterruptedException { + return new SingleRecordReader<>(NullWritable.get(), NullWritable.get()); + } + + @Override + public List<InputSplit> getSplits(JobContext context) throws IOException, + InterruptedException { + int count = getNumMapTasks(context.getConfiguration()); + List<InputSplit> splits = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + splits.add(new NullInputSplit()); + } + return splits; + } + + public static void setNumMapTasks(Configuration conf, int numTasks) { + conf.setInt(NMAPS_KEY, numTasks); + } + + public static int getNumMapTasks(Configuration conf) { + return conf.getInt(NMAPS_KEY, 1); + } + + private static class NullInputSplit extends InputSplit implements Writable { + @Override + public long getLength() throws IOException, InterruptedException { + return 0; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return new String[] {}; + } + + @Override + public void readFields(DataInput in) throws IOException { + } + + @Override + public void write(DataOutput out) throws IOException { + } + } + + private static class SingleRecordReader<K, V> + extends RecordReader<K, V> { + + private final K key; + private final V value; + boolean providedKey = false; + + SingleRecordReader(K key, V value) { + this.key = key; + this.value = value; + } + + @Override + public void close() { + } + + @Override + public K getCurrentKey() { + return key; + } + + @Override + public V getCurrentValue(){ + return value; + } + + @Override + public float getProgress() { + return 0; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext tac) { + } + + @Override + public boolean nextKeyValue() { + if (providedKey) return false; + providedKey = true; + return true; + } + + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java new file mode 100644 index 0000000..fa47253 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.HFileArchiveUtil; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; + +import static org.junit.Assert.assertFalse; + +import java.io.IOException; +import java.util.Arrays; + +public abstract class TableSnapshotInputFormatTestBase { + private static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatTestBase.class); + @Rule public final TestRule timeout = CategoryBasedTimeout.builder(). 
+ withTimeout(this.getClass()).withLookingForStuckThread(true).build(); + protected final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + protected static final int NUM_REGION_SERVERS = 2; + protected static final byte[][] FAMILIES = {Bytes.toBytes("f1"), Bytes.toBytes("f2")}; + + protected FileSystem fs; + protected Path rootDir; + + public void setupCluster() throws Exception { + setupConf(UTIL.getConfiguration()); + UTIL.startMiniCluster(NUM_REGION_SERVERS, true); + rootDir = UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir(); + fs = rootDir.getFileSystem(UTIL.getConfiguration()); + } + + public void tearDownCluster() throws Exception { + UTIL.shutdownMiniCluster(); + } + + private static void setupConf(Configuration conf) { + // Enable snapshot + conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); + } + + protected abstract void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName, + int numRegions, int expectedNumSplits) throws Exception; + + protected abstract void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName, + String snapshotName, Path tableDir, int numRegions, int expectedNumSplits, + boolean shutdownCluster) throws Exception; + + protected abstract byte[] getStartRow(); + + protected abstract byte[] getEndRow(); + + @Test + public void testWithMockedMapReduceSingleRegion() throws Exception { + testWithMockedMapReduce(UTIL, "testWithMockedMapReduceSingleRegion", 1, 1); + } + + @Test + public void testWithMockedMapReduceMultiRegion() throws Exception { + testWithMockedMapReduce(UTIL, "testWithMockedMapReduceMultiRegion", 10, 8); + } + + @Test + public void testWithMapReduceSingleRegion() throws Exception { + testWithMapReduce(UTIL, "testWithMapReduceSingleRegion", 1, 1, false); + } + + @Test + public void testWithMapReduceMultiRegion() throws Exception { + testWithMapReduce(UTIL, "testWithMapReduceMultiRegion", 10, 8, false); + } + + @Test + // run the MR job while HBase is offline + public void testWithMapReduceAndOfflineHBaseMultiRegion() throws Exception { + testWithMapReduce(UTIL, "testWithMapReduceAndOfflineHBaseMultiRegion", 10, 8, true); + } + + // Test that snapshot restore does not create back references in the HBase root dir. 
+ @Test + public void testRestoreSnapshotDoesNotCreateBackRefLinks() throws Exception { + setupCluster(); + TableName tableName = TableName.valueOf("testRestoreSnapshotDoesNotCreateBackRefLinks"); + String snapshotName = "foo"; + + try { + createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1); + + Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName); + + testRestoreSnapshotDoesNotCreateBackRefLinksInit(tableName, snapshotName,tmpTableDir); + + Path rootDir = FSUtils.getRootDir(UTIL.getConfiguration()); + for (Path regionDir : FSUtils.getRegionDirs(fs, FSUtils.getTableDir(rootDir, tableName))) { + for (Path storeDir : FSUtils.getFamilyDirs(fs, regionDir)) { + for (FileStatus status : fs.listStatus(storeDir)) { + System.out.println(status.getPath()); + if (StoreFileInfo.isValid(status)) { + Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(UTIL.getConfiguration(), + tableName, regionDir.getName(), storeDir.getName()); + + Path path = HFileLink.getBackReferencesDir(storeDir, status.getPath().getName()); + // assert back references directory is empty + assertFalse("There is a back reference in " + path, fs.exists(path)); + + path = HFileLink.getBackReferencesDir(archiveStoreDir, status.getPath().getName()); + // assert back references directory is empty + assertFalse("There is a back reference in " + path, fs.exists(path)); + } + } + } + } + } finally { + UTIL.getAdmin().deleteSnapshot(snapshotName); + UTIL.deleteTable(tableName); + tearDownCluster(); + } + } + + public abstract void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName, + String snapshotName, Path tmpTableDir) throws Exception; + + protected void testWithMapReduce(HBaseTestingUtility util, String snapshotName, + int numRegions, int expectedNumSplits, boolean shutdownCluster) throws Exception { + setupCluster(); + try { + Path tableDir = util.getDataTestDirOnTestFS(snapshotName); + TableName tableName = TableName.valueOf("testWithMapReduce"); + testWithMapReduceImpl(util, tableName, snapshotName, tableDir, numRegions, + expectedNumSplits, shutdownCluster); + } finally { + tearDownCluster(); + } + } + + protected static void verifyRowFromMap(ImmutableBytesWritable key, Result result) + throws IOException { + byte[] row = key.get(); + CellScanner scanner = result.cellScanner(); + while (scanner.advance()) { + Cell cell = scanner.current(); + + //assert that all Cells in the Result have the same key + Assert.assertEquals(0, Bytes.compareTo(row, 0, row.length, + cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); + } + + for (int j = 0; j < FAMILIES.length; j++) { + byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]); + Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row) + + " ,actual:" + Bytes.toString(actual), row, actual); + } + } + + protected static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName, + String snapshotName, byte[] startRow, byte[] endRow, int numRegions) + throws Exception { + try { + LOG.debug("Ensuring table doesn't exist."); + util.deleteTable(tableName); + } catch(Exception ex) { + // ignore + } + + LOG.info("creating table '" + tableName + "'"); + if (numRegions > 1) { + util.createTable(tableName, FAMILIES, 1, startRow, endRow, numRegions); + } else { + util.createTable(tableName, FAMILIES); + } + Admin admin = util.getAdmin(); + + LOG.info("put some stuff in the table"); + Table table = util.getConnection().getTable(tableName); + util.loadTable(table, FAMILIES); 
+ + Path rootDir = FSUtils.getRootDir(util.getConfiguration()); + FileSystem fs = rootDir.getFileSystem(util.getConfiguration()); + + LOG.info("snapshot"); + SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, + Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true); + + LOG.info("load different values"); + byte[] value = Bytes.toBytes("after_snapshot_value"); + util.loadTable(table, FAMILIES, value); + + LOG.info("cause flush to create new files in the region"); + admin.flush(tableName); + table.close(); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java new file mode 100644 index 0000000..ff623cb --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCellCounter.java @@ -0,0 +1,376 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.LauncherSecurityManager; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.*; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@Category({MapReduceTests.class, LargeTests.class}) +public class TestCellCounter { + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final byte[] ROW1 = Bytes.toBytesBinary("\\x01row1"); + private static final byte[] ROW2 = Bytes.toBytesBinary("\\x01row2"); + private static final String FAMILY_A_STRING = "a"; + private static final String FAMILY_B_STRING = "b"; + private static final byte[] FAMILY_A = Bytes.toBytes(FAMILY_A_STRING); + private static final byte[] FAMILY_B = Bytes.toBytes(FAMILY_B_STRING); + private static final byte[] QUALIFIER = Bytes.toBytes("q"); + + private static Path FQ_OUTPUT_DIR; + private static final String OUTPUT_DIR = "target" + File.separator + "test-data" + File.separator + + "output"; + private static long now = System.currentTimeMillis(); + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void beforeClass() throws Exception { + UTIL.startMiniCluster(); + FQ_OUTPUT_DIR = new Path(OUTPUT_DIR).makeQualified(new LocalFileSystem()); + FileUtil.fullyDelete(new File(OUTPUT_DIR)); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + /** + * Test CellCounter all data should print to output + * + */ + @Test (timeout=300000) + public void testCellCounter() throws Exception { + final TableName sourceTable = TableName.valueOf(name.getMethodName()); + byte[][] families = { FAMILY_A, FAMILY_B }; + Table t = UTIL.createTable(sourceTable, families); + try{ + Put p = new Put(ROW1); + p.addColumn(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11")); + p.addColumn(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12")); + p.addColumn(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13")); + t.put(p); + p = new Put(ROW2); + p.addColumn(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21")); + p.addColumn(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22")); + p.addColumn(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23")); + t.put(p); + String[] args = { sourceTable.getNameAsString(), FQ_OUTPUT_DIR.toString(), ";", "^row1" }; + runCount(args); + FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator + + "part-r-00000"); + String data = IOUtils.toString(inputStream); + inputStream.close(); + assertTrue(data.contains("Total Families Across all Rows" + "\t" + "2")); + assertTrue(data.contains("Total Qualifiers across all Rows" 
+ "\t" + "2")); + assertTrue(data.contains("Total ROWS" + "\t" + "1")); + assertTrue(data.contains("b;q" + "\t" + "1")); + assertTrue(data.contains("a;q" + "\t" + "1")); + assertTrue(data.contains("row1;a;q_Versions" + "\t" + "1")); + assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1")); + }finally{ + t.close(); + FileUtil.fullyDelete(new File(OUTPUT_DIR)); + } + } + + /** + * Test CellCounter all data should print to output + */ + @Test(timeout = 300000) + public void testCellCounterPrefix() throws Exception { + final TableName sourceTable = TableName.valueOf(name.getMethodName()); + byte[][] families = { FAMILY_A, FAMILY_B }; + Table t = UTIL.createTable(sourceTable, families); + try { + Put p = new Put(ROW1); + p.addColumn(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11")); + p.addColumn(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12")); + p.addColumn(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13")); + t.put(p); + p = new Put(ROW2); + p.addColumn(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21")); + p.addColumn(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22")); + p.addColumn(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23")); + t.put(p); + String[] args = { sourceTable.getNameAsString(), FQ_OUTPUT_DIR.toString(), ";", "\\x01row1" }; + runCount(args); + FileInputStream inputStream = + new FileInputStream(OUTPUT_DIR + File.separator + "part-r-00000"); + String data = IOUtils.toString(inputStream); + inputStream.close(); + assertTrue(data.contains("Total Families Across all Rows" + "\t" + "2")); + assertTrue(data.contains("Total Qualifiers across all Rows" + "\t" + "2")); + assertTrue(data.contains("Total ROWS" + "\t" + "1")); + assertTrue(data.contains("b;q" + "\t" + "1")); + assertTrue(data.contains("a;q" + "\t" + "1")); + assertTrue(data.contains("row1;a;q_Versions" + "\t" + "1")); + assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1")); + } finally { + t.close(); + FileUtil.fullyDelete(new File(OUTPUT_DIR)); + } + } + + /** + * Test CellCounter with time range all data should print to output + */ + @Test (timeout=300000) + public void testCellCounterStartTimeRange() throws Exception { + final TableName sourceTable = TableName.valueOf(name.getMethodName()); + byte[][] families = { FAMILY_A, FAMILY_B }; + Table t = UTIL.createTable(sourceTable, families); + try{ + Put p = new Put(ROW1); + p.addColumn(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11")); + p.addColumn(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12")); + p.addColumn(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13")); + t.put(p); + p = new Put(ROW2); + p.addColumn(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21")); + p.addColumn(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22")); + p.addColumn(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23")); + t.put(p); + String[] args = { + sourceTable.getNameAsString(), FQ_OUTPUT_DIR.toString(), ";", "^row1", + "--starttime=" + now, + "--endtime=" + now + 2 }; + runCount(args); + FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator + + "part-r-00000"); + String data = IOUtils.toString(inputStream); + inputStream.close(); + assertTrue(data.contains("Total Families Across all Rows" + "\t" + "2")); + assertTrue(data.contains("Total Qualifiers across all Rows" + "\t" + "2")); + assertTrue(data.contains("Total ROWS" + "\t" + "1")); + assertTrue(data.contains("b;q" + "\t" + "1")); + assertTrue(data.contains("a;q" + "\t" + "1")); + assertTrue(data.contains("row1;a;q_Versions" + "\t" + "1")); + 
assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1"));
+    }finally{
+      t.close();
+      FileUtil.fullyDelete(new File(OUTPUT_DIR));
+    }
+  }
+
+  /**
+   * Test CellCounter with time range all data should print to output
+   */
+  @Test (timeout=300000)
+  public void testCellCounteEndTimeRange() throws Exception {
+    final TableName sourceTable = TableName.valueOf(name.getMethodName());
+    byte[][] families = { FAMILY_A, FAMILY_B };
+    Table t = UTIL.createTable(sourceTable, families);
+    try{
+      Put p = new Put(ROW1);
+      p.addColumn(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11"));
+      p.addColumn(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12"));
+      p.addColumn(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13"));
+      t.put(p);
+      p = new Put(ROW2);
+      p.addColumn(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21"));
+      p.addColumn(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22"));
+      p.addColumn(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23"));
+      t.put(p);
+      String[] args = {
+        sourceTable.getNameAsString(), FQ_OUTPUT_DIR.toString(), ";", "^row1",
+        "--endtime=" + now + 1 };
+      runCount(args);
+      FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator +
+        "part-r-00000");
+      String data = IOUtils.toString(inputStream);
+      inputStream.close();
+      assertTrue(data.contains("Total Families Across all Rows" + "\t" + "2"));
+      assertTrue(data.contains("Total Qualifiers across all Rows" + "\t" + "2"));
+      assertTrue(data.contains("Total ROWS" + "\t" + "1"));
+      assertTrue(data.contains("b;q" + "\t" + "1"));
+      assertTrue(data.contains("a;q" + "\t" + "1"));
+      assertTrue(data.contains("row1;a;q_Versions" + "\t" + "1"));
+      assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1"));
+    }finally{
+      t.close();
+      FileUtil.fullyDelete(new File(OUTPUT_DIR));
+    }
+  }
+
+  /**
+   * Test CellCounter with time range all data should print to output
+   */
+  @Test (timeout=300000)
+  public void testCellCounteOutOfTimeRange() throws Exception {
+    final TableName sourceTable = TableName.valueOf(name.getMethodName());
+    byte[][] families = { FAMILY_A, FAMILY_B };
+    Table t = UTIL.createTable(sourceTable, families);
+    try{
+      Put p = new Put(ROW1);
+      p.addColumn(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11"));
+      p.addColumn(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12"));
+      p.addColumn(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13"));
+      t.put(p);
+      p = new Put(ROW2);
+      p.addColumn(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21"));
+      p.addColumn(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22"));
+      p.addColumn(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23"));
+      t.put(p);
+      String[] args = {
+        sourceTable.getNameAsString(), FQ_OUTPUT_DIR.toString(), ";", "--starttime=" + now + 1,
+        "--endtime=" + now + 2 };
+
+      runCount(args);
+      FileInputStream inputStream = new FileInputStream(OUTPUT_DIR + File.separator +
+        "part-r-00000");
+      String data = IOUtils.toString(inputStream);
+      inputStream.close();
+      // nothing should have been emitted to the reducer
+      assertTrue(data.isEmpty());
+    }finally{
+      t.close();
+      FileUtil.fullyDelete(new File(OUTPUT_DIR));
+    }
+  }
+
+
+  private boolean runCount(String[] args) throws Exception {
+    // need to make a copy of the configuration to make sure
+    // different temp dirs are used.
+ int status = ToolRunner.run(new Configuration(UTIL.getConfiguration()), new CellCounter(), + args); + return status == 0; + } + + /** + * Test main method of CellCounter + */ + @Test (timeout=300000) + public void testCellCounterMain() throws Exception { + + PrintStream oldPrintStream = System.err; + SecurityManager SECURITY_MANAGER = System.getSecurityManager(); + LauncherSecurityManager newSecurityManager= new LauncherSecurityManager(); + System.setSecurityManager(newSecurityManager); + ByteArrayOutputStream data = new ByteArrayOutputStream(); + String[] args = {}; + System.setErr(new PrintStream(data)); + try { + System.setErr(new PrintStream(data)); + + try { + CellCounter.main(args); + fail("should be SecurityException"); + } catch (SecurityException e) { + assertEquals(-1, newSecurityManager.getExitCode()); + assertTrue(data.toString().contains("ERROR: Wrong number of parameters:")); + // should be information about usage + assertTrue(data.toString().contains("Usage:")); + } + + } finally { + System.setErr(oldPrintStream); + System.setSecurityManager(SECURITY_MANAGER); + } + } + + /** + * Test CellCounter for complete table all data should print to output + */ + @Test(timeout = 600000) + public void testCellCounterForCompleteTable() throws Exception { + final TableName sourceTable = TableName.valueOf(name.getMethodName()); + String outputPath = OUTPUT_DIR + sourceTable; + LocalFileSystem localFileSystem = new LocalFileSystem(); + Path outputDir = + new Path(outputPath).makeQualified(localFileSystem.getUri(), + localFileSystem.getWorkingDirectory()); + byte[][] families = { FAMILY_A, FAMILY_B }; + Table t = UTIL.createTable(sourceTable, families); + try { + Put p = new Put(ROW1); + p.addColumn(FAMILY_A, QUALIFIER, now, Bytes.toBytes("Data11")); + p.addColumn(FAMILY_B, QUALIFIER, now + 1, Bytes.toBytes("Data12")); + p.addColumn(FAMILY_A, QUALIFIER, now + 2, Bytes.toBytes("Data13")); + t.put(p); + p = new Put(ROW2); + p.addColumn(FAMILY_B, QUALIFIER, now, Bytes.toBytes("Dat21")); + p.addColumn(FAMILY_A, QUALIFIER, now + 1, Bytes.toBytes("Data22")); + p.addColumn(FAMILY_B, QUALIFIER, now + 2, Bytes.toBytes("Data23")); + t.put(p); + String[] args = { sourceTable.getNameAsString(), outputDir.toString(), ";" }; + runCount(args); + FileInputStream inputStream = + new FileInputStream(outputPath + File.separator + "part-r-00000"); + String data = IOUtils.toString(inputStream); + inputStream.close(); + assertTrue(data.contains("Total Families Across all Rows" + "\t" + "2")); + assertTrue(data.contains("Total Qualifiers across all Rows" + "\t" + "4")); + assertTrue(data.contains("Total ROWS" + "\t" + "2")); + assertTrue(data.contains("b;q" + "\t" + "2")); + assertTrue(data.contains("a;q" + "\t" + "2")); + assertTrue(data.contains("row1;a;q_Versions" + "\t" + "1")); + assertTrue(data.contains("row1;b;q_Versions" + "\t" + "1")); + assertTrue(data.contains("row2;a;q_Versions" + "\t" + "1")); + assertTrue(data.contains("row2;b;q_Versions" + "\t" + "1")); + + FileUtil.fullyDelete(new File(outputPath)); + args = new String[] { "-D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=a, b", + sourceTable.getNameAsString(), outputDir.toString(), ";"}; + runCount(args); + inputStream = new FileInputStream(outputPath + File.separator + "part-r-00000"); + String data2 = IOUtils.toString(inputStream); + inputStream.close(); + assertEquals(data, data2); + } finally { + t.close(); + localFileSystem.close(); + FileUtil.fullyDelete(new File(outputPath)); + } + } + + @Test + public void 
TestCellCounterWithoutOutputDir() throws Exception { + String[] args = new String[] { "tableName" }; + assertEquals("CellCounter should exit with -1 as output directory is not specified.", -1, + ToolRunner.run(HBaseConfiguration.create(), new CellCounter(), args)); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java new file mode 100644 index 0000000..0bec03b --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.LauncherSecurityManager; +import org.apache.hadoop.util.ToolRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +/** + * Basic test for the CopyTable M/R tool + */ +@Category({MapReduceTests.class, LargeTests.class}) +public class TestCopyTable { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte[] ROW1 = Bytes.toBytes("row1"); + private static final byte[] ROW2 = Bytes.toBytes("row2"); + private static final String FAMILY_A_STRING = "a"; + private static final String FAMILY_B_STRING = "b"; + private static final byte[] FAMILY_A = Bytes.toBytes(FAMILY_A_STRING); + private static final byte[] FAMILY_B = Bytes.toBytes(FAMILY_B_STRING); + private static final byte[] QUALIFIER = Bytes.toBytes("q"); + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public 
+  public static void beforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void doCopyTableTest(boolean bulkload) throws Exception {
+    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
+    final byte[] FAMILY = Bytes.toBytes("family");
+    final byte[] COLUMN1 = Bytes.toBytes("c1");
+
+    try (Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
+        Table t2 = TEST_UTIL.createTable(tableName2, FAMILY)) {
+      // put rows into the first table
+      for (int i = 0; i < 10; i++) {
+        Put p = new Put(Bytes.toBytes("row" + i));
+        p.addColumn(FAMILY, COLUMN1, COLUMN1);
+        t1.put(p);
+      }
+
+      CopyTable copy = new CopyTable();
+
+      int code;
+      if (bulkload) {
+        code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()),
+          copy, new String[] { "--new.name=" + tableName2.getNameAsString(),
+            "--bulkload", tableName1.getNameAsString() });
+      } else {
+        code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()),
+          copy, new String[] { "--new.name=" + tableName2.getNameAsString(),
+            tableName1.getNameAsString() });
+      }
+      assertEquals("copy job failed", 0, code);
+
+      // verify the data was copied into table 2
+      for (int i = 0; i < 10; i++) {
+        Get g = new Get(Bytes.toBytes("row" + i));
+        Result r = t2.get(g);
+        assertEquals(1, r.size());
+        assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN1));
+      }
+    } finally {
+      TEST_UTIL.deleteTable(tableName1);
+      TEST_UTIL.deleteTable(tableName2);
+    }
+  }
+
+  /**
+   * Simple end-to-end test
+   * @throws Exception
+   */
+  @Test
+  public void testCopyTable() throws Exception {
+    doCopyTableTest(false);
+  }
+
+  /**
+   * Simple end-to-end test with bulkload.
+   */
+  @Test
+  public void testCopyTableWithBulkload() throws Exception {
+    doCopyTableTest(true);
+  }
+
+  @Test
+  public void testStartStopRow() throws Exception {
+    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
+    final byte[] FAMILY = Bytes.toBytes("family");
+    final byte[] COLUMN1 = Bytes.toBytes("c1");
+    final byte[] ROW0 = Bytes.toBytesBinary("\\x01row0");
+    final byte[] ROW1 = Bytes.toBytesBinary("\\x01row1"); // shadows the class-level ROW1
+    final byte[] ROW2 = Bytes.toBytesBinary("\\x01row2"); // shadows the class-level ROW2
+
+    Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
+    Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
+
+    // put rows into the first table
+    Put p = new Put(ROW0);
+    p.addColumn(FAMILY, COLUMN1, COLUMN1);
+    t1.put(p);
+    p = new Put(ROW1);
+    p.addColumn(FAMILY, COLUMN1, COLUMN1);
+    t1.put(p);
+    p = new Put(ROW2);
+    p.addColumn(FAMILY, COLUMN1, COLUMN1);
+    t1.put(p);
+
+    CopyTable copy = new CopyTable();
+    assertEquals(
+      0,
+      ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()),
+        copy, new String[] { "--new.name=" + tableName2, "--startrow=\\x01row1",
+          "--stoprow=\\x01row2", tableName1.getNameAsString() }));
+
+    // verify the data was copied into table 2:
+    // row1 exists; row0 and row2 do not (stoprow is exclusive)
+    Get g = new Get(ROW1);
+    Result r = t2.get(g);
+    assertEquals(1, r.size());
+    assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN1));
+
+    g = new Get(ROW0);
+    r = t2.get(g);
+    assertEquals(0, r.size());
+
+    g = new Get(ROW2);
+    r = t2.get(g);
+    assertEquals(0, r.size());
+
+    t1.close();
+    t2.close();
+    TEST_UTIL.deleteTable(tableName1);
+    TEST_UTIL.deleteTable(tableName2);
+  }
+
+  /**
+   * Test a copy from sourceTable to targetTable that renames family a to family b (--families=a:b).
+   */
+  @Test
+  public void testRenameFamily() throws Exception {
+    final TableName sourceTable = TableName.valueOf(name.getMethodName() + "source");
+    final TableName targetTable = TableName.valueOf(name.getMethodName() + "-target");
+
+    byte[][] families = { FAMILY_A, FAMILY_B };
+
+    Table t = TEST_UTIL.createTable(sourceTable, families);
+    Table t2 = TEST_UTIL.createTable(targetTable, families);
+    Put p = new Put(ROW1);
+    p.addColumn(FAMILY_A, QUALIFIER, Bytes.toBytes("Data11"));
+    p.addColumn(FAMILY_B, QUALIFIER, Bytes.toBytes("Data12"));
+    p.addColumn(FAMILY_A, QUALIFIER, Bytes.toBytes("Data13")); // added after "Data11", so this is the value expected for a:q
+    t.put(p);
+    p = new Put(ROW2);
+    p.addColumn(FAMILY_B, QUALIFIER, Bytes.toBytes("Data21"));
+    p.addColumn(FAMILY_A, QUALIFIER, Bytes.toBytes("Data22"));
+    p.addColumn(FAMILY_B, QUALIFIER, Bytes.toBytes("Data23")); // added after "Data21", so this is the value expected for b:q
+    t.put(p);
+
+    long currentTime = System.currentTimeMillis();
+    String[] args = new String[] { "--new.name=" + targetTable, "--families=a:b", "--all.cells",
+      "--starttime=" + (currentTime - 100000), "--endtime=" + (currentTime + 100000),
+      "--versions=1", sourceTable.getNameAsString() };
+    assertNull(t2.get(new Get(ROW1)).getRow());
+
+    assertTrue(runCopy(args));
+
+    assertNotNull(t2.get(new Get(ROW1)).getRow());
+    Result res = t2.get(new Get(ROW1));
+    byte[] b1 = res.getValue(FAMILY_B, QUALIFIER);
+    assertEquals("Data13", new String(b1));
+    assertNotNull(t2.get(new Get(ROW2)).getRow());
+    res = t2.get(new Get(ROW2));
+    b1 = res.getValue(FAMILY_A, QUALIFIER);
+    // family a was renamed to b, not copied as-is, so the target's family a stays empty
+    assertNull(b1);
+
+  }
+
+  /**
+   * Test main method of CopyTable.
+   */
+  @Test
+  public void testMainMethod() throws Exception {
+    String[] emptyArgs = { "-h" };
+    PrintStream oldWriter = System.err;
+    ByteArrayOutputStream data = new ByteArrayOutputStream();
+    PrintStream writer = new PrintStream(data);
+    System.setErr(writer);
+    SecurityManager oldSecurityManager = System.getSecurityManager();
+    LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
+    System.setSecurityManager(newSecurityManager);
+    try {
+      CopyTable.main(emptyArgs);
+      fail("should exit");
+    } catch (SecurityException e) {
+      assertEquals(1, newSecurityManager.getExitCode());
+    } finally {
+      System.setErr(oldWriter);
+      System.setSecurityManager(oldSecurityManager);
+    }
+    assertTrue(data.toString().contains("rs.class"));
+    // should print usage information
+    assertTrue(data.toString().contains("Usage:"));
+  }
+
+  private boolean runCopy(String[] args) throws Exception {
+    int status = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()), new CopyTable(),
+      args);
+    return status == 0;
+  }
+}
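
For callers that drive CopyTable from code rather than the shell, the test's runCopy helper already shows the whole pattern: build the argument array and hand it to ToolRunner. Below is a minimal sketch along those lines, not part of this patch; it assumes hbase-mapreduce on the classpath, a configuration pointing at a reachable cluster, and placeholder table names "source" and "target".

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mapreduce.CopyTable;
  import org.apache.hadoop.util.ToolRunner;

  public class CopyTableDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      long now = System.currentTimeMillis();
      // the same flags the tests exercise: rename family a to b, bound the
      // copy to a time window, and keep a single version per cell
      String[] copyArgs = new String[] {
        "--new.name=target",             // destination table (placeholder name)
        "--families=a:b",                // copy family a, writing it as family b
        "--all.cells",
        "--starttime=" + (now - 100000),
        "--endtime=" + (now + 100000),
        "--versions=1",
        "source"                         // source table (placeholder name)
      };
      int code = ToolRunner.run(conf, new CopyTable(), copyArgs);
      System.exit(code);                 // 0 on success, as the tests assert
    }
  }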

http://git-wip-us.apache.org/repos/asf/hbase/blob/59d03410/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestGroupingTableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestGroupingTableMapper.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestGroupingTableMapper.java
new file mode 100644
index 0000000..7e36602
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestGroupingTableMapper.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.mockito.Mockito.*;
+
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestGroupingTableMapper {
+
+  /**
+   * Test GroupingTableMapper class
+   */
+  @Test
+  public void testGroupingTableMapper() throws Exception {
+
+    GroupingTableMapper mapper = new GroupingTableMapper();
+    Configuration configuration = new Configuration();
+    configuration.set(GroupingTableMapper.GROUP_COLUMNS, "family1:clm family2:clm");
+    mapper.setConf(configuration);
+
+    Result result = mock(Result.class);
+    @SuppressWarnings("unchecked")
+    Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Result>.Context context =
+      mock(Mapper.Context.class);
+    context.write(any(ImmutableBytesWritable.class), any(Result.class));
+    List<Cell> keyValue = new ArrayList<>();
+    byte[] row = {};
+    keyValue.add(new KeyValue(row, Bytes.toBytes("family2"), Bytes.toBytes("clm"), Bytes
+      .toBytes("value1")));
+    keyValue.add(new KeyValue(row, Bytes.toBytes("family1"), Bytes.toBytes("clm"), Bytes
+      .toBytes("value2")));
+    when(result.listCells()).thenReturn(keyValue);
+    mapper.map(null, result, context);
+    // the mapper should emit the group key built from the matching cells' values
+    byte[][] data = { Bytes.toBytes("value1"), Bytes.toBytes("value2") };
+    ImmutableBytesWritable ibw = mapper.createGroupKey(data);
+    verify(context).write(ibw, result);
+  }
+
+}
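
The group key asserted at the end of this test comes from GroupingTableMapper.createGroupKey, which composes the configured columns' values into one byte[] key (the values are joined with a delimiter; the exact byte is an implementation detail of the mapper). Below is a small sketch of inspecting such a key outside of a job, using only the calls the test itself relies on; it is placed in the mapper's own package since createGroupKey may not be public:

  package org.apache.hadoop.hbase.mapreduce;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.util.Bytes;

  public class GroupKeyDemo {
    public static void main(String[] args) {
      GroupingTableMapper mapper = new GroupingTableMapper();
      Configuration conf = new Configuration();
      // group on the same two columns the test configures
      conf.set(GroupingTableMapper.GROUP_COLUMNS, "family1:clm family2:clm");
      mapper.setConf(conf);

      // rows whose grouping columns carry equal values get equal keys,
      // and therefore land in the same reduce group
      byte[][] values = { Bytes.toBytes("value1"), Bytes.toBytes("value2") };
      ImmutableBytesWritable key = mapper.createGroupKey(values);
      System.out.println(Bytes.toString(key.get(), key.getOffset(), key.getLength()));
    }
  }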

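A note on the System.exit handling in testMainMethod above: LauncherSecurityManager works by intercepting SecurityManager.checkExit and throwing instead of letting the JVM die, which is what lets the test assert on the exit code. The same idea in plain JDK terms, as a self-contained sketch with illustrative names that are not HBase API:

  public class ExitTrappingDemo {
    // converts System.exit(n) into a SecurityException so a caller can assert on n
    static class ExitTrappingSecurityManager extends SecurityManager {
      volatile int exitCode = -1;

      @Override
      public void checkExit(int status) {
        exitCode = status;             // remember the requested exit code
        throw new SecurityException("intercepted System.exit(" + status + ")");
      }

      @Override
      public void checkPermission(java.security.Permission perm) {
        // permit everything else, including restoring the previous manager
      }
    }

    public static void main(String[] args) {
      SecurityManager old = System.getSecurityManager();
      ExitTrappingSecurityManager sm = new ExitTrappingSecurityManager();
      System.setSecurityManager(sm);
      try {
        System.exit(1);                // stands in for CopyTable.main(new String[] { "-h" })
      } catch (SecurityException expected) {
        System.out.println("captured exit code: " + sm.exitCode);
      } finally {
        System.setSecurityManager(old); // always restore the original manager
      }
    }
  }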