Repository: hbase Updated Branches: refs/heads/master d90f0571e -> 86ca09e0e
HBASE-15773 Improvements to CellCounter job Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/86ca09e0 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/86ca09e0 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/86ca09e0 Branch: refs/heads/master Commit: 86ca09e0e581b897582124938004e948fe38df3b Parents: d90f057 Author: Gary Helmling <ga...@apache.org> Authored: Thu May 5 12:40:47 2016 -0700 Committer: Gary Helmling <ga...@apache.org> Committed: Fri May 6 11:08:18 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/mapreduce/CellCounter.java | 117 ++++++++++++------- .../hbase/mapreduce/TableInputFormat.java | 88 ++++++++------ 2 files changed, 125 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/86ca09e0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java index aaa32bd..73f9b93 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java @@ -92,7 +92,30 @@ public class CellCounter extends Configured implements Tool { * Counter enumeration to count the actual rows. */ public static enum Counters { - ROWS + ROWS, + CELLS + } + + private Configuration conf; + private String separator; + + // state of current row, family, column needs to persist across map() invocations + // in order to properly handle scanner batching, where a single qualifier may have too + // many versions for a single map() call + private byte[] lastRow; + private String currentRowKey; + byte[] currentFamily = null; + String currentFamilyName = null; + byte[] currentQualifier = null; + // family + qualifier + String currentQualifierName = null; + // rowkey + family + qualifier + String currentRowQualifierName = null; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + conf = context.getConfiguration(); + separator = conf.get("ReportSeparator",":"); } /** @@ -112,49 +135,45 @@ public class CellCounter extends Configured implements Tool { throws IOException { Preconditions.checkState(values != null, "values passed to the map is null"); - String currentFamilyName = null; - String currentQualifierName = null; - String currentRowKey = null; - Configuration config = context.getConfiguration(); - String separator = config.get("ReportSeparator",":"); + try { - context.getCounter(Counters.ROWS).increment(1); - context.write(new Text("Total ROWS"), new IntWritable(1)); - if (values != null && !values.isEmpty()) { + byte[] currentRow = values.getRow(); + if (lastRow == null || !Bytes.equals(lastRow, currentRow)) { + lastRow = currentRow; + currentRowKey = Bytes.toStringBinary(currentRow); + currentFamily = null; + currentQualifier = null; + context.getCounter(Counters.ROWS).increment(1); + context.write(new Text("Total ROWS"), new IntWritable(1)); + } + if (!values.isEmpty()) { + int cellCount = 0; for (Cell value : values.listCells()) { - currentRowKey = Bytes.toStringBinary(CellUtil.cloneRow(value)); - String thisRowFamilyName = Bytes.toStringBinary(CellUtil.cloneFamily(value)); - if (!thisRowFamilyName.equals(currentFamilyName)) { - currentFamilyName = thisRowFamilyName; - context.getCounter("CF", thisRowFamilyName).increment(1); - if (1 == context.getCounter("CF", thisRowFamilyName).getValue()) { + cellCount++; + if (currentFamily == null || !CellUtil.matchingFamily(value, currentFamily)) { + currentFamily = CellUtil.cloneFamily(value); + currentFamilyName = Bytes.toStringBinary(currentFamily); + currentQualifier = null; + context.getCounter("CF", currentFamilyName).increment(1); + if (1 == context.getCounter("CF", currentFamilyName).getValue()) { context.write(new Text("Total Families Across all Rows"), new IntWritable(1)); - context.write(new Text(thisRowFamilyName), new IntWritable(1)); + context.write(new Text(currentFamily), new IntWritable(1)); } } - String thisRowQualifierName = thisRowFamilyName + separator - + Bytes.toStringBinary(CellUtil.cloneQualifier(value)); - if (!thisRowQualifierName.equals(currentQualifierName)) { - currentQualifierName = thisRowQualifierName; - context.getCounter("CFQL", thisRowQualifierName).increment(1); + if (currentQualifier == null || !CellUtil.matchingQualifier(value, currentQualifier)) { + currentQualifier = CellUtil.cloneQualifier(value); + currentQualifierName = currentFamilyName + separator + + Bytes.toStringBinary(currentQualifier); + currentRowQualifierName = currentRowKey + separator + currentQualifierName; + context.write(new Text("Total Qualifiers across all Rows"), new IntWritable(1)); - context.write(new Text(thisRowQualifierName), new IntWritable(1)); - // Intialize versions - context.getCounter("QL_VERSIONS", currentRowKey + separator + - thisRowQualifierName).increment(1); - context.write(new Text(currentRowKey + separator - + thisRowQualifierName + "_Versions"), new IntWritable(1)); - - } else { - // Increment versions - currentQualifierName = thisRowQualifierName; - context.getCounter("QL_VERSIONS", currentRowKey + separator + - thisRowQualifierName).increment(1); - context.write(new Text(currentRowKey + separator - + thisRowQualifierName + "_Versions"), new IntWritable(1)); + context.write(new Text(currentQualifierName), new IntWritable(1)); } + // Increment versions + context.write(new Text(currentRowQualifierName + "_Versions"), new IntWritable(1)); } + context.getCounter(Counters.CELLS).increment(cellCount); } } catch (InterruptedException e) { e.printStackTrace(); @@ -208,15 +227,16 @@ public class CellCounter extends Configured implements Tool { return job; } - private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException { - Scan s = new Scan(); + private static Scan getConfiguredScanForJob(Configuration conf, String[] args) + throws IOException { + // create scan with any properties set from TableInputFormat + Scan s = TableInputFormat.createScanFromConfiguration(conf); // Set Scan Versions - s.setMaxVersions(Integer.MAX_VALUE); - s.setCacheBlocks(false); - // Set Scan Column Family - if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) { - s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY))); + if (conf.get(TableInputFormat.SCAN_MAXVERSIONS) == null) { + // default to all versions unless explicitly set + s.setMaxVersions(Integer.MAX_VALUE); } + s.setCacheBlocks(false); // Set RowFilter or Prefix Filter if applicable. Filter rowFilter = getRowFilter(args); if (rowFilter!= null) { @@ -277,9 +297,18 @@ public class CellCounter extends Configured implements Tool { System.err.println(" <tablename> <outputDir> <reportSeparator> [^[regex pattern] or " + "[Prefix] for row filter]] --starttime=[starttime] --endtime=[endtime]"); System.err.println(" Note: -D properties will be applied to the conf used. "); - System.err.println(" Additionally, the following SCAN properties can be specified"); - System.err.println(" to get fine grained control on what is counted.."); + System.err.println(" Additionally, all of the SCAN properties from TableInputFormat"); + System.err.println(" can be specified to get fine grained control on what is counted.."); + System.err.println(" -D " + TableInputFormat.SCAN_ROW_START + "=<rowkey>"); + System.err.println(" -D " + TableInputFormat.SCAN_ROW_STOP + "=<rowkey>"); + System.err.println(" -D " + TableInputFormat.SCAN_COLUMNS + "=\"<col1> <col2>...\""); System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>"); + System.err.println(" -D " + TableInputFormat.SCAN_TIMESTAMP + "=<timestamp>"); + System.err.println(" -D " + TableInputFormat.SCAN_TIMERANGE_START + "=<timestamp>"); + System.err.println(" -D " + TableInputFormat.SCAN_TIMERANGE_END + "=<timestamp>"); + System.err.println(" -D " + TableInputFormat.SCAN_MAXVERSIONS + "=<count>"); + System.err.println(" -D " + TableInputFormat.SCAN_CACHEDROWS + "=<count>"); + System.err.println(" -D " + TableInputFormat.SCAN_BATCHSIZE + "=<count>"); System.err.println(" <reportSeparator> parameter can be used to override the default report separator " + "string : used to separate the rowId/column family name and qualifier name."); System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " + http://git-wip-us.apache.org/repos/asf/hbase/blob/86ca09e0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java index ebeb158..be20d90 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java @@ -126,54 +126,70 @@ implements Configurable { } } else { try { - scan = new Scan(); + scan = createScanFromConfiguration(conf); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + } + } - if (conf.get(SCAN_ROW_START) != null) { - scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START))); - } + setScan(scan); + } - if (conf.get(SCAN_ROW_STOP) != null) { - scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP))); - } + /** + * Sets up a {@link Scan} instance, applying settings from the configuration property + * constants defined in {@code TableInputFormat}. This allows specifying things such as: + * <ul> + * <li>start and stop rows</li> + * <li>column qualifiers or families</li> + * <li>timestamps or timerange</li> + * <li>scanner caching and batch size</li> + * </ul> + */ + public static Scan createScanFromConfiguration(Configuration conf) throws IOException { + Scan scan = new Scan(); - if (conf.get(SCAN_COLUMNS) != null) { - addColumns(scan, conf.get(SCAN_COLUMNS)); - } + if (conf.get(SCAN_ROW_START) != null) { + scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START))); + } - if (conf.get(SCAN_COLUMN_FAMILY) != null) { - scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY))); - } + if (conf.get(SCAN_ROW_STOP) != null) { + scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP))); + } - if (conf.get(SCAN_TIMESTAMP) != null) { - scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP))); - } + if (conf.get(SCAN_COLUMNS) != null) { + addColumns(scan, conf.get(SCAN_COLUMNS)); + } - if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) { - scan.setTimeRange( - Long.parseLong(conf.get(SCAN_TIMERANGE_START)), - Long.parseLong(conf.get(SCAN_TIMERANGE_END))); - } + if (conf.get(SCAN_COLUMN_FAMILY) != null) { + scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY))); + } - if (conf.get(SCAN_MAXVERSIONS) != null) { - scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS))); - } + if (conf.get(SCAN_TIMESTAMP) != null) { + scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP))); + } - if (conf.get(SCAN_CACHEDROWS) != null) { - scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS))); - } + if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) { + scan.setTimeRange( + Long.parseLong(conf.get(SCAN_TIMERANGE_START)), + Long.parseLong(conf.get(SCAN_TIMERANGE_END))); + } - if (conf.get(SCAN_BATCHSIZE) != null) { - scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE))); - } + if (conf.get(SCAN_MAXVERSIONS) != null) { + scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS))); + } - // false by default, full table scans generate too much BC churn - scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false))); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - } + if (conf.get(SCAN_CACHEDROWS) != null) { + scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS))); } - setScan(scan); + if (conf.get(SCAN_BATCHSIZE) != null) { + scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE))); + } + + // false by default, full table scans generate too much BC churn + scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false))); + + return scan; } @Override