Repository: hbase Updated Branches: refs/heads/master cb1ef915d -> e1e843434
http://git-wip-us.apache.org/repos/asf/hbase/blob/e1e84343/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java new file mode 100644 index 0000000..6f91515 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java @@ -0,0 +1,324 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestServerSideScanMetricsFromClientSide { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static Table TABLE = null; + + /** + * Table configuration + */ + private static TableName TABLE_NAME = TableName.valueOf("testTable"); + + private static int NUM_ROWS = 10; + private static byte[] ROW = Bytes.toBytes("testRow"); + private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); + + // Should keep this value below 10 to keep generation of expected kv's simple. 
If above 10 then + // table/row/cf1/... will be followed by table/row/cf10/... instead of table/row/cf2/... which + // breaks the simple generation of expected kv's + private static int NUM_FAMILIES = 1; + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES); + + private static int NUM_QUALIFIERS = 1; + private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS); + + private static int VALUE_SIZE = 10; + private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); + + private static int NUM_COLS = NUM_FAMILIES * NUM_QUALIFIERS; + + // Approximation of how large the heap size of cells in our table. Should be accessed through + // getCellHeapSize(). + private static long CELL_HEAP_SIZE = -1; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(3); + TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); + } + + static Table createTestTable(TableName name, byte[][] rows, byte[][] families, + byte[][] qualifiers, byte[] cellValue) throws IOException { + Table ht = TEST_UTIL.createTable(name, families); + List<Put> puts = createPuts(rows, families, qualifiers, cellValue); + ht.put(puts); + + return ht; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Make puts to put the input value into each combination of row, family, and qualifier + * @param rows + * @param families + * @param qualifiers + * @param value + * @return + * @throws IOException + */ + static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, + byte[] value) throws IOException { + Put put; + ArrayList<Put> puts = new ArrayList<>(); + + for (int row = 0; row < rows.length; row++) { + put = new Put(rows[row]); + for (int fam = 0; fam < 
families.length; fam++) { + for (int qual = 0; qual < qualifiers.length; qual++) { + KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value); + put.add(kv); + } + } + puts.add(put); + } + + return puts; + } + + /** + * @return The approximate heap size of a cell in the test table. All cells should have + * approximately the same heap size, so the value is cached to avoid repeating the + * calculation + * @throws Exception + */ + private long getCellHeapSize() throws Exception { + if (CELL_HEAP_SIZE == -1) { + // Do a partial scan that will return a single result with a single cell + Scan scan = new Scan(); + scan.setMaxResultSize(1); + scan.setAllowPartialResults(true); + ResultScanner scanner = TABLE.getScanner(scan); + + Result result = scanner.next(); + + assertTrue(result != null); + assertTrue(result.rawCells() != null); + assertTrue(result.rawCells().length == 1); + + CELL_HEAP_SIZE = CellUtil.estimatedHeapSizeOf(result.rawCells()[0]); + scanner.close(); + } + + return CELL_HEAP_SIZE; + } + + @Test + public void testRowsSeenMetric() throws Exception { + // Base scan configuration + Scan baseScan; + baseScan = new Scan(); + baseScan.setScanMetricsEnabled(true); + testRowsSeenMetric(baseScan); + + // Test case that only a single result will be returned per RPC to the server + baseScan.setCaching(1); + testRowsSeenMetric(baseScan); + + // Test case that partial results are returned from the server. 
At most one cell will be + // contained in each response + baseScan.setMaxResultSize(1); + testRowsSeenMetric(baseScan); + + // Test case that size limit is set such that a few cells are returned per partial result from + // the server + baseScan.setCaching(NUM_ROWS); + baseScan.setMaxResultSize(getCellHeapSize() * (NUM_COLS - 1)); + testRowsSeenMetric(baseScan); + } + + public void testRowsSeenMetric(Scan baseScan) throws Exception { + Scan scan; + scan = new Scan(baseScan); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, NUM_ROWS); + + for (int i = 0; i < ROWS.length - 1; i++) { + scan = new Scan(baseScan); + scan.setStartRow(ROWS[0]); + scan.setStopRow(ROWS[i + 1]); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, i + 1); + } + + for (int i = ROWS.length - 1; i > 0; i--) { + scan = new Scan(baseScan); + scan.setStartRow(ROWS[i - 1]); + scan.setStopRow(ROWS[ROWS.length - 1]); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, ROWS.length - i); + } + + // The filter should filter out all rows, but we still expect to see every row. 
+ Filter filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator("xyz".getBytes())); + scan = new Scan(baseScan); + scan.setFilter(filter); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, ROWS.length); + + // Filter should pass on all rows + SingleColumnValueFilter singleColumnValueFilter = + new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOp.EQUAL, VALUE); + scan = new Scan(baseScan); + scan.setFilter(singleColumnValueFilter); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, ROWS.length); + + // Filter should filter out all rows + singleColumnValueFilter = + new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOp.NOT_EQUAL, VALUE); + scan = new Scan(baseScan); + scan.setFilter(singleColumnValueFilter); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY, ROWS.length); + } + + @Test + public void testRowsFilteredMetric() throws Exception { + // Base scan configuration + Scan baseScan; + baseScan = new Scan(); + baseScan.setScanMetricsEnabled(true); + + // Test case where scan uses default values + testRowsFilteredMetric(baseScan); + + // Test case where at most one Result is retrieved per RPC + baseScan.setCaching(1); + testRowsFilteredMetric(baseScan); + + // Test case where size limit is very restrictive and partial results will be returned from + // server + baseScan.setMaxResultSize(1); + testRowsFilteredMetric(baseScan); + + // Test a case where max result size limits response from server to only a few cells (not all + // cells from the row) + baseScan.setCaching(NUM_ROWS); + baseScan.setMaxResultSize(getCellHeapSize() * (NUM_COLS - 1)); + testRowsSeenMetric(baseScan); + } + + public void testRowsFilteredMetric(Scan baseScan) throws Exception { + testRowsFilteredMetric(baseScan, null, 0); + + // Row filter doesn't match any row key. 
All rows should be filtered + Filter filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator("xyz".getBytes())); + testRowsFilteredMetric(baseScan, filter, ROWS.length); + + // Filter will return results containing only the first key. Number of entire rows filtered + // should be 0. + filter = new FirstKeyOnlyFilter(); + testRowsFilteredMetric(baseScan, filter, 0); + + // Column prefix will find some matching qualifier on each row. Number of entire rows filtered + // should be 0 + filter = new ColumnPrefixFilter(QUALIFIERS[0]); + testRowsFilteredMetric(baseScan, filter, 0); + + // Column prefix will NOT find any matching qualifier on any row. All rows should be filtered + filter = new ColumnPrefixFilter("xyz".getBytes()); + testRowsFilteredMetric(baseScan, filter, ROWS.length); + + // Matching column value should exist in each row. No rows should be filtered. + filter = new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOp.EQUAL, VALUE); + testRowsFilteredMetric(baseScan, filter, 0); + + // No matching column value should exist in any row. Filter all rows + filter = new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOp.NOT_EQUAL, VALUE); + testRowsFilteredMetric(baseScan, filter, ROWS.length); + + List<Filter> filters = new ArrayList<Filter>(); + filters.add(new RowFilter(CompareOp.EQUAL, new BinaryComparator(ROWS[0]))); + filters.add(new RowFilter(CompareOp.EQUAL, new BinaryComparator(ROWS[3]))); + int numberOfMatchingRowFilters = filters.size(); + filter = new FilterList(Operator.MUST_PASS_ONE, filters); + testRowsFilteredMetric(baseScan, filter, ROWS.length - numberOfMatchingRowFilters); + filters.clear(); + + // Add a single column value exclude filter for each column... The net effect is that all + // columns will be excluded when scanning on the server side. This will result in an empty cell + // array in RegionScanner#nextInternal which should be interpreted as a row being filtered. 
+ for (int family = 0; family < FAMILIES.length; family++) { + for (int qualifier = 0; qualifier < QUALIFIERS.length; qualifier++) { + filters.add(new SingleColumnValueExcludeFilter(FAMILIES[family], QUALIFIERS[qualifier], + CompareOp.EQUAL, VALUE)); + } + } + filter = new FilterList(Operator.MUST_PASS_ONE, filters); + testRowsFilteredMetric(baseScan, filter, ROWS.length); + } + + public void testRowsFilteredMetric(Scan baseScan, Filter filter, int expectedNumFiltered) + throws Exception { + Scan scan = new Scan(baseScan); + if (filter != null) scan.setFilter(filter); + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY, expectedNumFiltered); + } + + /** + * Run the scan to completion and check the metric against the specified value + * @param scan + * @param metricKey + * @param expectedValue + * @throws Exception + */ + public void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception { + assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled()); + ResultScanner scanner = TABLE.getScanner(scan); + + // Iterate through all the results + for (Result r : scanner) { + } + scanner.close(); + ScanMetrics metrics = scan.getScanMetrics(); + assertTrue("Metrics are null", metrics != null); + assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey)); + final long actualMetricValue = metrics.getCounter(metricKey).get(); + assertEquals("Metric: " + metricKey + " Expected: " + expectedValue + " Actual: " + + actualMetricValue, expectedValue, actualMetricValue); + + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e1e84343/hbase-shell/src/main/ruby/hbase.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/hbase.rb b/hbase-shell/src/main/ruby/hbase.rb index f181eda..aca1006 100644 --- a/hbase-shell/src/main/ruby/hbase.rb +++ b/hbase-shell/src/main/ruby/hbase.rb @@ -49,6 +49,8 @@ module HBaseConstants METHOD = 
"METHOD" MAXLENGTH = "MAXLENGTH" CACHE_BLOCKS = "CACHE_BLOCKS" + ALL_METRICS = "ALL_METRICS" + METRICS = "METRICS" REVERSED = "REVERSED" REPLICATION_SCOPE = "REPLICATION_SCOPE" INTERVAL = 'INTERVAL' http://git-wip-us.apache.org/repos/asf/hbase/blob/e1e84343/hbase-shell/src/main/ruby/hbase/table.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/hbase/table.rb b/hbase-shell/src/main/ruby/hbase/table.rb index bba81d5..960eafc 100644 --- a/hbase-shell/src/main/ruby/hbase/table.rb +++ b/hbase-shell/src/main/ruby/hbase/table.rb @@ -407,6 +407,8 @@ EOF def _hash_to_scan(args) if args.any? + enablemetrics = args["ALL_METRICS"].nil? ? false : args["ALL_METRICS"] + enablemetrics = enablemetrics || !args["METRICS"].nil? filter = args["FILTER"] startrow = args["STARTROW"] || '' stoprow = args["STOPROW"] @@ -454,6 +456,7 @@ EOF scan.setFilter(org.apache.hadoop.hbase.filter.ParseFilter.new.parseFilterString(filter)) end + scan.setScanMetricsEnabled(enablemetrics) if enablemetrics scan.setTimeStamp(timestamp) if timestamp scan.setCacheBlocks(cache_blocks) scan.setReversed(reversed) @@ -478,8 +481,10 @@ EOF #---------------------------------------------------------------------------------------------- # Scans whole table or a range of keys and returns rows matching specific criteria - def _scan_internal(args = {}) - raise(ArgumentError, "Arguments should be a Hash") unless args.kind_of?(Hash) + def _scan_internal(args = {}, scan = nil) + raise(ArgumentError, "Args should be a Hash") unless args.kind_of?(Hash) + raise(ArgumentError, "Scan argument should be org.apache.hadoop.hbase.client.Scan") \ + unless scan == nil || scan.kind_of?(org.apache.hadoop.hbase.client.Scan) limit = args["LIMIT"] || -1 maxlength = args.delete("MAXLENGTH") || -1 @@ -489,7 +494,8 @@ EOF @converters.clear() # Start the scanner - scanner = @table.getScanner(_hash_to_scan(args)) + scan = scan == nil ? 
_hash_to_scan(args) : scan + scanner = @table.getScanner(scan) iter = scanner.iterator # Iterate results @@ -519,6 +525,7 @@ EOF break end end + scanner.close() return ((block_given?) ? count : res) end http://git-wip-us.apache.org/repos/asf/hbase/blob/e1e84343/hbase-shell/src/main/ruby/shell/commands/scan.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/scan.rb b/hbase-shell/src/main/ruby/shell/commands/scan.rb index c6aba9c..d2fe4e9 100644 --- a/hbase-shell/src/main/ruby/shell/commands/scan.rb +++ b/hbase-shell/src/main/ruby/shell/commands/scan.rb @@ -25,7 +25,7 @@ module Shell Scan a table; pass table name and optionally a dictionary of scanner specifications. Scanner specifications may include one or more of: TIMERANGE, FILTER, LIMIT, STARTROW, STOPROW, ROWPREFIXFILTER, TIMESTAMP, -MAXLENGTH or COLUMNS, CACHE or RAW, VERSIONS +MAXLENGTH or COLUMNS, CACHE or RAW, VERSIONS, ALL_METRICS or METRICS If no columns are specified, all columns will be scanned. To scan all members of a column family, leave the qualifier empty as in @@ -36,6 +36,11 @@ The filter can be specified in two ways: Filter Language document attached to the HBASE-4176 JIRA 2. Using the entire package name of the filter. +If you wish to see metrics regarding the execution of the scan, the +ALL_METRICS boolean should be set to true. Alternatively, if you would +prefer to see only a subset of the metrics, the METRICS array can be +defined to include the names of only the metrics you care about. 
+ Some examples: hbase> scan 'hbase:meta' @@ -44,6 +49,8 @@ Some examples: hbase> scan 't1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'} hbase> scan 't1', {COLUMNS => 'c1', TIMERANGE => [1303668804, 1303668904]} hbase> scan 't1', {REVERSED => true} + hbase> scan 't1', {ALL_METRICS => true} + hbase> scan 't1', {METRICS => ['RPC_RETRIES', 'ROWS_FILTERED']} hbase> scan 't1', {ROWPREFIXFILTER => 'row2', FILTER => " (QualifierFilter (>=, 'binary:xyz')) AND (TimestampsFilter ( 123, 456))"} hbase> scan 't1', {FILTER => @@ -100,12 +107,18 @@ EOF now = Time.now formatter.header(["ROW", "COLUMN+CELL"]) + scan = table._hash_to_scan(args) #actually do the scanning - count = table._scan_internal(args) do |row, cells| + count = table._scan_internal(args, scan) do |row, cells| formatter.row([ row, cells ]) end formatter.footer(now, count) + + # if scan metrics were enabled, print them after the results + if (scan != nil && scan.isScanMetricsEnabled()) + formatter.scan_metrics(scan.getScanMetrics(), args["METRICS"]) + end end end end http://git-wip-us.apache.org/repos/asf/hbase/blob/e1e84343/hbase-shell/src/main/ruby/shell/formatter.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/formatter.rb b/hbase-shell/src/main/ruby/shell/formatter.rb index 36aaf76..47c9c8d 100644 --- a/hbase-shell/src/main/ruby/shell/formatter.rb +++ b/hbase-shell/src/main/ruby/shell/formatter.rb @@ -112,6 +112,37 @@ module Shell @row_count += 1 end + # Output the scan metrics. 
Can be filtered to output only those metrics whose keys exists + # in the metric_filter + def scan_metrics(scan_metrics = nil, metric_filter = []) + return if scan_metrics == nil + raise(ArgumentError, \ + "Argument should be org.apache.hadoop.hbase.client.metrics.ScanMetrics") \ + unless scan_metrics.kind_of?(org.apache.hadoop.hbase.client.metrics.ScanMetrics) + # prefix output with empty line + @out.puts + # save row count to restore after printing metrics + # (metrics should not count towards row count) + saved_row_count = @row_count + iter = scan_metrics.getMetricsMap().entrySet().iterator() + metric_hash = Hash.new() + # put keys in hash so they can be sorted easily + while iter.hasNext + metric = iter.next + metric_hash[metric.getKey.to_s] = metric.getValue.to_s + end + # print in alphabetical order + row(["METRIC", "VALUE"], false) + metric_hash.sort.map do |key, value| + if (not metric_filter or metric_filter.length == 0 or metric_filter.include?(key)) + row([key, value]) + end + end + + @row_count = saved_row_count + return + end + def split(width, str) if width == 0 return [str]
