Repository: hbase
Updated Branches:
  refs/heads/master d90f0571e -> 86ca09e0e


HBASE-15773 Improvements to CellCounter job


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/86ca09e0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/86ca09e0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/86ca09e0

Branch: refs/heads/master
Commit: 86ca09e0e581b897582124938004e948fe38df3b
Parents: d90f057
Author: Gary Helmling <ga...@apache.org>
Authored: Thu May 5 12:40:47 2016 -0700
Committer: Gary Helmling <ga...@apache.org>
Committed: Fri May 6 11:08:18 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/mapreduce/CellCounter.java     | 117 ++++++++++++-------
 .../hbase/mapreduce/TableInputFormat.java       |  88 ++++++++------
 2 files changed, 125 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/86ca09e0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
index aaa32bd..73f9b93 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCounter.java
@@ -92,7 +92,30 @@ public class CellCounter extends Configured implements Tool {
      * Counter enumeration to count the actual rows.
      */
     public static enum Counters {
-      ROWS
+      ROWS,
+      CELLS
+    }
+
+    private Configuration conf;
+    private String separator;
+
+    // state of current row, family, column needs to persist across map() invocations
+    // in order to properly handle scanner batching, where a single qualifier may have too
+    // many versions for a single map() call
+    private byte[] lastRow;
+    private String currentRowKey;
+    byte[] currentFamily = null;
+    String currentFamilyName = null;
+    byte[] currentQualifier = null;
+    // family + qualifier
+    String currentQualifierName = null;
+    // rowkey + family + qualifier
+    String currentRowQualifierName = null;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+      conf = context.getConfiguration();
+      separator = conf.get("ReportSeparator",":");
     }
 
     /**
@@ -112,49 +135,45 @@ public class CellCounter extends Configured implements Tool {
         throws IOException {
       Preconditions.checkState(values != null,
           "values passed to the map is null");
-      String currentFamilyName = null;
-      String currentQualifierName = null;
-      String currentRowKey = null;
-      Configuration config = context.getConfiguration();
-      String separator = config.get("ReportSeparator",":");
+
       try {
-        context.getCounter(Counters.ROWS).increment(1);
-        context.write(new Text("Total ROWS"), new IntWritable(1));
-        if (values != null && !values.isEmpty()) {
+        byte[] currentRow = values.getRow();
+        if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
+          lastRow = currentRow;
+          currentRowKey = Bytes.toStringBinary(currentRow);
+          currentFamily = null;
+          currentQualifier = null;
+          context.getCounter(Counters.ROWS).increment(1);
+          context.write(new Text("Total ROWS"), new IntWritable(1));
+        }
+        if (!values.isEmpty()) {
+          int cellCount = 0;
           for (Cell value : values.listCells()) {
-            currentRowKey = Bytes.toStringBinary(CellUtil.cloneRow(value));
-            String thisRowFamilyName = Bytes.toStringBinary(CellUtil.cloneFamily(value));
-            if (!thisRowFamilyName.equals(currentFamilyName)) {
-              currentFamilyName = thisRowFamilyName;
-              context.getCounter("CF", thisRowFamilyName).increment(1);
-              if (1 == context.getCounter("CF", thisRowFamilyName).getValue()) {
+            cellCount++;
+            if (currentFamily == null || !CellUtil.matchingFamily(value, currentFamily)) {
+              currentFamily = CellUtil.cloneFamily(value);
+              currentFamilyName = Bytes.toStringBinary(currentFamily);
+              currentQualifier = null;
+              context.getCounter("CF", currentFamilyName).increment(1);
+              if (1 == context.getCounter("CF", currentFamilyName).getValue()) {
                 context.write(new Text("Total Families Across all Rows"), new IntWritable(1));
-                context.write(new Text(thisRowFamilyName), new IntWritable(1));
+                context.write(new Text(currentFamily), new IntWritable(1));
               }
             }
-            String thisRowQualifierName = thisRowFamilyName + separator
-                + Bytes.toStringBinary(CellUtil.cloneQualifier(value));
-            if (!thisRowQualifierName.equals(currentQualifierName)) {
-              currentQualifierName = thisRowQualifierName;
-              context.getCounter("CFQL", thisRowQualifierName).increment(1);
+            if (currentQualifier == null || !CellUtil.matchingQualifier(value, currentQualifier)) {
+              currentQualifier = CellUtil.cloneQualifier(value);
+              currentQualifierName = currentFamilyName + separator +
+                  Bytes.toStringBinary(currentQualifier);
+              currentRowQualifierName = currentRowKey + separator + currentQualifierName;
+
               context.write(new Text("Total Qualifiers across all Rows"),
                   new IntWritable(1));
-              context.write(new Text(thisRowQualifierName), new IntWritable(1));
-              // Intialize versions
-              context.getCounter("QL_VERSIONS", currentRowKey + separator +
-                  thisRowQualifierName).increment(1);
-              context.write(new Text(currentRowKey + separator
-                  + thisRowQualifierName + "_Versions"), new IntWritable(1));
-
-            } else {
-              // Increment versions
-              currentQualifierName = thisRowQualifierName;
-              context.getCounter("QL_VERSIONS", currentRowKey + separator +
-                  thisRowQualifierName).increment(1);
-              context.write(new Text(currentRowKey + separator
-                  + thisRowQualifierName + "_Versions"), new IntWritable(1));
+              context.write(new Text(currentQualifierName), new IntWritable(1));
             }
+            // Increment versions
+            context.write(new Text(currentRowQualifierName + "_Versions"), new IntWritable(1));
           }
+          context.getCounter(Counters.CELLS).increment(cellCount);
         }
       } catch (InterruptedException e) {
         e.printStackTrace();
@@ -208,15 +227,16 @@ public class CellCounter extends Configured implements Tool {
     return job;
   }
 
-  private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
-    Scan s = new Scan();
+  private static Scan getConfiguredScanForJob(Configuration conf, String[] args)
+      throws IOException {
+    // create scan with any properties set from TableInputFormat
+    Scan s = TableInputFormat.createScanFromConfiguration(conf);
     // Set Scan Versions
-    s.setMaxVersions(Integer.MAX_VALUE);
-    s.setCacheBlocks(false);
-    // Set Scan Column Family
-    if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
-      s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
+    if (conf.get(TableInputFormat.SCAN_MAXVERSIONS) == null) {
+      // default to all versions unless explicitly set
+      s.setMaxVersions(Integer.MAX_VALUE);
     }
+    s.setCacheBlocks(false);
     // Set RowFilter or Prefix Filter if applicable.
     Filter rowFilter = getRowFilter(args);
     if (rowFilter!= null) {
@@ -277,9 +297,18 @@ public class CellCounter extends Configured implements Tool {
       System.err.println("       <tablename> <outputDir> <reportSeparator> [^[regex pattern] or " +
         "[Prefix] for row filter]] --starttime=[starttime] --endtime=[endtime]");
       System.err.println("  Note: -D properties will be applied to the conf used. ");
-      System.err.println("  Additionally, the following SCAN properties can be specified");
-      System.err.println("  to get fine grained control on what is counted..");
+      System.err.println("  Additionally, all of the SCAN properties from TableInputFormat");
+      System.err.println("  can be specified to get fine grained control on what is counted..");
+      System.err.println("   -D " + TableInputFormat.SCAN_ROW_START + "=<rowkey>");
+      System.err.println("   -D " + TableInputFormat.SCAN_ROW_STOP + "=<rowkey>");
+      System.err.println("   -D " + TableInputFormat.SCAN_COLUMNS + "=\"<col1> <col2>...\"");
       System.err.println("   -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
+      System.err.println("   -D " + TableInputFormat.SCAN_TIMESTAMP + "=<timestamp>");
+      System.err.println("   -D " + TableInputFormat.SCAN_TIMERANGE_START + "=<timestamp>");
+      System.err.println("   -D " + TableInputFormat.SCAN_TIMERANGE_END + "=<timestamp>");
+      System.err.println("   -D " + TableInputFormat.SCAN_MAXVERSIONS + "=<count>");
+      System.err.println("   -D " + TableInputFormat.SCAN_CACHEDROWS + "=<count>");
+      System.err.println("   -D " + TableInputFormat.SCAN_BATCHSIZE + "=<count>");
       System.err.println(" <reportSeparator> parameter can be used to override the default report separator " +
           "string : used to separate the rowId/column family name and qualifier name.");
       System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " +

http://git-wip-us.apache.org/repos/asf/hbase/blob/86ca09e0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
index ebeb158..be20d90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
@@ -126,54 +126,70 @@ implements Configurable {
       }
     } else {
       try {
-        scan = new Scan();
+        scan = createScanFromConfiguration(conf);
+      } catch (Exception e) {
+          LOG.error(StringUtils.stringifyException(e));
+      }
+    }
 
-        if (conf.get(SCAN_ROW_START) != null) {
-          scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
-        }
+    setScan(scan);
+  }
 
-        if (conf.get(SCAN_ROW_STOP) != null) {
-          scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
-        }
+  /**
+   * Sets up a {@link Scan} instance, applying settings from the configuration property
+   * constants defined in {@code TableInputFormat}.  This allows specifying things such as:
+   * <ul>
+   *   <li>start and stop rows</li>
+   *   <li>column qualifiers or families</li>
+   *   <li>timestamps or timerange</li>
+   *   <li>scanner caching and batch size</li>
+   * </ul>
+   */
+  public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
+    Scan scan = new Scan();
 
-        if (conf.get(SCAN_COLUMNS) != null) {
-          addColumns(scan, conf.get(SCAN_COLUMNS));
-        }
+    if (conf.get(SCAN_ROW_START) != null) {
+      scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
+    }
 
-        if (conf.get(SCAN_COLUMN_FAMILY) != null) {
-          scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
-        }
+    if (conf.get(SCAN_ROW_STOP) != null) {
+      scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
+    }
 
-        if (conf.get(SCAN_TIMESTAMP) != null) {
-          scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
-        }
+    if (conf.get(SCAN_COLUMNS) != null) {
+      addColumns(scan, conf.get(SCAN_COLUMNS));
+    }
 
-        if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
-          scan.setTimeRange(
-              Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
-              Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
-        }
+    if (conf.get(SCAN_COLUMN_FAMILY) != null) {
+      scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
+    }
 
-        if (conf.get(SCAN_MAXVERSIONS) != null) {
-          scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
-        }
+    if (conf.get(SCAN_TIMESTAMP) != null) {
+      scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
+    }
 
-        if (conf.get(SCAN_CACHEDROWS) != null) {
-          scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
-        }
+    if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
+      scan.setTimeRange(
+          Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
+          Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
+    }
 
-        if (conf.get(SCAN_BATCHSIZE) != null) {
-          scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
-        }
+    if (conf.get(SCAN_MAXVERSIONS) != null) {
+      scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
+    }
 
-        // false by default, full table scans generate too much BC churn
-        scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
-      } catch (Exception e) {
-          LOG.error(StringUtils.stringifyException(e));
-      }
+    if (conf.get(SCAN_CACHEDROWS) != null) {
+      scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
     }
 
-    setScan(scan);
+    if (conf.get(SCAN_BATCHSIZE) != null) {
+      scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
+    }
+
+    // false by default, full table scans generate too much BC churn
+    scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
+
+    return scan;
   }
 
   @Override

Reply via email to