This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 1147b3c9e21 HBASE-28837 Introduce row statistics coprocessor example (#6327)
1147b3c9e21 is described below

commit 1147b3c9e21a5d7dc93c473baea88c4b14250269
Author: eab148 <[email protected]>
AuthorDate: Fri Dec 20 06:57:58 2024 -0500

    HBASE-28837 Introduce row statistics coprocessor example (#6327)
    
    Co-authored-by: Evie Boland <[email protected]>
    Signed-off-by: Nick Dimiduk <[email protected]>
---
 hbase-examples/pom.xml                             |  29 +-
 .../example/row/stats/RowStatistics.java           |  62 ++++
 .../row/stats/RowStatisticsCompactionObserver.java | 319 +++++++++++++++++++++
 .../example/row/stats/RowStatisticsImpl.java       | 300 +++++++++++++++++++
 .../coprocessor/example/row/stats/SizeBucket.java  |  57 ++++
 .../example/row/stats/SizeBucketTracker.java       |  83 ++++++
 .../recorder/RowStatisticsCombinedRecorder.java    |  48 ++++
 .../row/stats/recorder/RowStatisticsRecorder.java  |  27 ++
 .../stats/recorder/RowStatisticsTableRecorder.java | 182 ++++++++++++
 .../RowStatisticsDisruptorExceptionHandler.java    |  51 ++++
 .../ringbuffer/RowStatisticsEventHandler.java      |  62 ++++
 .../RowStatisticsRingBufferEnvelope.java           |  39 +++
 .../ringbuffer/RowStatisticsRingBufferPayload.java |  41 +++
 .../utils/RowStatisticsConfigurationUtil.java      |  39 +++
 .../row/stats/utils/RowStatisticsTableUtil.java    |  50 ++++
 .../example/row/stats/utils/RowStatisticsUtil.java |  36 +++
 .../stats/TestRowStatisticsCompactionObserver.java | 205 +++++++++++++
 .../row/stats/TestRowStatisticsEventHandler.java   |  87 ++++++
 .../row/stats/TestRowStatisticsTableRecorder.java  | 103 +++++++
 .../stats/sizebucket/TestSizeBucketTracker.java    |  93 ++++++
 .../hbase/mapreduce/TestMapReduceExamples.java     |   2 -
 21 files changed, 1905 insertions(+), 10 deletions(-)

diff --git a/hbase-examples/pom.xml b/hbase-examples/pom.xml
index 816e08ad32a..44f6fdff39e 100644
--- a/hbase-examples/pom.xml
+++ b/hbase-examples/pom.xml
@@ -37,10 +37,6 @@
     <internal.protobuf.version>4.28.2</internal.protobuf.version>
   </properties>
   <dependencies>
-    <dependency>
-      <groupId>org.apache.hbase.thirdparty</groupId>
-      <artifactId>hbase-shaded-miscellaneous</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.hbase.thirdparty</groupId>
       <artifactId>hbase-shaded-netty</artifactId>
@@ -49,6 +45,27 @@
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-protocol-shaded</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase.thirdparty</groupId>
+      <artifactId>hbase-shaded-miscellaneous</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.lmax</groupId>
+      <artifactId>disruptor</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-1.2-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-logging</artifactId>
@@ -106,10 +123,6 @@
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatistics.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatistics.java
new file mode 100644
index 00000000000..18e08ab7b33
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatistics.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats;
+
+import java.util.Map;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public interface RowStatistics {
+  String getTable();
+
+  String getRegion();
+
+  String getColumnFamily();
+
+  boolean isMajor();
+
+  long getLargestRowNumBytes();
+
+  int getLargestRowCellsCount();
+
+  long getLargestCellNumBytes();
+
+  int getCellsLargerThanOneBlockCount();
+
+  int getRowsLargerThanOneBlockCount();
+
+  int getCellsLargerThanMaxCacheSizeCount();
+
+  int getTotalDeletesCount();
+
+  int getTotalCellsCount();
+
+  int getTotalRowsCount();
+
+  long getTotalBytes();
+
+  String getLargestRowAsString();
+
+  String getLargestCellAsString();
+
+  Map<String, Long> getRowSizeBuckets();
+
+  Map<String, Long> getValueSizeBuckets();
+
+  String getJsonString();
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsCompactionObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsCompactionObserver.java
new file mode 100644
index 00000000000..78e92aa18ae
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsCompactionObserver.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats;
+
+import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.CF;
+import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.NAMESPACE;
+import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.NAMESPACED_TABLE_NAME;
+import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.TABLE_RECORDER_KEY;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.RawCellBuilder;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsRecorder;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsTableRecorder;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsUtil;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
+import org.apache.hadoop.hbase.metrics.Counter;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.Shipper;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
[email protected]
+public class RowStatisticsCompactionObserver
+  implements RegionCoprocessor, RegionObserver, MasterCoprocessor, MasterObserver {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RowStatisticsCompactionObserver.class);
+
+  // From private field BucketAllocator.DEFAULT_BUCKET_SIZES
+  private static final long DEFAULT_MAX_BUCKET_SIZE = 512 * 1024 + 1024;
+  private static final ConcurrentMap<TableName, Long> TABLE_COUNTERS = new ConcurrentHashMap<>();
+  private static final String ROW_STATISTICS_DROPPED = "rowStatisticsDropped";
+  private static final String ROW_STATISTICS_PUT_FAILED = "rowStatisticsPutFailures";
+  private Counter rowStatisticsDropped;
+  private Counter rowStatisticsPutFailed;
+  private long maxCacheSize;
+  private final RowStatisticsRecorder recorder;
+
+  @InterfaceAudience.Private
+  public RowStatisticsCompactionObserver(RowStatisticsRecorder recorder) {
+    this.recorder = recorder;
+  }
+
+  public RowStatisticsCompactionObserver() {
+    this(null);
+  }
+
+  @Override
+  public Optional<RegionObserver> getRegionObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
+  public Optional<MasterObserver> getMasterObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
+  public void start(CoprocessorEnvironment e) throws IOException {
+    if (!(e instanceof RegionCoprocessorEnvironment)) {
+      return;
+    }
+    RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment) e;
+    if (regionEnv.getRegionInfo().getTable().isSystemTable()) {
+      return;
+    }
+    String[] configuredBuckets =
+      regionEnv.getConfiguration().getStrings(BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY);
+    maxCacheSize = DEFAULT_MAX_BUCKET_SIZE;
+    if (configuredBuckets != null && configuredBuckets.length > 0) {
+      String lastBucket = configuredBuckets[configuredBuckets.length - 1];
+      try {
+        maxCacheSize = Integer.parseInt(lastBucket.trim());
+      } catch (NumberFormatException ex) {
+        LOG.warn("Failed to parse {} value {} as int", BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY,
+          lastBucket, ex);
+      }
+    }
+    rowStatisticsDropped =
+      regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_DROPPED);
+    rowStatisticsPutFailed =
+      regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_PUT_FAILED);
+    TableName tableName = regionEnv.getRegionInfo().getTable();
+    TABLE_COUNTERS.merge(tableName, 1L, Long::sum);
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment e) throws IOException {
+    if (!(e instanceof RegionCoprocessorEnvironment)) {
+      return;
+    }
+    RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment) e;
+    if (regionEnv.getRegionInfo().getTable().isSystemTable()) {
+      return;
+    }
+    TableName tableName = regionEnv.getRegionInfo().getTable();
+    long tableCount = TABLE_COUNTERS.merge(tableName, -1L, Long::sum);
+    if (tableCount == 0) {
+      long regionCount = 0;
+      for (long count : TABLE_COUNTERS.values()) {
+        regionCount += count;
+      }
+      if (regionCount == 0) {
+        regionEnv.getMetricRegistryForRegionServer().remove(ROW_STATISTICS_DROPPED,
+          rowStatisticsDropped);
+        regionEnv.getMetricRegistryForRegionServer().remove(ROW_STATISTICS_PUT_FAILED,
+          rowStatisticsPutFailed);
+        RowStatisticsTableRecorder tableRecorder =
+          (RowStatisticsTableRecorder) regionEnv.getSharedData().get(TABLE_RECORDER_KEY);
+        if (tableRecorder != null) {
+          regionEnv.getSharedData().remove(TABLE_RECORDER_KEY, tableRecorder);
+          tableRecorder.close();
+        }
+      }
+    }
+  }
+
+  @Override
+  public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
+    throws IOException {
+    try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
+      if (admin.tableExists(NAMESPACED_TABLE_NAME)) {
+        LOG.info("Table {} already exists. Skipping table creation process.",
+          NAMESPACED_TABLE_NAME);
+      } else {
+        boolean shouldCreateNamespace =
+          Arrays.stream(admin.listNamespaces()).filter(namespace -> namespace.equals(NAMESPACE))
+            .collect(Collectors.toUnmodifiableSet()).isEmpty();
+        if (shouldCreateNamespace) {
+          NamespaceDescriptor nd = NamespaceDescriptor.create(NAMESPACE).build();
+          try {
+            admin.createNamespace(nd);
+          } catch (IOException e) {
+            LOG.error("Failed to create namespace {}", NAMESPACE, e);
+          }
+        }
+        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(CF).setMaxVersions(25)
+          .setTimeToLive((int) Duration.ofDays(7).toSeconds()).build();
+        TableDescriptor td =
+          TableDescriptorBuilder.newBuilder(NAMESPACED_TABLE_NAME).setColumnFamily(cfd).build();
+        LOG.info("Creating table {}", NAMESPACED_TABLE_NAME);
+        try {
+          admin.createTable(td);
+        } catch (IOException e) {
+          LOG.error("Failed to create table {}", NAMESPACED_TABLE_NAME, e);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to get Connection or Admin. Cannot determine if table {} exists.",
+        NAMESPACED_TABLE_NAME, e);
+    }
+  }
+
+  @Override
+  public InternalScanner preCompact(ObserverContext<? extends RegionCoprocessorEnvironment> context,
+    Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
+    CompactionRequest request) {
+    if (store.getTableName().isSystemTable()) {
+      return scanner;
+    }
+    int blocksize = store.getColumnFamilyDescriptor().getBlocksize();
+    boolean isMajor = request.isMajor();
+    RowStatisticsImpl stats = new RowStatisticsImpl(store.getTableName().getNameAsString(),
+      store.getRegionInfo().getEncodedName(), store.getColumnFamilyName(), blocksize, maxCacheSize,
+      isMajor);
+    return new RowStatisticsScanner(scanner, stats, context.getEnvironment(), recorder);
+  }
+
+  private static class RowStatisticsScanner implements InternalScanner, Shipper {
+
+    private final InternalScanner scanner;
+    private final Shipper shipper;
+    private final RowStatisticsImpl rowStatistics;
+    private final RegionCoprocessorEnvironment regionEnv;
+    private final Counter rowStatisticsDropped;
+    private final Counter rowStatisticsPutFailed;
+    private final RowStatisticsRecorder customRecorder;
+    private RawCellBuilder cellBuilder;
+    private Cell lastCell;
+
+    public RowStatisticsScanner(InternalScanner scanner, RowStatisticsImpl rowStatistics,
+      RegionCoprocessorEnvironment regionEnv, RowStatisticsRecorder customRecorder) {
+      this.scanner = scanner;
+      if (scanner instanceof Shipper) {
+        this.shipper = (Shipper) scanner;
+      } else {
+        this.shipper = null;
+      }
+      this.rowStatistics = rowStatistics;
+      this.regionEnv = regionEnv;
+      this.cellBuilder = regionEnv.getCellBuilder();
+      this.rowStatisticsDropped =
+        regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_DROPPED);
+      this.rowStatisticsPutFailed =
+        regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_PUT_FAILED);
+      this.customRecorder = customRecorder;
+    }
+
+    @Override
+    public boolean next(List<? super ExtendedCell> result, ScannerContext scannerContext)
+      throws IOException {
+      boolean ret = scanner.next(result, scannerContext);
+      consumeCells(result);
+      return ret;
+    }
+
+    @Override
+    public boolean next(List<? super ExtendedCell> result) throws IOException {
+      boolean ret = scanner.next(result);
+      consumeCells(result);
+      return ret;
+    }
+
+    @Override
+    public void close() throws IOException {
+      rowStatistics.handleRowChanged(lastCell);
+      rowStatistics.shipped(cellBuilder);
+      record();
+      scanner.close();
+    }
+
+    @Override
+    public void shipped() throws IOException {
+      if (shipper != null) {
+        lastCell = RowStatisticsUtil.cloneWithoutValue(cellBuilder, lastCell);
+        rowStatistics.shipped(cellBuilder);
+        shipper.shipped();
+      }
+    }
+
+    private void consumeCells(List<? super ExtendedCell> result) {
+      if (result.isEmpty()) {
+        return;
+      }
+      // each next() call returns at most 1 row (possibly only part of one, for large rows)
+      // so we just need to check if the first cell has changed rows
+      ExtendedCell first = (ExtendedCell) result.get(0);
+      if (rowChanged(first)) {
+        rowStatistics.handleRowChanged(lastCell);
+      }
+      for (int i = 0; i < result.size(); i++) {
+        ExtendedCell cell = (ExtendedCell) result.get(i);
+        rowStatistics.consumeCell(cell);
+        lastCell = cell;
+      }
+    }
+
+    private boolean rowChanged(Cell cell) {
+      if (lastCell == null) {
+        return false;
+      }
+      return !CellUtil.matchingRows(lastCell, cell);
+    }
+
+    private void record() {
+      RowStatisticsTableRecorder tableRecorder =
+        (RowStatisticsTableRecorder) regionEnv.getSharedData().computeIfAbsent(TABLE_RECORDER_KEY,
+          k -> RowStatisticsTableRecorder.forClusterConnection(regionEnv.getConnection(),
+            rowStatisticsDropped, rowStatisticsPutFailed));
+      if (tableRecorder != null) {
+        tableRecorder.record(this.rowStatistics,
+          Optional.of(regionEnv.getRegion().getRegionInfo().getRegionName()));
+      } else {
+        LOG.error(
+          "Failed to initialize a TableRecorder. Will not record row 
statistics for region={}",
+          rowStatistics.getRegion());
+        rowStatisticsDropped.increment();
+      }
+      if (customRecorder != null) {
+        customRecorder.record(this.rowStatistics, Optional.empty());
+      }
+    }
+  }
+}
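
To try the observer, it can be attached to a single table through its descriptor; cluster-wide loading via hbase.coprocessor.region.classes (plus hbase.coprocessor.master.classes, so postStartMaster can create the stats table) also works. A minimal sketch, not part of this commit; the table name "t1", the family "cf", and the open Connection "connection" are placeholders:

    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
      // register the example observer on this table only
      .setCoprocessor(RowStatisticsCompactionObserver.class.getName())
      .build();
    try (Admin admin = connection.getAdmin()) {
      admin.createTable(td);
    }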
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsImpl.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsImpl.java
new file mode 100644
index 00000000000..365b7eb5ef6
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsImpl.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats;
+
+import java.util.Map;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.RawCellBuilder;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsUtil;
+import org.apache.hadoop.hbase.regionserver.Shipper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.GsonUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.gson.Gson;
+import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
+
+/**
+ * Holder for accumulating row statistics in {@link RowStatisticsCompactionObserver}. Creates
+ * various cell, row, and total stats.
+ */
[email protected]
+public class RowStatisticsImpl implements RowStatistics {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RowStatisticsImpl.class);
+  private static final Gson GSON = GsonUtil.createGson().create();
+
+  //
+  // Transient fields which are not included in gson serialization
+  //
+  private final transient long blockSize;
+  private final transient long maxCacheSize;
+  private transient int rowCells;
+  private transient long rowBytes;
+  private transient byte[] largestRow;
+  private transient Cell largestCell;
+  private final transient boolean isMajor;
+  private final transient SizeBucketTracker rowSizeBuckets;
+  private final transient SizeBucketTracker valueSizeBuckets;
+
+  // We don't need to clone anything until shipped() is called on scanner.
+  // To avoid allocations, we keep a reference until that point
+  private transient Cell largestRowRef;
+  private transient Cell largestCellRef;
+  //
+  // Non-transient fields which are included in gson
+  //
+  private final String table;
+  private final String region;
+  private final String columnFamily;
+  private long largestRowNumBytes;
+  private int largestRowCellsCount;
+  private long largestCellNumBytes;
+  private int cellsLargerThanOneBlockCount;
+  private int rowsLargerThanOneBlockCount;
+  private int cellsLargerThanMaxCacheSizeCount;
+  private int totalDeletesCount;
+  private int totalCellsCount;
+  private int totalRowsCount;
+  private long totalBytesCount;
+
+  RowStatisticsImpl(String table, String encodedRegion, String columnFamily, long blockSize,
+    long maxCacheSize, boolean isMajor) {
+    this.table = table;
+    this.region = encodedRegion;
+    this.columnFamily = columnFamily;
+    this.blockSize = blockSize;
+    this.maxCacheSize = maxCacheSize;
+    this.isMajor = isMajor;
+    this.rowSizeBuckets = new SizeBucketTracker();
+    this.valueSizeBuckets = new SizeBucketTracker();
+  }
+
+  public void handleRowChanged(Cell lastCell) {
+    if (rowBytes > largestRowNumBytes) {
+      largestRowRef = lastCell;
+      largestRowNumBytes = rowBytes;
+      largestRowCellsCount = rowCells;
+    }
+    if (rowBytes > blockSize) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RowTooLarge: rowBytes={}, blockSize={}, table={}, 
rowKey={}", rowBytes,
+          blockSize, table, Bytes.toStringBinary(lastCell.getRowArray(), 
lastCell.getRowOffset(),
+            lastCell.getRowLength()));
+      }
+      rowsLargerThanOneBlockCount++;
+    }
+    rowSizeBuckets.add(rowBytes);
+    rowBytes = 0;
+    rowCells = 0;
+    totalRowsCount++;
+  }
+
+  public void consumeCell(Cell cell) {
+    int cellSize = cell.getSerializedSize();
+
+    rowBytes += cellSize;
+    rowCells++;
+
+    boolean tooLarge = false;
+    if (cellSize > maxCacheSize) {
+      cellsLargerThanMaxCacheSizeCount++;
+      tooLarge = true;
+    }
+    if (cellSize > blockSize) {
+      cellsLargerThanOneBlockCount++;
+      tooLarge = true;
+    }
+
+    if (tooLarge && LOG.isDebugEnabled()) {
+      LOG.debug("CellTooLarge: size={}, blockSize={}, maxCacheSize={}, 
table={}, cell={}", cellSize,
+        blockSize, maxCacheSize, table, CellUtil.toString(cell, false));
+    }
+
+    if (cellSize > largestCellNumBytes) {
+      largestCellRef = cell;
+      largestCellNumBytes = cellSize;
+    }
+    valueSizeBuckets.add(cell.getValueLength());
+
+    totalCellsCount++;
+    if (CellUtil.isDelete(cell)) {
+      totalDeletesCount++;
+    }
+    totalBytesCount += cellSize;
+  }
+
+  /**
+   * Clone the cell refs so they can be cleaned up by {@link Shipper#shipped()}. Doing this lazily
+   * here, rather than eagerly in the above two methods, can save us on some allocations. We might
+   * change the largestCell/largestRow multiple times between shipped() calls.
+   */
+  public void shipped(RawCellBuilder cellBuilder) {
+    if (largestRowRef != null) {
+      largestRow = CellUtil.cloneRow(largestRowRef);
+      largestRowRef = null;
+    }
+    if (largestCellRef != null) {
+      largestCell = RowStatisticsUtil.cloneWithoutValue(cellBuilder, largestCellRef);
+      largestCellRef = null;
+    }
+  }
+
+  @Override
+  public String getTable() {
+    return table;
+  }
+
+  @Override
+  public String getRegion() {
+    return region;
+  }
+
+  @Override
+  public String getColumnFamily() {
+    return columnFamily;
+  }
+
+  @Override
+  public boolean isMajor() {
+    return isMajor;
+  }
+
+  public byte[] getLargestRow() {
+    return largestRow;
+  }
+
+  @Override
+  public String getLargestRowAsString() {
+    return Bytes.toStringBinary(getLargestRow());
+  }
+
+  @Override
+  public long getLargestRowNumBytes() {
+    return largestRowNumBytes;
+  }
+
+  @Override
+  public int getLargestRowCellsCount() {
+    return largestRowCellsCount;
+  }
+
+  public Cell getLargestCell() {
+    return largestCell;
+  }
+
+  @Override
+  public String getLargestCellAsString() {
+    return CellUtil.toString(getLargestCell(), false);
+  }
+
+  @Override
+  public long getLargestCellNumBytes() {
+    return largestCellNumBytes;
+  }
+
+  @Override
+  public int getCellsLargerThanOneBlockCount() {
+    return cellsLargerThanOneBlockCount;
+  }
+
+  @Override
+  public int getRowsLargerThanOneBlockCount() {
+    return rowsLargerThanOneBlockCount;
+  }
+
+  @Override
+  public int getCellsLargerThanMaxCacheSizeCount() {
+    return cellsLargerThanMaxCacheSizeCount;
+  }
+
+  @Override
+  public int getTotalDeletesCount() {
+    return totalDeletesCount;
+  }
+
+  @Override
+  public int getTotalCellsCount() {
+    return totalCellsCount;
+  }
+
+  @Override
+  public int getTotalRowsCount() {
+    return totalRowsCount;
+  }
+
+  @Override
+  public long getTotalBytes() {
+    return totalBytesCount;
+  }
+
+  @Override
+  public Map<String, Long> getRowSizeBuckets() {
+    return rowSizeBuckets.toMap();
+  }
+
+  @Override
+  public Map<String, Long> getValueSizeBuckets() {
+    return valueSizeBuckets.toMap();
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+      .append("largestRowAsString", Bytes.toStringBinary(largestRow))
+      .append("largestCellAsString", largestCell).append("largestRowNumBytes", 
largestRowNumBytes)
+      .append("largestRowCellsCount", largestRowCellsCount)
+      .append("largestCellNumBytes", largestCellNumBytes)
+      .append("cellsLargerThanOneBlockCount", cellsLargerThanOneBlockCount)
+      .append("rowsLargerThanOneBlockCount", rowsLargerThanOneBlockCount)
+      .append("cellsLargerThanMaxCacheSizeCount", 
cellsLargerThanMaxCacheSizeCount)
+      .append("totalDeletesCount", 
totalDeletesCount).append("totalCellsCount", totalCellsCount)
+      .append("totalRowsCount", totalRowsCount).append("totalBytesCount", 
totalBytesCount)
+      .append("rowSizeBuckets", getRowSizeBuckets())
+      .append("valueSizeBuckets", getValueSizeBuckets()).append("isMajor", 
isMajor).toString();
+  }
+
+  @Override
+  public String getJsonString() {
+    JsonObject json = (JsonObject) GSON.toJsonTree(this);
+    json.add("largestCellParts", buildLargestCellPartsJson());
+    json.addProperty("largestRowAsString", getLargestRowAsString());
+    json.add("rowSizeBuckets", rowSizeBuckets.toJsonObject());
+    json.add("valueSizeBuckets", valueSizeBuckets.toJsonObject());
+    return json.toString();
+  }
+
+  private JsonObject buildLargestCellPartsJson() {
+    JsonObject cellJson = new JsonObject();
+    Cell cell = getLargestCell();
+    cellJson.addProperty("rowKey",
+      Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+    cellJson.addProperty("family",
+      Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
+    cellJson.addProperty("qualifier", Bytes.toStringBinary(cell.getQualifierArray(),
+      cell.getQualifierOffset(), cell.getQualifierLength()));
+    cellJson.addProperty("timestamp", cell.getTimestamp());
+    cellJson.addProperty("type", cell.getType().toString());
+    return cellJson;
+  }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucket.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucket.java
new file mode 100644
index 00000000000..da49eb86351
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucket.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public enum SizeBucket {
+  KILOBYTES_1(0, 1 * 1024, "[0, 1)"),
+  KILOBYTES_2(1 * 1024, 2 * 1024, "[1, 2)"),
+  KILOBYTES_4(2 * 1024, 4 * 1024, "[2, 4)"),
+  KILOBYTES_8(4 * 1024, 8 * 1024, "[4, 8)"),
+  KILOBYTES_16(8 * 1024, 16 * 1024, "[8, 16)"),
+  KILOBYTES_32(16 * 1024, 32 * 1024, "[16, 32)"),
+  KILOBYTES_64(32 * 1024, 64 * 1024, "[32, 64)"),
+  KILOBYTES_128(64 * 1024, 128 * 1024, "[64, 128)"),
+  KILOBYTES_256(128 * 1024, 256 * 1024, "[128, 256)"),
+  KILOBYTES_512(256 * 1024, 512 * 1024, "[256, 512)"),
+  KILOBYTES_MAX(512 * 1024, Long.MAX_VALUE, "[512, inf)");
+
+  private final long minBytes;
+  private final long maxBytes;
+  private final String bucket;
+
+  SizeBucket(long minBytes, long maxBytes, String bucket) {
+    this.minBytes = minBytes;
+    this.maxBytes = maxBytes;
+    this.bucket = bucket;
+  }
+
+  public long minBytes() {
+    return minBytes;
+  }
+
+  public long maxBytes() {
+    return maxBytes;
+  }
+
+  public String bucket() {
+    return bucket;
+  }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucketTracker.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucketTracker.java
new file mode 100644
index 00000000000..702eb32d8d8
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucketTracker.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
+
[email protected]
+public class SizeBucketTracker {
+
+  private static final SizeBucket[] SIZE_BUCKET_ARRAY = SizeBucket.values();
+  private final Map<SizeBucket, Long> bucketToCount;
+
+  public SizeBucketTracker() {
+    SizeBucket[] sizeBucketsArray = SizeBucket.values();
+
+    bucketToCount = new HashMap<>(sizeBucketsArray.length);
+    for (SizeBucket sizeBucket : sizeBucketsArray) {
+      bucketToCount.put(sizeBucket, 0L);
+    }
+  }
+
+  public void add(long rowBytes) {
+    if (rowBytes < 0) {
+      return;
+    }
+    SizeBucket sizeBucket = search(rowBytes);
+    if (sizeBucket == null) {
+      return;
+    }
+    long val = bucketToCount.get(sizeBucket);
+    bucketToCount.put(sizeBucket, getSafeIncrementedValue(val));
+  }
+
+  public Map<String, Long> toMap() {
+    Map<String, Long> copy = new HashMap<>(SIZE_BUCKET_ARRAY.length);
+    for (SizeBucket sizeBucket : SIZE_BUCKET_ARRAY) {
+      long val = bucketToCount.get(sizeBucket);
+      copy.put(sizeBucket.bucket(), val);
+    }
+    return copy;
+  }
+
+  public JsonObject toJsonObject() {
+    JsonObject bucketJson = new JsonObject();
+    for (SizeBucket sizeBucket : SIZE_BUCKET_ARRAY) {
+      long val = bucketToCount.get(sizeBucket);
+      bucketJson.addProperty(sizeBucket.bucket(), val);
+    }
+    return bucketJson;
+  }
+
+  private SizeBucket search(long val) {
+    for (SizeBucket sizeBucket : SIZE_BUCKET_ARRAY) {
+      if (val < sizeBucket.maxBytes()) {
+        return sizeBucket;
+      }
+    }
+    return val == Long.MAX_VALUE ? SizeBucket.KILOBYTES_MAX : null;
+  }
+
+  private static long getSafeIncrementedValue(long val) {
+    return val == Long.MAX_VALUE ? val : val + 1;
+  }
+}
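
For illustration only (not part of this commit), the tracker counts each observed size into the half-open kilobyte ranges defined by SizeBucket:

    SizeBucketTracker tracker = new SizeBucketTracker();
    tracker.add(512);      // counted in KILOBYTES_1, bucket "[0, 1)"
    tracker.add(3 * 1024); // counted in KILOBYTES_4, bucket "[2, 4)"
    tracker.add(-1);       // negative sizes are silently ignored
    // bucket label -> count; every bucket is present, zero-initialized
    Map<String, Long> counts = tracker.toMap();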
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsCombinedRecorder.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsCombinedRecorder.java
new file mode 100644
index 00000000000..6ceae50d9c0
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsCombinedRecorder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder;
+
+import java.util.Optional;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatisticsImpl;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public class RowStatisticsCombinedRecorder implements RowStatisticsRecorder {
+
+  private final RowStatisticsRecorder one;
+  private final RowStatisticsRecorder two;
+
+  public RowStatisticsCombinedRecorder(RowStatisticsRecorder one, RowStatisticsRecorder two) {
+    this.one = one;
+    this.two = two;
+  }
+
+  @Override
+  public void record(RowStatisticsImpl stats, Optional<byte[]> fullRegionName) {
+    one.record(stats, fullRegionName);
+    two.record(stats, fullRegionName);
+  }
+
+  public RowStatisticsRecorder getOne() {
+    return one;
+  }
+
+  public RowStatisticsRecorder getTwo() {
+    return two;
+  }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsRecorder.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsRecorder.java
new file mode 100644
index 00000000000..78f5eb37647
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsRecorder.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder;
+
+import java.util.Optional;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatisticsImpl;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public interface RowStatisticsRecorder {
+  void record(RowStatisticsImpl stats, Optional<byte[]> fullRegionName);
+}
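
Any custom sink can be plugged in by implementing this interface and passing it to the observer's one-argument constructor; two sinks can be chained with the RowStatisticsCombinedRecorder above. A rough sketch, not part of this commit:

    // Hypothetical recorder that just logs each row-statistics blob as JSON.
    public class LoggingRowStatisticsRecorder implements RowStatisticsRecorder {
      private static final Logger LOG =
        LoggerFactory.getLogger(LoggingRowStatisticsRecorder.class);

      @Override
      public void record(RowStatisticsImpl stats, Optional<byte[]> fullRegionName) {
        LOG.info("Row statistics for region {}: {}", stats.getRegion(), stats.getJsonString());
      }
    }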
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsTableRecorder.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsTableRecorder.java
new file mode 100644
index 00000000000..23fe4c344e8
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsTableRecorder.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder;
+
+import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsConfigurationUtil.getInt;
+import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsConfigurationUtil.getLong;
+import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.NAMESPACED_TABLE_NAME;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.TimeoutException;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionConfiguration;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatisticsImpl;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RowStatisticsDisruptorExceptionHandler;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RowStatisticsEventHandler;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RowStatisticsRingBufferEnvelope;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RowStatisticsRingBufferPayload;
+import org.apache.hadoop.hbase.metrics.Counter;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
[email protected]
+public final class RowStatisticsTableRecorder implements RowStatisticsRecorder {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RowStatisticsTableRecorder.class);
+  // Must be multiple of 2. Should be greater than num regions/RS
+  private static final int DEFAULT_EVENT_COUNT = 1024;
+  private static final long DISRUPTOR_SHUTDOWN_TIMEOUT_MS = 60_000L;
+  private final BufferedMutator bufferedMutator;
+  private final Counter rowStatisticsDropped;
+  private final Disruptor<RowStatisticsRingBufferEnvelope> disruptor;
+  private final RingBuffer<RowStatisticsRingBufferEnvelope> ringBuffer;
+  private final AtomicBoolean closed;
+
+  /*
+   * This constructor is ONLY for testing. Use TableRecorder#forClusterConnection if you want to
+   * instantiate a TableRecorder object.
+   */
+  private RowStatisticsTableRecorder(BufferedMutator bufferedMutator,
+    Disruptor<RowStatisticsRingBufferEnvelope> disruptor, Counter 
rowStatisticsDropped) {
+    this.bufferedMutator = bufferedMutator;
+    this.disruptor = disruptor;
+    this.ringBuffer = disruptor.getRingBuffer();
+    this.rowStatisticsDropped = rowStatisticsDropped;
+    this.closed = new AtomicBoolean(false);
+  }
+
+  public static RowStatisticsTableRecorder forClusterConnection(Connection clusterConnection,
+    Counter rowStatisticsDropped, Counter rowStatisticsPutFailed) {
+    BufferedMutator bufferedMutator =
+      initializeBufferedMutator(clusterConnection, rowStatisticsPutFailed);
+    if (bufferedMutator == null) {
+      return null;
+    }
+
+    Disruptor<RowStatisticsRingBufferEnvelope> disruptor =
+      initializeDisruptor(bufferedMutator, rowStatisticsPutFailed);
+    disruptor.start();
+
+    return new RowStatisticsTableRecorder(bufferedMutator, disruptor, rowStatisticsDropped);
+  }
+
+  @Override
+  public void record(RowStatisticsImpl rowStatistics, Optional<byte[]> fullRegionName) {
+    if (!closed.get()) {
+      if (
+        !ringBuffer.tryPublishEvent((envelope, seqId) -> envelope
+          .load(new RowStatisticsRingBufferPayload(rowStatistics, fullRegionName.get())))
+      ) {
+        rowStatisticsDropped.increment();
+        LOG.error("Failed to load row statistics for region={} into the ring 
buffer",
+          rowStatistics.getRegion());
+      }
+    } else {
+      rowStatisticsDropped.increment();
+      LOG.error("TableRecorder is closed. Will not record row statistics for 
region={}",
+        rowStatistics.getRegion());
+    }
+  }
+
+  public void close() throws IOException {
+    if (!closed.compareAndSet(false, true)) {
+      return;
+    }
+    try {
+      disruptor.shutdown(DISRUPTOR_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException e) {
+      LOG.warn(
+        "Disruptor shutdown timed out after {} ms. Forcing halt. Some row 
statistics may be lost",
+        DISRUPTOR_SHUTDOWN_TIMEOUT_MS);
+      disruptor.halt();
+      disruptor.shutdown();
+    }
+    bufferedMutator.close();
+  }
+
+  private static BufferedMutator initializeBufferedMutator(Connection conn,
+    Counter rowStatisticsPutFailed) {
+    Configuration conf = conn.getConfiguration();
+    TableRecorderExceptionListener exceptionListener =
+      new TableRecorderExceptionListener(rowStatisticsPutFailed);
+    BufferedMutatorParams params = new BufferedMutatorParams(NAMESPACED_TABLE_NAME)
+      .rpcTimeout(getInt(conf, HConstants.HBASE_RPC_TIMEOUT_KEY, 15_000))
+      .operationTimeout(getInt(conf, HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30_000))
+      .setWriteBufferPeriodicFlushTimeoutMs(
+        getLong(conf, ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, 60_000L))
+      .writeBufferSize(getLong(conf, ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY,
+        ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT))
+      .listener(exceptionListener);
+    BufferedMutator bufferedMutator = null;
+    try {
+      bufferedMutator = conn.getBufferedMutator(params);
+    } catch (IOException e) {
+      LOG.error("This should NEVER print!", e);
+    }
+    return bufferedMutator;
+  }
+
+  private static Disruptor<RowStatisticsRingBufferEnvelope>
+    initializeDisruptor(BufferedMutator bufferedMutator, Counter rowStatisticsPutFailures) {
+    Disruptor<RowStatisticsRingBufferEnvelope> disruptor =
+      new Disruptor<>(RowStatisticsRingBufferEnvelope::new, 
DEFAULT_EVENT_COUNT,
+        new 
ThreadFactoryBuilder().setNameFormat("rowstats.append-pool-%d").setDaemon(true)
+          
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
+        ProducerType.MULTI, new BlockingWaitStrategy());
+    disruptor.setDefaultExceptionHandler(new RowStatisticsDisruptorExceptionHandler());
+    RowStatisticsEventHandler rowStatisticsEventHandler =
+      new RowStatisticsEventHandler(bufferedMutator, rowStatisticsPutFailures);
+    disruptor.handleEventsWith(new RowStatisticsEventHandler[] { rowStatisticsEventHandler });
+    return disruptor;
+  }
+
+  protected static class TableRecorderExceptionListener
+    implements BufferedMutator.ExceptionListener {
+
+    private final Counter rowStatisticsPutFailures;
+
+    TableRecorderExceptionListener(Counter counter) {
+      this.rowStatisticsPutFailures = counter;
+    }
+
+    public void onException(RetriesExhaustedWithDetailsException exception,
+      BufferedMutator mutator) {
+      long failedPuts = mutator.getWriteBufferSize();
+      rowStatisticsPutFailures.increment(failedPuts);
+      LOG.error(
+        "Periodic flush of buffered mutator failed. Cannot persist {} row 
stats stored in buffer",
+        failedPuts, exception);
+    }
+  }
+}
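
In short: forClusterConnection() puts a Disruptor ring buffer in front of a BufferedMutator so that record() never blocks the compaction path, and close() drains outstanding events before closing the mutator. Hand-driven usage looks roughly like this (a sketch only; in practice the observer manages the singleton via the region server's shared-data map, and "conn", "dropped", "putFailed", "stats", and "regionName" are assumed to exist):

    RowStatisticsTableRecorder recorder =
      RowStatisticsTableRecorder.forClusterConnection(conn, dropped, putFailed);
    if (recorder != null) {
      recorder.record(stats, Optional.of(regionName)); // async: publishes onto the ring buffer
      recorder.close(); // drains the disruptor, then closes the BufferedMutator
    }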
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsDisruptorExceptionHandler.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsDisruptorExceptionHandler.java
new file mode 100644
index 00000000000..07f26977d67
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsDisruptorExceptionHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer;
+
+import com.lmax.disruptor.ExceptionHandler;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
[email protected]
+public class RowStatisticsDisruptorExceptionHandler
+  implements ExceptionHandler<RowStatisticsRingBufferEnvelope> {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(RowStatisticsDisruptorExceptionHandler.class);
+
+  @Override
+  public void handleEventException(Throwable e, long sequence,
+    RowStatisticsRingBufferEnvelope event) {
+    if (event != null) {
+      LOG.error("Unable to persist event={} with sequence={}", 
event.getPayload(), sequence, e);
+    } else {
+      LOG.error("Event with sequence={} was null", sequence, e);
+    }
+  }
+
+  @Override
+  public void handleOnStartException(Throwable e) {
+    LOG.error("Disruptor onStartException", e);
+  }
+
+  @Override
+  public void handleOnShutdownException(Throwable e) {
+    LOG.error("Disruptor onShutdownException", e);
+  }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsEventHandler.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsEventHandler.java
new file mode 100644
index 00000000000..8b9ca1ff4cb
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsEventHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer;
+
+import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.buildPutForRegion;
+
+import com.lmax.disruptor.EventHandler;
+import java.io.IOException;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatistics;
+import org.apache.hadoop.hbase.metrics.Counter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
[email protected]
+public class RowStatisticsEventHandler implements EventHandler<RowStatisticsRingBufferEnvelope> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RowStatisticsEventHandler.class);
+  private final BufferedMutator bufferedMutator;
+  private final Counter rowStatisticsPutFailures;
+
+  public RowStatisticsEventHandler(BufferedMutator bufferedMutator,
+    Counter rowStatisticsPutFailures) {
+    this.bufferedMutator = bufferedMutator;
+    this.rowStatisticsPutFailures = rowStatisticsPutFailures;
+  }
+
+  @Override
+  public void onEvent(RowStatisticsRingBufferEnvelope event, long sequence, boolean endOfBatch)
+    throws Exception {
+    final RowStatisticsRingBufferPayload payload = event.getPayload();
+    if (payload != null) {
+      final RowStatistics rowStatistics = payload.getRowStatistics();
+      final byte[] fullRegionName = payload.getFullRegionName();
+      Put put = buildPutForRegion(fullRegionName, rowStatistics, rowStatistics.isMajor());
+      try {
+        bufferedMutator.mutate(put);
+      } catch (IOException e) {
+        rowStatisticsPutFailures.increment();
+        LOG.error("Mutate operation failed. Cannot persist row statistics for 
region {}",
+          rowStatistics.getRegion(), e);
+      }
+    }
+  }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsRingBufferEnvelope.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsRingBufferEnvelope.java
new file mode 100644
index 00000000000..75601ae9eaf
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsRingBufferEnvelope.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public final class RowStatisticsRingBufferEnvelope {
+
+  private RowStatisticsRingBufferPayload payload;
+
+  public RowStatisticsRingBufferEnvelope() {
+  }
+
+  public void load(RowStatisticsRingBufferPayload payload) {
+    this.payload = payload;
+  }
+
+  public RowStatisticsRingBufferPayload getPayload() {
+    final RowStatisticsRingBufferPayload payload = this.payload;
+    this.payload = null;
+    return payload;
+  }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsRingBufferPayload.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsRingBufferPayload.java
new file mode 100644
index 00000000000..498f7a660d2
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsRingBufferPayload.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer;
+
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatistics;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public class RowStatisticsRingBufferPayload {
+
+  private final RowStatistics rowStatistics;
+  private final byte[] fullRegionName;
+
+  public RowStatisticsRingBufferPayload(RowStatistics rowStatistics, byte[] fullRegionName) {
+    this.rowStatistics = rowStatistics;
+    this.fullRegionName = fullRegionName;
+  }
+
+  public RowStatistics getRowStatistics() {
+    return rowStatistics;
+  }
+
+  public byte[] getFullRegionName() {
+    return fullRegionName;
+  }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsConfigurationUtil.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsConfigurationUtil.java
new file mode 100644
index 00000000000..f7f3a373c3f
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsConfigurationUtil.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public final class RowStatisticsConfigurationUtil {
+
+  private RowStatisticsConfigurationUtil() {
+  }
+
+  private static final String ROW_STATISTICS_PREFIX =
+    "org.apache.hbase.coprocessor.row.statistics.";
+
+  public static int getInt(Configuration conf, String name, int defaultValue) {
+    return conf.getInt(ROW_STATISTICS_PREFIX + name, defaultValue);
+  }
+
+  public static long getLong(Configuration conf, String name, long defaultValue) {
+    return conf.getLong(ROW_STATISTICS_PREFIX + name, defaultValue);
+  }
+}
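These helpers take the bare key suffix and resolve it against the shared coprocessor prefix, so every row-statistics setting lives under one configuration namespace. A short usage sketch; the suffix "ring.buffer.size" is hypothetical, chosen only to illustrate the lookup:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;

  Configuration conf = HBaseConfiguration.create();
  // Reads "org.apache.hbase.coprocessor.row.statistics.ring.buffer.size", falling back to 1024.
  int ringBufferSize = RowStatisticsConfigurationUtil.getInt(conf, "ring.buffer.size", 1024);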
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsTableUtil.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsTableUtil.java
new file mode 100644
index 00000000000..cd06ce7dbe4
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsTableUtil.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.utils;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatistics;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
[email protected]
+public final class RowStatisticsTableUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RowStatisticsTableUtil.class);
+  public static final String NAMESPACE = "stats";
+  public static final String TABLE_STRING = "row-statistics";
+  public static final TableName NAMESPACED_TABLE_NAME = TableName.valueOf(NAMESPACE, TABLE_STRING);
+  public static final byte[] CF = Bytes.toBytes("0");
+  public static final String TABLE_RECORDER_KEY = "tableRecorder";
+
+  private RowStatisticsTableUtil() {
+  }
+
+  public static Put buildPutForRegion(byte[] regionRowKey, RowStatistics rowStatistics,
+    boolean isMajor) {
+    Put put = new Put(regionRowKey);
+    String cq = rowStatistics.getColumnFamily() + (isMajor ? "1" : "0");
+    String jsonString = rowStatistics.getJsonString();
+    put.addColumn(CF, Bytes.toBytes(cq), Bytes.toBytes(jsonString));
+    LOG.debug(jsonString);
+    return put;
+  }
+}
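Each statistics row is keyed by the region's full name, and the column qualifier appends "1" for major compactions or "0" for minor ones to the column family name. A minimal read-back sketch, assuming the stats:row-statistics table already exists, `connection` is an open Connection, `fullRegionName` is the region's full name bytes, and the region wrote stats for column family "0":

  import org.apache.hadoop.hbase.client.Get;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.Table;

  try (Table statsTable = connection.getTable(RowStatisticsTableUtil.NAMESPACED_TABLE_NAME)) {
    Result result = statsTable.get(new Get(fullRegionName));
    // Qualifier "01" = column family "0" + major-compaction flag "1"
    byte[] majorStatsJson = result.getValue(RowStatisticsTableUtil.CF, Bytes.toBytes("01"));
    if (majorStatsJson != null) {
      System.out.println(Bytes.toString(majorStatsJson));
    }
  }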
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsUtil.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsUtil.java
new file mode 100644
index 00000000000..4189da8db24
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsUtil.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.utils;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.RawCellBuilder;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public final class RowStatisticsUtil {
+
+  private RowStatisticsUtil() {
+  }
+
+  public static Cell cloneWithoutValue(RawCellBuilder cellBuilder, Cell cell) {
+    return cellBuilder.clear().setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
+      .setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
+      .setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())
+      .setTimestamp(cell.getTimestamp()).setType(cell.getType()).build();
+  }
+}
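cloneWithoutValue copies a cell's coordinates (row, family, qualifier, timestamp, type) while dropping the value, so retaining, say, the largest cell observed during a compaction does not pin the cell's backing value buffer in memory. A usage sketch, assuming it runs inside a region coprocessor whose environment supplies the RawCellBuilder; `env` and `candidateCell` are placeholders:

  // env is the RegionCoprocessorEnvironment handed to the coprocessor's start() hook.
  RawCellBuilder cellBuilder = env.getCellBuilder();
  // Safe to keep for the rest of the compaction: coordinates only, no value bytes.
  Cell largestCellCopy = RowStatisticsUtil.cloneWithoutValue(cellBuilder, candidateCell);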
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsCompactionObserver.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsCompactionObserver.java
new file mode 100644
index 00000000000..d0ee8d3bc57
--- /dev/null
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsCompactionObserver.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats;
+
+import static org.apache.hadoop.hbase.util.TestRegionSplitCalculator.TEST_UTIL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsRecorder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestRowStatisticsCompactionObserver {
+
+  public static final TestableRowStatisticsRecorder RECORDER = new TestableRowStatisticsRecorder();
+  private static final TableName TABLE_NAME = TableName.valueOf("test-table");
+  private static final byte[] FAMILY = Bytes.toBytes("0");
+  private static SingleProcessHBaseCluster cluster;
+  private static Connection connection;
+  private static Table table;
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    cluster = TEST_UTIL.startMiniCluster(1);
+    connection = ConnectionFactory.createConnection(cluster.getConf());
+    table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 1,
+      HConstants.DEFAULT_BLOCKSIZE, TestableRowStatisticsCompactionObserver.class.getName());
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    RECORDER.clear();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    table.close();
+    connection.close();
+    cluster.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void itRecordsStats() throws IOException, InterruptedException {
+    int numRows = 10;
+    int largestRowNum = -1;
+    int largestRowSize = 0;
+
+    int largestCellRowNum = -1;
+    int largestCellColNum = -1;
+    long largestCellSize = 0;
+
+    for (int i = 0; i < numRows; i++) {
+      int cells = ThreadLocalRandom.current().nextInt(1000) + 10;
+
+      Put p = new Put(Bytes.toBytes(i));
+      for (int j = 0; j < cells; j++) {
+        byte[] val = new byte[ThreadLocalRandom.current().nextInt(100) + 1];
+        p.addColumn(FAMILY, Bytes.toBytes(j), val);
+      }
+
+      int rowSize = 0;
+      CellScanner cellScanner = p.cellScanner();
+      int j = 0;
+      while (cellScanner.advance()) {
+        Cell current = cellScanner.current();
+        int serializedSize = current.getSerializedSize();
+        if (serializedSize > largestCellSize) {
+          largestCellSize = serializedSize;
+          largestCellRowNum = i;
+          largestCellColNum = j;
+        }
+        rowSize += serializedSize;
+        j++;
+      }
+
+      if (rowSize > largestRowSize) {
+        largestRowNum = i;
+        largestRowSize = rowSize;
+      }
+
+      table.put(p);
+      connection.getAdmin().flush(table.getName());
+    }
+
+    for (int i = 0; i < numRows; i++) {
+      Delete d = new Delete(Bytes.toBytes(i));
+      d.addColumn(FAMILY, Bytes.toBytes(0));
+      table.delete(d);
+    }
+
+    System.out.println("Final flush");
+    connection.getAdmin().flush(table.getName());
+    Thread.sleep(5000);
+    System.out.println("Compacting");
+
+    RowStatisticsImpl lastStats = RECORDER.getLastStats(); // Just initialize
+    Boolean lastIsMajor = RECORDER.getLastIsMajor();
+    connection.getAdmin().compact(table.getName());
+    while (lastStats == null) {
+      Thread.sleep(1000);
+
+      System.out.println("Checking stats");
+      lastStats = RECORDER.getLastStats();
+      lastIsMajor = RECORDER.getLastIsMajor();
+    }
+    assertFalse(lastIsMajor);
+    assertEquals(lastStats.getTotalDeletesCount(), 10);
+    assertEquals(lastStats.getTotalRowsCount(), 10);
+
+    RECORDER.clear();
+    lastStats = RECORDER.getLastStats();
+    lastIsMajor = RECORDER.getLastIsMajor();
+    connection.getAdmin().majorCompact(table.getName());
+
+    // Must wait for async majorCompaction to complete
+    while (lastStats == null) {
+      Thread.sleep(1000);
+
+      System.out.println("Checking stats");
+      lastStats = RECORDER.getLastStats();
+      lastIsMajor = RECORDER.getLastIsMajor();
+    }
+    assertTrue(lastIsMajor);
+    // no deletes after major compact
+    assertEquals(lastStats.getTotalDeletesCount(), 0);
+    assertEquals(lastStats.getTotalRowsCount(), 10);
+    // can only check largest values after major compact, since the above minor compact might not
+    // contain all storefiles
+    assertEquals(Bytes.toInt(lastStats.getLargestRow()), largestRowNum);
+    assertEquals(
+      Bytes.toInt(lastStats.getLargestCell().getRowArray(),
+        lastStats.getLargestCell().getRowOffset(), lastStats.getLargestCell().getRowLength()),
+      largestCellRowNum);
+    assertEquals(Bytes.toInt(lastStats.getLargestCell().getQualifierArray(),
+      lastStats.getLargestCell().getQualifierOffset(),
+      lastStats.getLargestCell().getQualifierLength()), largestCellColNum);
+  }
+
+  public static class TestableRowStatisticsCompactionObserver
+    extends RowStatisticsCompactionObserver {
+
+    public TestableRowStatisticsCompactionObserver() {
+      super(TestRowStatisticsCompactionObserver.RECORDER);
+    }
+  }
+
+  public static class TestableRowStatisticsRecorder implements RowStatisticsRecorder {
+
+    private volatile RowStatisticsImpl lastStats = null;
+    private volatile Boolean lastIsMajor = null;
+
+    @Override
+    public void record(RowStatisticsImpl stats, Optional<byte[]> fullRegionName) {
+      System.out.println("Record called with isMajor=" + stats.isMajor() + ", stats=" + stats
+        + ", fullRegionName=" + fullRegionName);
+      lastStats = stats;
+      lastIsMajor = stats.isMajor();
+    }
+
+    public void clear() {
+      lastStats = null;
+      lastIsMajor = null;
+    }
+
+    public RowStatisticsImpl getLastStats() {
+      return lastStats;
+    }
+
+    public Boolean getLastIsMajor() {
+      return lastIsMajor;
+    }
+  }
+}
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsEventHandler.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsEventHandler.java
new file mode 100644
index 00000000000..e009ceaed5e
--- /dev/null
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsEventHandler.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RowStatisticsEventHandler;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RowStatisticsRingBufferEnvelope;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RowStatisticsRingBufferPayload;
+import org.apache.hadoop.hbase.metrics.Counter;
+import org.apache.hadoop.hbase.metrics.impl.CounterImpl;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRowStatisticsEventHandler {
+
+  private static final String REGION_STRING = "REGION_STRING";
+  private static final byte[] FULL_REGION = Bytes.toBytes("FULL_REGION_STRING");
+  private static final String JSON_STRING = "JSON_STRING";
+  private static final RowStatisticsRingBufferEnvelope EVENT =
+    new RowStatisticsRingBufferEnvelope();
+  private static final RowStatistics ROW_STATISTICS = mock(RowStatistics.class);
+  private BufferedMutator bufferedMutator;
+  private Counter failureCounter;
+  private RowStatisticsEventHandler eventHandler;
+
+  @Before
+  public void setup() {
+    bufferedMutator = mock(BufferedMutator.class);
+    failureCounter = new CounterImpl();
+    eventHandler = new RowStatisticsEventHandler(bufferedMutator, failureCounter);
+    when(ROW_STATISTICS.getRegion()).thenReturn(REGION_STRING);
+    when(ROW_STATISTICS.getJsonString()).thenReturn(JSON_STRING);
+  }
+
+  @Test
+  public void itPersistsRowStatistics() throws Exception {
+    EVENT.load(new RowStatisticsRingBufferPayload(ROW_STATISTICS, FULL_REGION));
+    doNothing().when(bufferedMutator).mutate(any(Put.class));
+    eventHandler.onEvent(EVENT, 0L, true);
+    verify(bufferedMutator, times(1)).mutate(any(Put.class));
+    assertEquals(failureCounter.getCount(), 0);
+  }
+
+  @Test
+  public void itDoesNotPublishNullRowStatistics() throws Exception {
+    EVENT.load(null);
+    eventHandler.onEvent(EVENT, 0L, true);
+    verify(bufferedMutator, times(0)).mutate(any(Put.class));
+    assertEquals(failureCounter.getCount(), 0);
+  }
+
+  @Test
+  public void itCountsFailedPersists() throws Exception {
+    EVENT.load(new RowStatisticsRingBufferPayload(ROW_STATISTICS, FULL_REGION));
+    doThrow(new IOException()).when(bufferedMutator).mutate(any(Put.class));
+    eventHandler.onEvent(EVENT, 0L, true);
+    verify(bufferedMutator, times(1)).mutate(any(Put.class));
+    assertEquals(failureCounter.getCount(), 1);
+  }
+}
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsTableRecorder.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsTableRecorder.java
new file mode 100644
index 00000000000..39a7e36c202
--- /dev/null
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsTableRecorder.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats;
+
+import static org.apache.hadoop.hbase.util.TestRegionSplitCalculator.TEST_UTIL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsTableRecorder;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil;
+import org.apache.hadoop.hbase.metrics.Counter;
+import org.apache.hadoop.hbase.metrics.impl.CounterImpl;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestRowStatisticsTableRecorder {
+
+  private static final NamespaceDescriptor NAMESPACE_DESCRIPTOR =
+    NamespaceDescriptor.create(RowStatisticsTableUtil.NAMESPACE).build();
+  private static final byte[] FULL_REGION_NAME = Bytes.toBytes("fullRegionName");
+  private static SingleProcessHBaseCluster cluster;
+  private static Connection connection;
+  private RowStatisticsImpl rowStatistics;
+  private Counter counter;
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    cluster = TEST_UTIL.startMiniCluster(1);
+    connection = ConnectionFactory.createConnection(cluster.getConf());
+    connection.getAdmin().createNamespace(NAMESPACE_DESCRIPTOR);
+    // need this table to write to
+    TEST_UTIL.createTable(RowStatisticsTableUtil.NAMESPACED_TABLE_NAME, RowStatisticsTableUtil.CF);
+  }
+
+  @Before
+  public void setup() {
+    rowStatistics = mock(RowStatisticsImpl.class);
+    counter = new CounterImpl();
+  }
+
+  @Test
+  public void itReturnsNullRecorderOnFailedBufferedMutator() throws IOException {
+    Connection badConnection = mock(Connection.class);
+    Configuration conf = mock(Configuration.class);
+    when(badConnection.getConfiguration()).thenReturn(conf);
+    when(badConnection.getBufferedMutator(any(BufferedMutatorParams.class)))
+      .thenThrow(IOException.class);
+    RowStatisticsTableRecorder recorder =
+      RowStatisticsTableRecorder.forClusterConnection(badConnection, counter, counter);
+    assertNull(recorder);
+  }
+
+  @Test
+  public void itDoesNotIncrementCounterWhenRecordSucceeds() throws IOException {
+    RowStatisticsTableRecorder recorder =
+      RowStatisticsTableRecorder.forClusterConnection(connection, counter, counter);
+    assertNotNull(recorder);
+    recorder.record(rowStatistics, Optional.of(FULL_REGION_NAME));
+    assertEquals(counter.getCount(), 0);
+  }
+
+  @Test
+  public void itIncrementsCounterWhenRecordFails() throws IOException {
+    RowStatisticsTableRecorder recorder =
+      RowStatisticsTableRecorder.forClusterConnection(connection, counter, counter);
+    assertNotNull(recorder);
+    recorder.close();
+    recorder.record(rowStatistics, Optional.of(FULL_REGION_NAME));
+    assertEquals(counter.getCount(), 1);
+  }
+}
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/sizebucket/TestSizeBucketTracker.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/sizebucket/TestSizeBucketTracker.java
new file mode 100644
index 00000000000..de5bc583416
--- /dev/null
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/sizebucket/TestSizeBucketTracker.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.sizebucket;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.SizeBucket;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.SizeBucketTracker;
+import org.junit.Test;
+
+import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
+
+public class TestSizeBucketTracker {
+
+  @Test
+  public void itUpdatesSizeBuckets() {
+    SizeBucketTracker sizeBucketTracker = new SizeBucketTracker();
+    SizeBucket[] sizeBuckets = SizeBucket.values();
+
+    // Initialize
+    Map<String, Long> bucketToCount = sizeBucketTracker.toMap();
+    for (SizeBucket sizeBucket : SizeBucket.values()) {
+      assertEquals((long) bucketToCount.get(sizeBucket.bucket()), 0L);
+    }
+
+    // minBytes
+    for (SizeBucket sizeBucket : sizeBuckets) {
+      sizeBucketTracker.add(sizeBucket.minBytes());
+    }
+    bucketToCount = sizeBucketTracker.toMap();
+    for (SizeBucket sizeBucket : sizeBuckets) {
+      assertEquals((long) bucketToCount.get(sizeBucket.bucket()), 1L);
+    }
+
+    // maxBytes - 1
+    for (SizeBucket sizeBucket : sizeBuckets) {
+      sizeBucketTracker.add(sizeBucket.maxBytes() - 1);
+    }
+    bucketToCount = sizeBucketTracker.toMap();
+    for (SizeBucket sizeBucket : sizeBuckets) {
+      assertEquals((long) bucketToCount.get(sizeBucket.bucket()), 2L);
+    }
+
+    // maxBytes
+    for (SizeBucket sizeBucket : sizeBuckets) {
+      sizeBucketTracker.add(sizeBucket.maxBytes());
+    }
+    bucketToCount = sizeBucketTracker.toMap();
+    for (int i = 0; i < sizeBuckets.length - 1; i++) {
+      SizeBucket currBucket = sizeBuckets[i];
+      if (currBucket == SizeBucket.KILOBYTES_1) {
+        assertEquals((long) bucketToCount.get(currBucket.bucket()), 2L);
+      } else {
+        SizeBucket nextBucket = sizeBuckets[i + 1];
+        if (nextBucket == SizeBucket.KILOBYTES_MAX) {
+          assertEquals((long) bucketToCount.get(nextBucket.bucket()), 4L);
+        } else {
+          assertEquals((long) bucketToCount.get(nextBucket.bucket()), 3L);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void itCreatesJson() {
+    SizeBucketTracker sizeBucketTracker = new SizeBucketTracker();
+    SizeBucket[] sizeBuckets = SizeBucket.values();
+    for (SizeBucket sizeBucket : sizeBuckets) {
+      sizeBucketTracker.add(sizeBucket.minBytes());
+    }
+    JsonObject mapJson = sizeBucketTracker.toJsonObject();
+    for (SizeBucket sizeBucket : sizeBuckets) {
+      Number count = mapJson.get(sizeBucket.bucket()).getAsNumber();
+      assertEquals(count.longValue(), 1L);
+    }
+  }
+}
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMapReduceExamples.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMapReduceExamples.java
index afd4a544cfa..70d0a09367b 100644
--- a/hbase-examples/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMapReduceExamples.java
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMapReduceExamples.java
@@ -163,7 +163,6 @@ public class TestMapReduceExamples {
   @Test
   public void testMainIndexBuilder() throws Exception {
     PrintStream oldPrintStream = System.err;
-    SecurityManager SECURITY_MANAGER = System.getSecurityManager();
     LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
     System.setSecurityManager(newSecurityManager);
     ByteArrayOutputStream data = new ByteArrayOutputStream();
@@ -182,7 +181,6 @@ public class TestMapReduceExamples {
       }
     } finally {
       System.setErr(oldPrintStream);
-      System.setSecurityManager(SECURITY_MANAGER);
     }
   }
 }
