http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java new file mode 100644 index 0000000..e669f14 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java @@ -0,0 +1,406 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.cli.CommandLine; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableSnapshotScanner; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.mapreduce.TableMapper; +import org.apache.hadoop.hbase.util.AbstractHBaseTool; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ToolRunner; + +import org.apache.hadoop.hbase.shaded.com.google.common.base.Stopwatch; + +/** + * A simple performance evaluation tool for single client and MR scans + * and snapshot scans. 
+ */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) +public class ScanPerformanceEvaluation extends AbstractHBaseTool { + + private static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters"; + + private String type; + private String file; + private String tablename; + private String snapshotName; + private String restoreDir; + private String caching; + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + Path rootDir; + try { + rootDir = FSUtils.getRootDir(conf); + rootDir.getFileSystem(conf); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + @Override + protected void addOptions() { + this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce"); + this.addOptWithArg("f", "file", "the filename to read from"); + this.addOptWithArg("tn", "table", "the tablename to read from"); + this.addOptWithArg("sn", "snapshot", "the snapshot name to read from"); + this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot"); + this.addOptWithArg("ch", "caching", "scanner caching value"); + } + + @Override + protected void processOptions(CommandLine cmd) { + type = cmd.getOptionValue("type"); + file = cmd.getOptionValue("file"); + tablename = cmd.getOptionValue("table"); + snapshotName = cmd.getOptionValue("snapshot"); + restoreDir = cmd.getOptionValue("restoredir"); + caching = cmd.getOptionValue("caching"); + } + + protected void testHdfsStreaming(Path filename) throws IOException { + byte[] buf = new byte[1024]; + FileSystem fs = filename.getFileSystem(getConf()); + + // read the file from start to finish + Stopwatch fileOpenTimer = Stopwatch.createUnstarted(); + Stopwatch streamTimer = Stopwatch.createUnstarted(); + + fileOpenTimer.start(); + FSDataInputStream in = fs.open(filename); + fileOpenTimer.stop(); + + long totalBytes = 0; + streamTimer.start(); + while (true) { + int read = in.read(buf); + if (read < 0) { + break; + } + totalBytes += read; + } + streamTimer.stop(); + + double throughput = (double)totalBytes / streamTimer.elapsed(TimeUnit.SECONDS); + + System.out.println("HDFS streaming: "); + System.out.println("total time to open: " + + fileOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); + System.out.println("total time to read: " + streamTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); + System.out.println("total bytes: " + totalBytes + " bytes (" + + StringUtils.humanReadableInt(totalBytes) + ")"); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); + } + + private Scan getScan() { + Scan scan = new Scan(); // default scan settings + scan.setCacheBlocks(false); + scan.setMaxVersions(1); + scan.setScanMetricsEnabled(true); + if (caching != null) { + scan.setCaching(Integer.parseInt(caching)); + } + + return scan; + } + + public void testScan() throws IOException { + Stopwatch tableOpenTimer = Stopwatch.createUnstarted(); + Stopwatch scanOpenTimer = Stopwatch.createUnstarted(); + Stopwatch scanTimer = Stopwatch.createUnstarted(); + + tableOpenTimer.start(); + Connection connection = ConnectionFactory.createConnection(getConf()); + Table table = connection.getTable(TableName.valueOf(tablename)); + tableOpenTimer.stop(); + + Scan scan = getScan(); + scanOpenTimer.start(); + ResultScanner scanner = table.getScanner(scan); + scanOpenTimer.stop(); + + long numRows = 0; + long numCells = 0; + scanTimer.start(); + while (true) { + Result result = scanner.next(); + if (result == null) { +
break; + } + numRows++; + + numCells += result.rawCells().length; + } + scanTimer.stop(); + scanner.close(); + table.close(); + connection.close(); + + ScanMetrics metrics = scan.getScanMetrics(); + long totalBytes = metrics.countOfBytesInResults.get(); + double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS); + double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS); + double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS); + + System.out.println("HBase scan: "); + System.out.println("total time to open table: " + + tableOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); + System.out.println("total time to open scanner: " + + scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); + System.out.println("total time to scan: " + + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); + + System.out.println("Scan metrics:\n" + metrics.getMetricsMap()); + + System.out.println("total bytes: " + totalBytes + " bytes (" + + StringUtils.humanReadableInt(totalBytes) + ")"); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); + System.out.println("total rows : " + numRows); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s"); + System.out.println("total cells : " + numCells); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s"); + } + + + public void testSnapshotScan() throws IOException { + Stopwatch snapshotRestoreTimer = Stopwatch.createUnstarted(); + Stopwatch scanOpenTimer = Stopwatch.createUnstarted(); + Stopwatch scanTimer = Stopwatch.createUnstarted(); + + Path restoreDir = new Path(this.restoreDir); + + snapshotRestoreTimer.start(); + restoreDir.getFileSystem(conf).delete(restoreDir, true); + snapshotRestoreTimer.stop(); + + Scan scan = getScan(); + scanOpenTimer.start(); + TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan); + scanOpenTimer.stop(); + + long numRows = 0; + long numCells = 0; + scanTimer.start(); + while (true) { + Result result = scanner.next(); + if (result == null) { + break; + } + numRows++; + + numCells += result.rawCells().length; + } + scanTimer.stop(); + scanner.close(); + + ScanMetrics metrics = scanner.getScanMetrics(); + long totalBytes = metrics.countOfBytesInResults.get(); + double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS); + double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS); + double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS); + + System.out.println("HBase scan snapshot: "); + System.out.println("total time to restore snapshot: " + + snapshotRestoreTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); + System.out.println("total time to open scanner: " + + scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); + System.out.println("total time to scan: " + + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); + + System.out.println("Scan metrics:\n" + metrics.getMetricsMap()); + + System.out.println("total bytes: " + totalBytes + " bytes (" + + StringUtils.humanReadableInt(totalBytes) + ")"); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); + System.out.println("total rows : " + numRows); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s"); + System.out.println("total cells : " + numCells); + System.out.println("throughput : " + 
StringUtils.humanReadableInt((long)throughputCells) + " cells/s"); + + } + + public static enum ScanCounter { + NUM_ROWS, + NUM_CELLS, + } + + public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> { + @Override + protected void map(ImmutableBytesWritable key, Result value, + Context context) throws IOException, + InterruptedException { + context.getCounter(ScanCounter.NUM_ROWS).increment(1); + context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length); + } + } + + public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException { + Stopwatch scanOpenTimer = Stopwatch.createUnstarted(); + Stopwatch scanTimer = Stopwatch.createUnstarted(); + + Scan scan = getScan(); + + String jobName = "testScanMapReduce"; + + Job job = new Job(conf); + job.setJobName(jobName); + + job.setJarByClass(getClass()); + + TableMapReduceUtil.initTableMapperJob( + this.tablename, + scan, + MyMapper.class, + NullWritable.class, + NullWritable.class, + job + ); + + job.setNumReduceTasks(0); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(NullWritable.class); + job.setOutputFormatClass(NullOutputFormat.class); + + scanTimer.start(); + job.waitForCompletion(true); + scanTimer.stop(); + + Counters counters = job.getCounters(); + long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue(); + long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue(); + + long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue(); + double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS); + double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS); + double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS); + + System.out.println("HBase scan mapreduce: "); + System.out.println("total time to open scanner: " + + scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); + System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); + + System.out.println("total bytes: " + totalBytes + " bytes (" + + StringUtils.humanReadableInt(totalBytes) + ")"); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); + System.out.println("total rows : " + numRows); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s"); + System.out.println("total cells : " + numCells); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s"); + } + + public void testSnapshotScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException { + Stopwatch scanOpenTimer = Stopwatch.createUnstarted(); + Stopwatch scanTimer = Stopwatch.createUnstarted(); + + Scan scan = getScan(); + + String jobName = "testSnapshotScanMapReduce"; + + Job job = new Job(conf); + job.setJobName(jobName); + + job.setJarByClass(getClass()); + + TableMapReduceUtil.initTableSnapshotMapperJob( + this.snapshotName, + scan, + MyMapper.class, + NullWritable.class, + NullWritable.class, + job, + true, + new Path(restoreDir) + ); + + job.setNumReduceTasks(0); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(NullWritable.class); + job.setOutputFormatClass(NullOutputFormat.class); + + scanTimer.start(); + job.waitForCompletion(true); + scanTimer.stop(); + + Counters counters = job.getCounters(); + long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue(); + long numCells = 
counters.findCounter(ScanCounter.NUM_CELLS).getValue(); + + long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue(); + double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS); + double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS); + double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS); + + System.out.println("HBase snapshot scan mapreduce: "); + System.out.println("total time to open scanner: " + + scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); + System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms"); + + System.out.println("total bytes: " + totalBytes + " bytes (" + + StringUtils.humanReadableInt(totalBytes) + ")"); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s"); + System.out.println("total rows : " + numRows); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s"); + System.out.println("total cells : " + numCells); + System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s"); + } + + @Override + protected int doWork() throws Exception { + if (type.equals("streaming")) { + testHdfsStreaming(new Path(file)); + } else if (type.equals("scan")) { + testScan(); + } else if (type.equals("snapshotscan")) { + testSnapshotScan(); + } else if (type.equals("scanmapreduce")) { + testScanMapReduce(); + } else if (type.equals("snapshotscanmapreduce")) { + testSnapshotScanMapReduce(); + } + return 0; + } + + public static void main (String[] args) throws Exception { + int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args); + System.exit(ret); + } +}
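For readers who want to try the new tool: ScanPerformanceEvaluation is a standard ToolRunner application (see its main method above), so it can also be driven programmatically. A minimal sketch follows, assuming the hbase-mapreduce test jar that carries the class is on the classpath; the table name "TestTable" and the caching value 100 are made-up placeholders, not values from the patch.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ScanPerformanceEvaluation;
import org.apache.hadoop.util.ToolRunner;

public class ScanPerfDemo {
  public static void main(String[] args) throws Exception {
    // Roughly equivalent to the command line:
    //   hbase org.apache.hadoop.hbase.ScanPerformanceEvaluation -t scan -tn TestTable -ch 100
    // Option names come from addOptions() in the patch; the values are placeholders.
    int ret = ToolRunner.run(HBaseConfiguration.create(),
        new ScanPerformanceEvaluation(),
        new String[] { "-t", "scan", "-tn", "TestTable", "-ch", "100" });
    System.exit(ret);
  }
}

The "-t" (type) option is required and selects which of the five benchmarks in doWork() runs; the other options are only consulted by the benchmark that needs them.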
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java new file mode 100644 index 0000000..86a3d3f --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import static org.junit.Assert.*; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.Random; +import java.util.LinkedList; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.PerformanceEvaluation.RandomReadTest; +import org.apache.hadoop.hbase.PerformanceEvaluation.TestOptions; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.UniformReservoir; + +@Category({MiscTests.class, SmallTests.class}) +public class TestPerformanceEvaluation { + private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); + + @Test + public void testSerialization() + throws JsonGenerationException, JsonMappingException, IOException { + PerformanceEvaluation.TestOptions options = new PerformanceEvaluation.TestOptions(); + assertTrue(!options.isAutoFlush()); + options.setAutoFlush(true); + ObjectMapper mapper = new ObjectMapper(); + String optionsString = mapper.writeValueAsString(options); + PerformanceEvaluation.TestOptions optionsDeserialized = + mapper.readValue(optionsString, PerformanceEvaluation.TestOptions.class); + assertTrue(optionsDeserialized.isAutoFlush()); + } + + /** + * Exercise the mr spec writing. Simple assertions to make sure it is basically working. 
+ * @throws IOException + */ + @Ignore @Test + public void testWriteInputFile() throws IOException { + TestOptions opts = new PerformanceEvaluation.TestOptions(); + final int clients = 10; + opts.setNumClientThreads(clients); + opts.setPerClientRunRows(10); + Path dir = + PerformanceEvaluation.writeInputFile(HTU.getConfiguration(), opts, HTU.getDataTestDir()); + FileSystem fs = FileSystem.get(HTU.getConfiguration()); + Path p = new Path(dir, PerformanceEvaluation.JOB_INPUT_FILENAME); + long len = fs.getFileStatus(p).getLen(); + assertTrue(len > 0); + byte [] content = new byte[(int)len]; + FSDataInputStream dis = fs.open(p); + try { + dis.readFully(content); + BufferedReader br = + new BufferedReader(new InputStreamReader(new ByteArrayInputStream(content))); + int count = 0; + while (br.readLine() != null) { + count++; + } + assertEquals(clients, count); + } finally { + dis.close(); + } + } + + @Test + public void testSizeCalculation() { + TestOptions opts = new PerformanceEvaluation.TestOptions(); + opts = PerformanceEvaluation.calculateRowsAndSize(opts); + int rows = opts.getPerClientRunRows(); + // Default row count + final int defaultPerClientRunRows = 1024 * 1024; + assertEquals(defaultPerClientRunRows, rows); + // If size is 2G, then twice the row count. + opts.setSize(2.0f); + opts = PerformanceEvaluation.calculateRowsAndSize(opts); + assertEquals(defaultPerClientRunRows * 2, opts.getPerClientRunRows()); + // If two clients, then they get half the rows each. + opts.setNumClientThreads(2); + opts = PerformanceEvaluation.calculateRowsAndSize(opts); + assertEquals(defaultPerClientRunRows, opts.getPerClientRunRows()); + // What if valueSize is 'random'? Then half of the valueSize so twice the rows. + opts.valueRandom = true; + opts = PerformanceEvaluation.calculateRowsAndSize(opts); + assertEquals(defaultPerClientRunRows * 2, opts.getPerClientRunRows()); + } + + @Test + public void testRandomReadCalculation() { + TestOptions opts = new PerformanceEvaluation.TestOptions(); + opts = PerformanceEvaluation.calculateRowsAndSize(opts); + int rows = opts.getPerClientRunRows(); + // Default row count + final int defaultPerClientRunRows = 1024 * 1024; + assertEquals(defaultPerClientRunRows, rows); + // If size is 2G, then twice the row count. + opts.setSize(2.0f); + opts.setPerClientRunRows(1000); + opts.setCmdName(PerformanceEvaluation.RANDOM_READ); + opts = PerformanceEvaluation.calculateRowsAndSize(opts); + assertEquals(1000, opts.getPerClientRunRows()); + // If two clients, then they get half the rows each. 
+ opts.setNumClientThreads(2); + opts = PerformanceEvaluation.calculateRowsAndSize(opts); + assertEquals(1000, opts.getPerClientRunRows()); + Random random = new Random(); + // assuming we will get one before this loop expires + boolean foundValue = false; + for (int i = 0; i < 10000000; i++) { + int randomRow = PerformanceEvaluation.generateRandomRow(random, opts.totalRows); + if (randomRow > 1000) { + foundValue = true; + break; + } + } + assertTrue("We need to get a value more than 1000", foundValue); + } + + @Test + public void testZipfian() + throws NoSuchMethodException, SecurityException, InstantiationException, IllegalAccessException, + IllegalArgumentException, InvocationTargetException { + TestOptions opts = new PerformanceEvaluation.TestOptions(); + opts.setValueZipf(true); + final int valueSize = 1024; + opts.setValueSize(valueSize); + RandomReadTest rrt = new RandomReadTest(null, opts, null); + Constructor<?> ctor = + Histogram.class.getDeclaredConstructor(com.codahale.metrics.Reservoir.class); + ctor.setAccessible(true); + Histogram histogram = (Histogram)ctor.newInstance(new UniformReservoir(1024 * 500)); + for (int i = 0; i < 100; i++) { + histogram.update(rrt.getValueLength(null)); + } + Snapshot snapshot = histogram.getSnapshot(); + double stddev = snapshot.getStdDev(); + assertTrue(stddev != 0 && stddev != 1.0); + assertTrue(snapshot.getStdDev() != 0); + double median = snapshot.getMedian(); + assertTrue(median != 0 && median != 1 && median != valueSize); + } + + @Test + public void testParseOptsWithThreads() { + Queue<String> opts = new LinkedList<>(); + String cmdName = "sequentialWrite"; + int threads = 1; + opts.offer(cmdName); + opts.offer(String.valueOf(threads)); + PerformanceEvaluation.TestOptions options = PerformanceEvaluation.parseOpts(opts); + assertNotNull(options); + assertNotNull(options.getCmdName()); + assertEquals(cmdName, options.getCmdName()); + assertEquals(threads, options.getNumClientThreads()); + } + + @Test + public void testParseOptsWrongThreads() { + Queue<String> opts = new LinkedList<>(); + String cmdName = "sequentialWrite"; + opts.offer(cmdName); + opts.offer("qq"); + try { + PerformanceEvaluation.parseOpts(opts); + } catch (IllegalArgumentException e) { + System.out.println(e.getMessage()); + assertEquals("Command " + cmdName + " does not have threads number", e.getMessage()); + assertTrue(e.getCause() instanceof NumberFormatException); + } + } + + @Test + public void testParseOptsNoThreads() { + Queue<String> opts = new LinkedList<>(); + String cmdName = "sequentialWrite"; + try { + PerformanceEvaluation.parseOpts(opts); + } catch (IllegalArgumentException e) { + System.out.println(e.getMessage()); + assertEquals("Command " + cmdName + " does not have threads number", e.getMessage()); + assertTrue(e.getCause() instanceof NoSuchElementException); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java new file mode 100644 index 0000000..d085c21 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java @@ -0,0 +1,41 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.util.ProgramDriver; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +@Category({MapReduceTests.class, SmallTests.class}) +public class TestDriver { + + @Test + public void testDriverMainMethod() throws Throwable { + ProgramDriver programDriverMock = mock(ProgramDriver.class); + Driver.setProgramDriver(programDriverMock); + Driver.main(new String[]{}); + verify(programDriverMock).driver(Mockito.any(String[].class)); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java new file mode 100644 index 0000000..7131cf9 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java @@ -0,0 +1,181 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapred; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList; + +@Category({MapReduceTests.class, SmallTests.class}) +public class TestGroupingTableMap { + + @Test + @SuppressWarnings({ "deprecation", "unchecked" }) + public void shouldNotCallCollectonSinceFindUniqueKeyValueMoreThanOnes() + throws Exception { + GroupingTableMap gTableMap = null; + try { + Result result = mock(Result.class); + Reporter reporter = mock(Reporter.class); + gTableMap = new GroupingTableMap(); + Configuration cfg = new Configuration(); + cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB"); + JobConf jobConf = new JobConf(cfg); + gTableMap.configure(jobConf); + + byte[] row = {}; + List<Cell> keyValues = ImmutableList.<Cell>of( + new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("1111")), + new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("2222")), + new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), Bytes.toBytes("3333"))); + when(result.listCells()).thenReturn(keyValues); + OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock = + mock(OutputCollector.class); + gTableMap.map(null, result, outputCollectorMock, reporter); + verify(result).listCells(); + verifyZeroInteractions(outputCollectorMock); + } finally { + if (gTableMap != null) + gTableMap.close(); + } + } + + @Test + @SuppressWarnings({ "deprecation", "unchecked" }) + public void shouldCreateNewKeyAlthoughExtraKey() throws Exception { + GroupingTableMap gTableMap = null; + try { + Result result = mock(Result.class); + Reporter reporter = mock(Reporter.class); + gTableMap = new GroupingTableMap(); + Configuration cfg = new Configuration(); + cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB"); + JobConf jobConf = new JobConf(cfg); + gTableMap.configure(jobConf); + + byte[] row = {}; + List<Cell> keyValues = ImmutableList.<Cell>of( + new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("1111")), + new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), Bytes.toBytes("2222")), + new KeyValue(row, "familyC".getBytes(), "qualifierC".getBytes(), Bytes.toBytes("3333"))); + when(result.listCells()).thenReturn(keyValues); + OutputCollector<ImmutableBytesWritable, 
Result> outputCollectorMock = + mock(OutputCollector.class); + gTableMap.map(null, result, outputCollectorMock, reporter); + verify(result).listCells(); + verify(outputCollectorMock, times(1)) + .collect(any(ImmutableBytesWritable.class), any(Result.class)); + verifyNoMoreInteractions(outputCollectorMock); + } finally { + if (gTableMap != null) + gTableMap.close(); + } + } + + @Test + @SuppressWarnings({ "deprecation" }) + public void shouldCreateNewKey() throws Exception { + GroupingTableMap gTableMap = null; + try { + Result result = mock(Result.class); + Reporter reporter = mock(Reporter.class); + final byte[] bSeparator = Bytes.toBytes(" "); + gTableMap = new GroupingTableMap(); + Configuration cfg = new Configuration(); + cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB"); + JobConf jobConf = new JobConf(cfg); + gTableMap.configure(jobConf); + + final byte[] firstPartKeyValue = Bytes.toBytes("34879512738945"); + final byte[] secondPartKeyValue = Bytes.toBytes("35245142671437"); + byte[] row = {}; + List<Cell> cells = ImmutableList.<Cell>of( + new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), firstPartKeyValue), + new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), secondPartKeyValue)); + when(result.listCells()).thenReturn(cells); + + final AtomicBoolean outputCollected = new AtomicBoolean(); + OutputCollector<ImmutableBytesWritable, Result> outputCollector = + new OutputCollector<ImmutableBytesWritable, Result>() { + @Override + public void collect(ImmutableBytesWritable arg, Result result) throws IOException { + assertArrayEquals(org.apache.hadoop.hbase.shaded.com.google.common.primitives. + Bytes.concat(firstPartKeyValue, bSeparator, + secondPartKeyValue), arg.copyBytes()); + outputCollected.set(true); + } + }; + + gTableMap.map(null, result, outputCollector, reporter); + verify(result).listCells(); + Assert.assertTrue("Output not received", outputCollected.get()); + + final byte[] firstPartValue = Bytes.toBytes("238947928"); + final byte[] secondPartValue = Bytes.toBytes("4678456942345"); + byte[][] data = { firstPartValue, secondPartValue }; + ImmutableBytesWritable byteWritable = gTableMap.createGroupKey(data); + assertArrayEquals(org.apache.hadoop.hbase.shaded.com.google.common.primitives. + Bytes.concat(firstPartValue, + bSeparator, secondPartValue), byteWritable.get()); + } finally { + if (gTableMap != null) + gTableMap.close(); + } + } + + @Test + @SuppressWarnings({ "deprecation" }) + public void shouldReturnNullFromCreateGroupKey() throws Exception { + GroupingTableMap gTableMap = null; + try { + gTableMap = new GroupingTableMap(); + assertNull(gTableMap.createGroupKey(null)); + } finally { + if(gTableMap != null) + gTableMap.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java new file mode 100644 index 0000000..e222d0b --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java @@ -0,0 +1,64 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; + +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +@Category({MapReduceTests.class, SmallTests.class}) +public class TestIdentityTableMap { + + @Test + @SuppressWarnings({ "deprecation", "unchecked" }) + public void shouldCollectPredefinedTimes() throws IOException { + int recordNumber = 999; + Result resultMock = mock(Result.class); + IdentityTableMap identityTableMap = null; + try { + Reporter reporterMock = mock(Reporter.class); + identityTableMap = new IdentityTableMap(); + ImmutableBytesWritable bytesWritableMock = mock(ImmutableBytesWritable.class); + OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock = + mock(OutputCollector.class); + + for (int i = 0; i < recordNumber; i++) + identityTableMap.map(bytesWritableMock, resultMock, outputCollectorMock, + reporterMock); + + verify(outputCollectorMock, times(recordNumber)).collect( + Mockito.any(ImmutableBytesWritable.class), Mockito.any(Result.class)); + } finally { + if (identityTableMap != null) + identityTableMap.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java new file mode 100644 index 0000000..665c547 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapred; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +@Category({ VerySlowMapReduceTests.class, LargeTests.class }) +public class TestMultiTableSnapshotInputFormat + extends org.apache.hadoop.hbase.mapreduce.TestMultiTableSnapshotInputFormat { + + private static final Log LOG = LogFactory.getLog(TestMultiTableSnapshotInputFormat.class); + + @Override + protected void runJob(String jobName, Configuration c, List<Scan> scans) + throws IOException, InterruptedException, ClassNotFoundException { + JobConf job = new JobConf(TEST_UTIL.getConfiguration()); + + job.setJobName(jobName); + job.setMapperClass(Mapper.class); + job.setReducerClass(Reducer.class); + + TableMapReduceUtil.initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), Mapper.class, + ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir); + + TableMapReduceUtil.addDependencyJars(job); + + job.setReducerClass(Reducer.class); + job.setNumReduceTasks(1); // one to get final "first" and "last" key + FileOutputFormat.setOutputPath(job, new Path(job.getJobName())); + LOG.info("Started " + job.getJobName()); + + RunningJob runningJob = JobClient.runJob(job); + runningJob.waitForCompletion(); + assertTrue(runningJob.isSuccessful()); + LOG.info("After map/reduce completion - job " + jobName); + } + + public static class Mapper extends TestMultiTableSnapshotInputFormat.ScanMapper + implements TableMap<ImmutableBytesWritable, ImmutableBytesWritable> { + + @Override + public void map(ImmutableBytesWritable key, Result value, + OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> outputCollector, + Reporter reporter) throws IOException { + makeAssertions(key, value); + outputCollector.collect(key, key); + } + + /** + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. 
+ * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + } + + @Override + public void configure(JobConf jobConf) { + + } + } + + public static class Reducer extends TestMultiTableSnapshotInputFormat.ScanReducer implements + org.apache.hadoop.mapred.Reducer<ImmutableBytesWritable, ImmutableBytesWritable, + NullWritable, NullWritable> { + + private JobConf jobConf; + + @Override + public void reduce(ImmutableBytesWritable key, Iterator<ImmutableBytesWritable> values, + OutputCollector<NullWritable, NullWritable> outputCollector, Reporter reporter) + throws IOException { + makeAssertions(key, Lists.newArrayList(values)); + } + + /** + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + super.cleanup(this.jobConf); + } + + @Override + public void configure(JobConf jobConf) { + this.jobConf = jobConf; + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java new file mode 100644 index 0000000..4ebd8bf --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java @@ -0,0 +1,163 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapred; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.RowCounter.RowCounterMapper; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner; + +@Category({MapReduceTests.class, SmallTests.class}) +public class TestRowCounter { + + @Test + @SuppressWarnings("deprecation") + public void shouldPrintUsage() throws Exception { + String expectedOutput = "rowcounter <outputdir> <tablename> <column1> [<column2>...]"; + String result = new OutputReader(System.out) { + @Override + void doRead() { + assertEquals(-1, RowCounter.printUsage()); + } + }.read(); + + assertTrue(result.startsWith(expectedOutput)); + } + + @Test + @SuppressWarnings("deprecation") + public void shouldExitAndPrintUsageSinceParameterNumberLessThanThree() + throws Exception { + final String[] args = new String[] { "one", "two" }; + String line = "ERROR: Wrong number of parameters: " + args.length; + String result = new OutputReader(System.err) { + @Override + void doRead() throws Exception { + assertEquals(-1, new RowCounter().run(args)); + } + }.read(); + + assertTrue(result.startsWith(line)); + } + + @Test + @SuppressWarnings({ "deprecation", "unchecked" }) + public void shouldRegInReportEveryIncomingRow() throws IOException { + int iterationNumber = 999; + RowCounter.RowCounterMapper mapper = new RowCounter.RowCounterMapper(); + Reporter reporter = mock(Reporter.class); + for (int i = 0; i < iterationNumber; i++) + mapper.map(mock(ImmutableBytesWritable.class), mock(Result.class), + mock(OutputCollector.class), reporter); + + Mockito.verify(reporter, times(iterationNumber)).incrCounter( + any(Enum.class), anyInt()); + } + + @Test + @SuppressWarnings({ "deprecation" }) + public void shouldCreateAndRunSubmittableJob() throws Exception { + RowCounter rCounter = new RowCounter(); + rCounter.setConf(HBaseConfiguration.create()); + String[] args = new String[] { "\temp", "tableA", "column1", "column2", + "column3" }; + JobConf jobConfig = rCounter.createSubmittableJob(args); + + assertNotNull(jobConfig); + assertEquals(0, jobConfig.getNumReduceTasks()); + assertEquals("rowcounter", jobConfig.getJobName()); + assertEquals(jobConfig.getMapOutputValueClass(), Result.class); + assertEquals(jobConfig.getMapperClass(), RowCounterMapper.class); + assertEquals(jobConfig.get(TableInputFormat.COLUMN_LIST), Joiner.on(' ') + .join("column1", "column2", "column3")); + assertEquals(jobConfig.getMapOutputKeyClass(), ImmutableBytesWritable.class); + } + + enum Outs { + OUT, ERR + } + + private static abstract class OutputReader { + private final PrintStream ps; + private PrintStream 
oldPrintStream; + private Outs outs; + + protected OutputReader(PrintStream ps) { + this.ps = ps; + } + + protected String read() throws Exception { + ByteArrayOutputStream outBytes = new ByteArrayOutputStream(); + if (ps == System.out) { + oldPrintStream = System.out; + outs = Outs.OUT; + System.setOut(new PrintStream(outBytes)); + } else if (ps == System.err) { + oldPrintStream = System.err; + outs = Outs.ERR; + System.setErr(new PrintStream(outBytes)); + } else { + throw new IllegalStateException("OutputReader: unsupported PrintStream"); + } + + try { + doRead(); + return new String(outBytes.toByteArray()); + } finally { + switch (outs) { + case OUT: { + System.setOut(oldPrintStream); + break; + } + case ERR: { + System.setErr(oldPrintStream); + break; + } + default: + throw new IllegalStateException( + "OutputReader: unsupported PrintStream"); + } + } + } + + abstract void doRead() throws Exception; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java new file mode 100644 index 0000000..2655ac2 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java @@ -0,0 +1,116 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapred; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({MapReduceTests.class, SmallTests.class}) +public class TestSplitTable { + @Rule + public TestName name = new TestName(); + + @Test + @SuppressWarnings("deprecation") + public void testSplitTableCompareTo() { + TableSplit aTableSplit = new TableSplit(Bytes.toBytes("tableA"), + Bytes.toBytes("aaa"), Bytes.toBytes("ddd"), "locationA"); + + TableSplit bTableSplit = new TableSplit(Bytes.toBytes("tableA"), + Bytes.toBytes("iii"), Bytes.toBytes("kkk"), "locationA"); + + TableSplit cTableSplit = new TableSplit(Bytes.toBytes("tableA"), + Bytes.toBytes("lll"), Bytes.toBytes("zzz"), "locationA"); + + assertTrue(aTableSplit.compareTo(aTableSplit) == 0); + assertTrue(bTableSplit.compareTo(bTableSplit) == 0); + assertTrue(cTableSplit.compareTo(cTableSplit) == 0); + + assertTrue(aTableSplit.compareTo(bTableSplit) < 0); + assertTrue(bTableSplit.compareTo(aTableSplit) > 0); + + assertTrue(aTableSplit.compareTo(cTableSplit) < 0); + assertTrue(cTableSplit.compareTo(aTableSplit) > 0); + + assertTrue(bTableSplit.compareTo(cTableSplit) < 0); + assertTrue(cTableSplit.compareTo(bTableSplit) > 0); + + assertTrue(cTableSplit.compareTo(aTableSplit) > 0); + } + + @Test + @SuppressWarnings("deprecation") + public void testSplitTableEquals() { + byte[] tableA = Bytes.toBytes("tableA"); + byte[] aaa = Bytes.toBytes("aaa"); + byte[] ddd = Bytes.toBytes("ddd"); + String locationA = "locationA"; + + TableSplit tablesplit = new TableSplit(tableA, aaa, ddd, locationA); + + TableSplit tableB = new TableSplit(Bytes.toBytes("tableB"), aaa, ddd, locationA); + assertNotEquals(tablesplit.hashCode(), tableB.hashCode()); + assertNotEquals(tablesplit, tableB); + + TableSplit startBbb = new TableSplit(tableA, Bytes.toBytes("bbb"), ddd, locationA); + assertNotEquals(tablesplit.hashCode(), startBbb.hashCode()); + assertNotEquals(tablesplit, startBbb); + + TableSplit endEee = new TableSplit(tableA, aaa, Bytes.toBytes("eee"), locationA); + assertNotEquals(tablesplit.hashCode(), endEee.hashCode()); + assertNotEquals(tablesplit, endEee); + + TableSplit locationB = new TableSplit(tableA, aaa, ddd, "locationB"); + assertNotEquals(tablesplit.hashCode(), locationB.hashCode()); + assertNotEquals(tablesplit, locationB); + + TableSplit same = new TableSplit(tableA, aaa, ddd, locationA); + assertEquals(tablesplit.hashCode(), same.hashCode()); + assertEquals(tablesplit, same); + } + + @Test + @SuppressWarnings("deprecation") + public void testToString() { + TableSplit split = + new TableSplit(TableName.valueOf(name.getMethodName()), "row-start".getBytes(), "row-end".getBytes(), + "location"); + String str = + "HBase table split(table name: " + name.getMethodName() + ", start row: row-start, " + + "end row: row-end, region location: location)"; + Assert.assertEquals(str, split.toString()); + + split = new TableSplit((TableName) null, null, null, null); + str = + "HBase table split(table name: null, start row: null, " + + "end row: null, region location: null)"; + 
Assert.assertEquals(str, split.toString()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java new file mode 100644 index 0000000..f39a7f5 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java @@ -0,0 +1,460 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.RegexStringComparator; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import 
org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * This tests the TableInputFormat and its recovery semantics + */ +@Category({MapReduceTests.class, LargeTests.class}) +public class TestTableInputFormat { + + private static final Log LOG = LogFactory.getLog(TestTableInputFormat.class); + + private final static HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + static final byte[] FAMILY = Bytes.toBytes("family"); + + private static final byte[][] columns = new byte[][] { FAMILY }; + + @BeforeClass + public static void beforeClass() throws Exception { + UTIL.startMiniCluster(); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Before + public void before() throws IOException { + LOG.info("before"); + UTIL.ensureSomeRegionServersAvailable(1); + LOG.info("before done"); + } + + /** + * Setup a table with two rows and values. + * + * @param tableName + * @return + * @throws IOException + */ + public static Table createTable(byte[] tableName) throws IOException { + return createTable(tableName, new byte[][] { FAMILY }); + } + + /** + * Setup a table with two rows and values per column family. + * + * @param tableName + * @return + * @throws IOException + */ + public static Table createTable(byte[] tableName, byte[][] families) throws IOException { + Table table = UTIL.createTable(TableName.valueOf(tableName), families); + Put p = new Put("aaa".getBytes()); + for (byte[] family : families) { + p.addColumn(family, null, "value aaa".getBytes()); + } + table.put(p); + p = new Put("bbb".getBytes()); + for (byte[] family : families) { + p.addColumn(family, null, "value bbb".getBytes()); + } + table.put(p); + return table; + } + + /** + * Verify that the result and key have expected values. + * + * @param r + * @param key + * @param expectedKey + * @param expectedValue + * @return + */ + static boolean checkResult(Result r, ImmutableBytesWritable key, + byte[] expectedKey, byte[] expectedValue) { + assertEquals(0, key.compareTo(expectedKey)); + Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY); + byte[] value = vals.values().iterator().next(); + assertTrue(Arrays.equals(value, expectedValue)); + return true; // if succeed + } + + /** + * Create table data and run tests on specified htable using the + * o.a.h.hbase.mapred API. 
+ * + * @param table + * @throws IOException + */ + static void runTestMapred(Table table) throws IOException { + org.apache.hadoop.hbase.mapred.TableRecordReader trr = + new org.apache.hadoop.hbase.mapred.TableRecordReader(); + trr.setStartRow("aaa".getBytes()); + trr.setEndRow("zzz".getBytes()); + trr.setHTable(table); + trr.setInputColumns(columns); + + trr.init(); + Result r = new Result(); + ImmutableBytesWritable key = new ImmutableBytesWritable(); + + boolean more = trr.next(key, r); + assertTrue(more); + checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes()); + + more = trr.next(key, r); + assertTrue(more); + checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes()); + + // no more data + more = trr.next(key, r); + assertFalse(more); + } + + /** + * Create a table that throws an IOException on the first scanner next call + * + * @throws IOException + */ + static Table createIOEScannerTable(byte[] name, final int failCnt) + throws IOException { + // build up mock scanner stuff to fail the first time + Answer<ResultScanner> a = new Answer<ResultScanner>() { + int cnt = 0; + + @Override + public ResultScanner answer(InvocationOnMock invocation) throws Throwable { + // the first failCnt invocations return the busted mock scanner + if (cnt++ < failCnt) { + // create mock ResultScanner that always fails. + Scan scan = mock(Scan.class); + doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe + ResultScanner scanner = mock(ResultScanner.class); + // simulate TimeoutException / IOException + doThrow(new IOException("Injected exception")).when(scanner).next(); + return scanner; + } + + // otherwise return the real scanner. + return (ResultScanner) invocation.callRealMethod(); + } + }; + + Table htable = spy(createTable(name)); + doAnswer(a).when(htable).getScanner((Scan) anyObject()); + return htable; + } + + /** + * Create a table that throws a DoNotRetryIOException on the first scanner next + * call + * + * @throws IOException + */ + static Table createDNRIOEScannerTable(byte[] name, final int failCnt) + throws IOException { + // build up mock scanner stuff to fail the first time + Answer<ResultScanner> a = new Answer<ResultScanner>() { + int cnt = 0; + + @Override + public ResultScanner answer(InvocationOnMock invocation) throws Throwable { + // the first failCnt invocations return the busted mock scanner + if (cnt++ < failCnt) { + // create mock ResultScanner that always fails. + Scan scan = mock(Scan.class); + doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe + ResultScanner scanner = mock(ResultScanner.class); + + invocation.callRealMethod(); // simulate NotServingRegionException + doThrow( + new NotServingRegionException("Injected simulated TimeoutException")) + .when(scanner).next(); + return scanner; + } + + // otherwise return the real scanner. + return (ResultScanner) invocation.callRealMethod(); + } + }; + + Table htable = spy(createTable(name)); + doAnswer(a).when(htable).getScanner((Scan) anyObject()); + return htable; + } + + /** + * Run test assuming no errors using mapred api.
+ * + * @throws IOException + */ + @Test + public void testTableRecordReader() throws IOException { + Table table = createTable("table1".getBytes()); + runTestMapred(table); + } + + /** + * Run test assuming Scanner IOException failure using mapred api. + * + * @throws IOException + */ + @Test + public void testTableRecordReaderScannerFail() throws IOException { + Table htable = createIOEScannerTable("table2".getBytes(), 1); + runTestMapred(htable); + } + + /** + * Run test assuming Scanner IOException failure using mapred api. + * + * @throws IOException + */ + @Test(expected = IOException.class) + public void testTableRecordReaderScannerFailTwice() throws IOException { + Table htable = createIOEScannerTable("table3".getBytes(), 2); + runTestMapred(htable); + } + + /** + * Run test assuming NotServingRegionException using mapred api. + * + * @throws org.apache.hadoop.hbase.DoNotRetryIOException + */ + @Test + public void testTableRecordReaderScannerTimeout() throws IOException { + Table htable = createDNRIOEScannerTable("table4".getBytes(), 1); + runTestMapred(htable); + } + + /** + * Run test assuming NotServingRegionException using mapred api. + * + * @throws org.apache.hadoop.hbase.DoNotRetryIOException + */ + @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class) + public void testTableRecordReaderScannerTimeoutTwice() throws IOException { + Table htable = createDNRIOEScannerTable("table5".getBytes(), 2); + runTestMapred(htable); + } + + /** + * Verify the example we present in javadocs on TableInputFormatBase + */ + @Test + public void testExtensionOfTableInputFormatBase() throws IOException { + LOG.info("testing use of an InputFormat that extends InputFormatBase"); + final Table table = createTable(Bytes.toBytes("exampleTable"), + new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); + testInputFormat(ExampleTIF.class); + } + + @Test + public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException { + LOG.info("testing use of an InputFormat that extends InputFormatBase, " + + "as it was given in 0.98."); + final Table table = createTable(Bytes.toBytes("exampleDeprecatedTable"), + new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); + testInputFormat(ExampleDeprecatedTIF.class); + } + + @Test + public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException { + LOG.info("testing use of an InputFormat that extends InputFormatBase, " + + "using JobConfigurable."); + final Table table = createTable(Bytes.toBytes("exampleJobConfigurableTable"), + new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") }); + testInputFormat(ExampleJobConfigurableTIF.class); + } + + void testInputFormat(Class<?
extends InputFormat> clazz) throws IOException { + Configuration conf = UTIL.getConfiguration(); + final JobConf job = new JobConf(conf); + job.setInputFormat(clazz); + job.setOutputFormat(NullOutputFormat.class); + job.setMapperClass(ExampleVerifier.class); + job.setNumReduceTasks(0); + LOG.debug("submitting job."); + final RunningJob run = JobClient.runJob(job); + assertTrue("job failed!", run.isSuccessful()); + assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, run.getCounters() + .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getCounter()); + assertEquals("Saw instances of the filtered-out row.", 0, run.getCounters() + .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getCounter()); + assertEquals("Saw the wrong number of instances of columnA.", 1, run.getCounters() + .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getCounter()); + assertEquals("Saw the wrong number of instances of columnB.", 1, run.getCounters() + .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getCounter()); + assertEquals("Saw the wrong count of values for the filtered-for row.", 2, run.getCounters() + .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getCounter()); + assertEquals("Saw the wrong count of values for the filtered-out row.", 0, run.getCounters() + .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getCounter()); + } + + public static class ExampleVerifier implements TableMap<NullWritable, NullWritable> { + + @Override + public void configure(JobConf conf) { + } + + @Override + public void map(ImmutableBytesWritable key, Result value, + OutputCollector<NullWritable,NullWritable> output, + Reporter reporter) throws IOException { + for (Cell cell : value.listCells()) { + reporter.getCounter(TestTableInputFormat.class.getName() + ":row", + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())) + .increment(1L); + reporter.getCounter(TestTableInputFormat.class.getName() + ":family", + Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) + .increment(1L); + reporter.getCounter(TestTableInputFormat.class.getName() + ":value", + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())) + .increment(1L); + } + } + + @Override + public void close() { + } + + } + + public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable { + + @Override + public void configure(JobConf job) { + try { + Connection connection = ConnectionFactory.createConnection(job); + Table exampleTable = connection.getTable(TableName.valueOf("exampleDeprecatedTable")); + // mandatory + initializeTable(connection, exampleTable.getName()); + byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), + Bytes.toBytes("columnB") }; + // mandatory + setInputColumns(inputColumns); + Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); + // optional + setRowFilter(exampleFilter); + } catch (IOException exception) { + throw new RuntimeException("Failed to configure for job.", exception); + } + } + + } + + public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable { + + @Override + public void configure(JobConf job) { + try { + initialize(job); + } catch (IOException exception) { + throw new RuntimeException("Failed to initialize.", exception); + } + } + + @Override + protected void
initialize(JobConf job) throws IOException { + initialize(job, "exampleJobConfigurableTable"); + } + } + + + public static class ExampleTIF extends TableInputFormatBase { + + @Override + protected void initialize(JobConf job) throws IOException { + initialize(job, "exampleTable"); + } + + protected void initialize(JobConf job, String table) throws IOException { + Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job)); + TableName tableName = TableName.valueOf(table); + // mandatory + initializeTable(connection, tableName); + byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"), + Bytes.toBytes("columnB") }; + // mandatory + setInputColumns(inputColumns); + Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*")); + // optional + setRowFilter(exampleFilter); + } + + } + +} + http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java new file mode 100644 index 0000000..3f905cf --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java @@ -0,0 +1,103 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapred; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TestTableMapReduceBase; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.junit.experimental.categories.Category; + +/** + * Test Map/Reduce job over HBase tables. The map/reduce process we're testing + * on our tables is simple - take every row in the table, reverse the value of + * a particular cell, and write it back to the table. 
+ */ +@Category({MapReduceTests.class, LargeTests.class}) +@SuppressWarnings("deprecation") +public class TestTableMapReduce extends TestTableMapReduceBase { + private static final Log LOG = + LogFactory.getLog(TestTableMapReduce.class.getName()); + + protected Log getLog() { return LOG; } + + /** + * Pass the given key and processed record to reduce + */ + static class ProcessContentsMapper extends MapReduceBase implements + TableMap<ImmutableBytesWritable, Put> { + + /** + * Pass the key and reversed value to reduce + */ + public void map(ImmutableBytesWritable key, Result value, + OutputCollector<ImmutableBytesWritable, Put> output, + Reporter reporter) + throws IOException { + output.collect(key, TestTableMapReduceBase.map(key, value)); + } + } + + @Override + protected void runTestOnTable(Table table) throws IOException { + JobConf jobConf = null; + try { + LOG.info("Before map/reduce startup"); + jobConf = new JobConf(UTIL.getConfiguration(), TestTableMapReduce.class); + jobConf.setJobName("process column contents"); + jobConf.setNumReduceTasks(1); + TableMapReduceUtil.initTableMapJob(table.getName().getNameAsString(), + Bytes.toString(INPUT_FAMILY), ProcessContentsMapper.class, + ImmutableBytesWritable.class, Put.class, jobConf); + TableMapReduceUtil.initTableReduceJob(table.getName().getNameAsString(), + IdentityTableReduce.class, jobConf); + + LOG.info("Started " + table.getName()); + RunningJob job = JobClient.runJob(jobConf); + assertTrue(job.isSuccessful()); + LOG.info("After map/reduce completion"); + + // verify map-reduce results + verify(table.getName()); + } finally { + if (jobConf != null) { + FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir"))); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java new file mode 100644 index 0000000..ac2f20d --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java @@ -0,0 +1,272 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.mapred; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet; + +@Category({MapReduceTests.class, LargeTests.class}) +public class TestTableMapReduceUtil { + + private static final Log LOG = LogFactory + .getLog(TestTableMapReduceUtil.class); + + private static Table presidentsTable; + private static final String TABLE_NAME = "People"; + + private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info"); + private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name"); + + private static ImmutableSet<String> presidentsRowKeys = ImmutableSet.of( + "president1", "president2", "president3"); + private static Iterator<String> presidentNames = ImmutableSet.of( + "John F. Kennedy", "George W. 
Bush", "Barack Obama").iterator(); + + private static ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1", + "actor2"); + private static Iterator<String> actorNames = ImmutableSet.of( + "Jack Nicholson", "Martin Freeman").iterator(); + + private static String PRESIDENT_PATTERN = "president"; + private static String ACTOR_PATTERN = "actor"; + private static ImmutableMap<String, ImmutableSet<String>> relation = ImmutableMap + .of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void beforeClass() throws Exception { + UTIL.startMiniCluster(); + presidentsTable = createAndFillTable(TableName.valueOf(TABLE_NAME)); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Before + public void before() throws IOException { + LOG.info("before"); + UTIL.ensureSomeRegionServersAvailable(1); + LOG.info("before done"); + } + + public static Table createAndFillTable(TableName tableName) throws IOException { + Table table = UTIL.createTable(tableName, COLUMN_FAMILY); + createPutCommand(table); + return table; + } + + private static void createPutCommand(Table table) throws IOException { + for (String president : presidentsRowKeys) { + if (presidentNames.hasNext()) { + Put p = new Put(Bytes.toBytes(president)); + p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(presidentNames.next())); + table.put(p); + } + } + + for (String actor : actorsRowKeys) { + if (actorNames.hasNext()) { + Put p = new Put(Bytes.toBytes(actor)); + p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next())); + table.put(p); + } + } + } + + /** + * Check what the given number of reduce tasks for the given job configuration + * does not exceed the number of regions for the given table. 
+ */ + @Test + public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable() + throws IOException { + Assert.assertNotNull(presidentsTable); + Configuration cfg = UTIL.getConfiguration(); + JobConf jobConf = new JobConf(cfg); + TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf); + TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf); + TableMapReduceUtil.setScannerCaching(jobConf, 100); + assertEquals(1, jobConf.getNumReduceTasks()); + assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0)); + + jobConf.setNumReduceTasks(10); + TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf); + TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf); + assertEquals(1, jobConf.getNumReduceTasks()); + } + + @Test + public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable() + throws IOException { + Configuration cfg = UTIL.getConfiguration(); + JobConf jobConf = new JobConf(cfg); + TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf); + TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf); + assertEquals(1, jobConf.getNumMapTasks()); + + jobConf.setNumMapTasks(10); + TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf); + TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf); + assertEquals(1, jobConf.getNumMapTasks()); + } + + @Test + @SuppressWarnings("deprecation") + public void shouldBeValidMapReduceEvaluation() throws Exception { + Configuration cfg = UTIL.getConfiguration(); + JobConf jobConf = new JobConf(cfg); + try { + jobConf.setJobName("process row task"); + jobConf.setNumReduceTasks(1); + TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY), + ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, + jobConf); + TableMapReduceUtil.initTableReduceJob(TABLE_NAME, + ClassificatorRowReduce.class, jobConf); + RunningJob job = JobClient.runJob(jobConf); + assertTrue(job.isSuccessful()); + } finally { + if (jobConf != null) + FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir"))); + } + } + + @Test + @SuppressWarnings("deprecation") + public void shouldBeValidMapReduceWithPartitionerEvaluation() + throws IOException { + Configuration cfg = UTIL.getConfiguration(); + JobConf jobConf = new JobConf(cfg); + try { + jobConf.setJobName("process row task"); + jobConf.setNumReduceTasks(2); + TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY), + ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class, + jobConf); + + TableMapReduceUtil.initTableReduceJob(TABLE_NAME, + ClassificatorRowReduce.class, jobConf, HRegionPartitioner.class); + RunningJob job = JobClient.runJob(jobConf); + assertTrue(job.isSuccessful()); + } finally { + if (jobConf != null) + FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir"))); + } + } + + @SuppressWarnings("deprecation") + static class ClassificatorRowReduce extends MapReduceBase implements + TableReduce<ImmutableBytesWritable, Put> { + + @Override + public void reduce(ImmutableBytesWritable key, Iterator<Put> values, + OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter) + throws IOException { + String strKey = Bytes.toString(key.get()); + List<Put> result = new ArrayList<>(); + while (values.hasNext()) + result.add(values.next()); + + if (relation.keySet().contains(strKey)) { + Set<String> set = relation.get(strKey); + if (set != null) { + assertEquals(set.size(), result.size()); + } else { + throwAssertionError("Test infrastructure error: set is null"); + } + } else { + throwAssertionError("Test infrastructure
error: key not found in map"); + } + } + + private void throwAssertionError(String errorMessage) throws AssertionError { + throw new AssertionError(errorMessage); + } + } + + @SuppressWarnings("deprecation") + static class ClassificatorMapper extends MapReduceBase implements + TableMap<ImmutableBytesWritable, Put> { + + @Override + public void map(ImmutableBytesWritable row, Result result, + OutputCollector<ImmutableBytesWritable, Put> outCollector, + Reporter reporter) throws IOException { + String rowKey = Bytes.toString(result.getRow()); + final ImmutableBytesWritable pKey = new ImmutableBytesWritable( + Bytes.toBytes(PRESIDENT_PATTERN)); + final ImmutableBytesWritable aKey = new ImmutableBytesWritable( + Bytes.toBytes(ACTOR_PATTERN)); + ImmutableBytesWritable outKey = null; + + if (rowKey.startsWith(PRESIDENT_PATTERN)) { + outKey = pKey; + } else if (rowKey.startsWith(ACTOR_PATTERN)) { + outKey = aKey; + } else { + throw new AssertionError("unexpected rowKey"); + } + + String name = Bytes.toString(result.getValue(COLUMN_FAMILY, + COLUMN_QUALIFIER)); + outCollector.collect(outKey, + new Put(Bytes.toBytes("rowKey2")) + .addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(name))); + } + } +}
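The recovery tests in TestTableInputFormat above all rely on one Mockito pattern: an Answer that hands out a broken mock for the first failCnt invocations and then delegates to the real object via callRealMethod(). A minimal, self-contained sketch of that pattern follows; the Fetcher class and the failFirst helper are hypothetical stand-ins for Table and getScanner, and only the Mockito mechanics mirror the test code.

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import java.io.IOException;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class FlakyDelegateSketch {

  /** Hypothetical collaborator; stands in for Table#getScanner in the tests above. */
  public static class Fetcher {
    public String fetch() throws IOException {
      return "real result";
    }
  }

  /** Wrap a real Fetcher in a spy whose first failCnt fetch() calls throw, then recover. */
  public static Fetcher failFirst(final int failCnt) throws IOException {
    Answer<String> answer = new Answer<String>() {
      int cnt = 0;

      @Override
      public String answer(InvocationOnMock invocation) throws Throwable {
        if (cnt++ < failCnt) {
          // the first failCnt invocations simulate a transient failure
          throw new IOException("Injected exception " + cnt);
        }
        // afterwards fall through to the real behavior
        return (String) invocation.callRealMethod();
      }
    };
    Fetcher flaky = spy(new Fetcher());
    doAnswer(answer).when(flaky).fetch();
    return flaky;
  }

  public static void main(String[] args) throws Exception {
    Fetcher flaky = failFirst(1);
    try {
      flaky.fetch(); // first call: throws the injected IOException
    } catch (IOException expected) {
      System.out.println("caught: " + expected.getMessage());
    }
    System.out.println(flaky.fetch()); // second call: "real result"
  }
}

The tests above use the same shape: with failCnt of 1 a retrying reader is expected to recover, while failCnt of 2, or a DoNotRetryIOException such as NotServingRegionException, is expected to surface to the caller.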

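The TestTableMapReduce javadoc above summarizes its job as reversing the value of a particular cell and writing it back to the table. As a sketch of just that transform, assuming a plain byte-wise reversal as the class javadoc describes (the helper name is illustrative and not taken from TestTableMapReduceBase):

import java.nio.charset.StandardCharsets;

public class ReverseValueSketch {

  /** Reverse the bytes of a cell value; illustrative helper, not HBase API. */
  static byte[] reverse(byte[] value) {
    byte[] out = new byte[value.length];
    for (int i = 0; i < value.length; i++) {
      out[value.length - 1 - i] = value[i];
    }
    return out;
  }

  public static void main(String[] args) {
    byte[] original = "value aaa".getBytes(StandardCharsets.UTF_8);
    byte[] reversed = reverse(original);
    // prints "aaa eulav"
    System.out.println(new String(reversed, StandardCharsets.UTF_8));
  }
}

Because the transform is its own inverse, a verification pass can recompute the reversal of what the job wrote and compare it against the originally inserted value; that is presumably the shape of the check behind the verify(table.getName()) call in runTestOnTable above.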