HBASE-14693 Add client-side metrics for received pushback signals

Signed-off-by: Andrew Purtell <apurt...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/086bacd1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/086bacd1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/086bacd1

Branch: refs/heads/hbase-12439
Commit: 086bacd12d1c32a6c1c6736091334185be144a04
Parents: 44367f5
Author: chenheng <chenh...@fenbi.com>
Authored: Thu Nov 5 16:12:41 2015 +0800
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Mon Nov 9 18:03:30 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       |  17 +++
 .../hadoop/hbase/client/MetricsConnection.java  | 106 +++++++++++++++++++
 .../hadoop/hbase/client/TestClientPushback.java |  20 +++-
 3 files changed, 141 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/086bacd1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 7c7fc3e..f1fa3eb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -1009,6 +1009,9 @@ class AsyncProcess {
         int numAttempt) {
       // no stats to manage, just do the standard action
       if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
+        if (connection.getConnectionMetrics() != null) {
+          connection.getConnectionMetrics().incrNormalRunners();
+        }
         return 
Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
             new SingleServerRequestRunnable(multiAction, numAttempt, server, 
callsInProgress)));
       }
@@ -1039,6 +1042,14 @@ class AsyncProcess {
           runner.setRunner(runnable);
           traceText = "AsyncProcess.clientBackoff.sendMultiAction";
           runnable = runner;
+          if (connection.getConnectionMetrics() != null) {
+            connection.getConnectionMetrics().incrDelayRunners();
+            
connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
+          }
+        } else {
+          if (connection.getConnectionMetrics() != null) {
+            connection.getConnectionMetrics().incrNormalRunners();
+          }
         }
         runnable = Trace.wrap(traceText, runnable);
         toReturn.add(runnable);
@@ -1267,6 +1278,12 @@ class AsyncProcess {
               ++failed;
             }
           } else {
+            
+            if (AsyncProcess.this.connection.getConnectionMetrics() != null) {
+              AsyncProcess.this.connection.getConnectionMetrics().
+                      updateServerStats(server, regionName, result);
+            }
+
             // update the stats about the region, if its a user table. We 
don't want to slow down
             // updates to meta tables, especially from internal updates 
(master, etc).
             if (AsyncProcess.this.connection.getStatisticsTracker() != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/086bacd1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index f34fb8a..3863c37 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -26,12 +26,16 @@ import com.yammer.metrics.core.MetricsRegistry;
 import com.yammer.metrics.core.Timer;
 import com.yammer.metrics.reporting.JmxReporter;
 import com.yammer.metrics.util.RatioGauge;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -54,6 +58,8 @@ public class MetricsConnection {
   private static final String DRTN_BASE = "rpcCallDurationMs_";
   private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
   private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
+  private static final String MEMLOAD_BASE = "memstoreLoad_";
+  private static final String HEAP_BASE = "heapOccupancy_";
   private static final String CLIENT_SVC = 
ClientService.getDescriptor().getName();
 
   /** A container class for collecting details about the RPC call as it 
percolates. */
@@ -130,6 +136,88 @@ public class MetricsConnection {
     }
   }
 
+  /**
+   * Histograms of the load statistics a single region reports back to the client: memstore
+   * load and heap occupancy. Both histograms are registered in the connection's registry
+   * under names derived from {@code name}.
+   */
+  @VisibleForTesting
+  protected static class RegionStats {
+    /** Identifies the region; used as the suffix of both histogram names. */
+    final String name;
+    /** Samples of {@link ClientProtos.RegionLoadStats#getMemstoreLoad()}. */
+    final Histogram memstoreLoadHist;
+    /** Samples of {@link ClientProtos.RegionLoadStats#getHeapOccupancy()}. */
+    final Histogram heapOccupancyHist;
+
+    /**
+     * @param registry registry in which the two histograms are created
+     * @param name region identifier appended to the histogram name prefixes
+     */
+    public RegionStats(MetricsRegistry registry, String name) {
+      this.name = name;
+      this.memstoreLoadHist = registry.newHistogram(MetricsConnection.class,
+          MEMLOAD_BASE + this.name);
+      this.heapOccupancyHist = registry.newHistogram(MetricsConnection.class,
+          HEAP_BASE + this.name);
+    }
+
+    /**
+     * Records one sample of each statistic.
+     *
+     * @param regionStatistics load statistics reported for this region
+     */
+    public void update(ClientProtos.RegionLoadStats regionStatistics) {
+      this.memstoreLoadHist.update(regionStatistics.getMemstoreLoad());
+      this.heapOccupancyHist.update(regionStatistics.getHeapOccupancy());
+    }
+  }
+
+  /**
+   * Counts of runners dispatched immediately ("normal") and after a client-side backoff
+   * ("delay"), plus a histogram of the backoff intervals that were applied.
+   */
+  @VisibleForTesting
+  protected static class RunnerStats {
+    final Counter normalRunners;
+    final Counter delayRunners;
+    // NOTE: "Inteval" (sic) -- tests read this field by name, so it is left as-is.
+    final Histogram delayIntevalHist;
+
+    /** @param registry registry in which the counters and histogram are created */
+    public RunnerStats(MetricsRegistry registry) {
+      final Class<?> owner = MetricsConnection.class;
+      normalRunners = registry.newCounter(owner, "normalRunnersCount");
+      delayRunners = registry.newCounter(owner, "delayRunnersCount");
+      delayIntevalHist = registry.newHistogram(owner, "delayIntervalHist");
+    }
+
+    /** Counts one runner dispatched without a backoff delay. */
+    public void incrNormalRunners() {
+      normalRunners.inc();
+    }
+
+    /** Counts one runner dispatched after a backoff delay. */
+    public void incrDelayRunners() {
+      delayRunners.inc();
+    }
+
+    /**
+     * Adds one backoff interval sample.
+     *
+     * @param interval the delay applied; presumably milliseconds -- confirm against callers
+     */
+    public void updateDelayInterval(long interval) {
+      delayIntevalHist.update(interval);
+    }
+  }
+
+  /**
+   * Region load histograms, grouped by server and then by region name. The inner maps use
+   * {@link Bytes#BYTES_COMPARATOR} so that {@code byte[]} region names are matched by
+   * content rather than by array identity.
+   */
+  @VisibleForTesting
+  protected ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>> serverStats
+      = new ConcurrentHashMap<ServerName, ConcurrentMap<byte[], RegionStats>>();
+
+  /**
+   * Records the region load statistics carried by a server response, if it has any.
+   *
+   * @param serverName server that produced the response
+   * @param regionName name of the region the response refers to
+   * @param r the response object; anything that is not a {@link Result} with non-null
+   *          {@link ClientProtos.RegionLoadStats} is ignored
+   */
+  public void updateServerStats(ServerName serverName, byte[] regionName,
+                                Object r) {
+    if (!(r instanceof Result)) {
+      return;
+    }
+    ClientProtos.RegionLoadStats stats = ((Result) r).getStats();
+    if (stats == null) {
+      return;
+    }
+    getOrCreateRegionStats(serverName, regionName).update(stats);
+  }
+
+  /**
+   * Returns the histograms for the given region, creating and registering them the first
+   * time the region is seen. Uses a single lookup on the common (already present) path.
+   */
+  private RegionStats getOrCreateRegionStats(ServerName serverName, byte[] regionName) {
+    ConcurrentMap<byte[], RegionStats> rsStats = serverStats.get(serverName);
+    if (rsStats == null) {
+      ConcurrentMap<byte[], RegionStats> created =
+          new ConcurrentSkipListMap<byte[], RegionStats>(Bytes.BYTES_COMPARATOR);
+      ConcurrentMap<byte[], RegionStats> existing = serverStats.putIfAbsent(serverName, created);
+      // putIfAbsent returns null when our map won the race, otherwise the winner's map.
+      rsStats = (existing == null) ? created : existing;
+    }
+
+    RegionStats regionStats = rsStats.get(regionName);
+    if (regionStats == null) {
+      // Build the metric name only on first sight of the region: this method can run for
+      // every received result, so per-call string formatting would be wasted work.
+      String name = serverName.getServerName() + "," + Bytes.toStringBinary(regionName);
+      RegionStats created = new RegionStats(this.registry, name);
+      RegionStats existing = rsStats.putIfAbsent(regionName, created);
+      regionStats = (existing == null) ? created : existing;
+    }
+    return regionStats;
+  }
+
+
   /** A lambda for dispatching to the appropriate metric factory method */
   private static interface NewMetric<T> {
     T newMetric(Class<?> clazz, String name, String scope);
@@ -172,6 +260,7 @@ public class MetricsConnection {
   @VisibleForTesting protected final CallTracker incrementTracker;
   @VisibleForTesting protected final CallTracker putTracker;
   @VisibleForTesting protected final CallTracker multiTracker;
+  @VisibleForTesting protected final RunnerStats runnerStats;
 
   // dynamic metrics
 
@@ -217,6 +306,8 @@ public class MetricsConnection {
     this.incrementTracker = new CallTracker(this.registry, "Mutate", 
"Increment", scope);
     this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
     this.multiTracker = new CallTracker(this.registry, "Multi", scope);
+    this.runnerStats = new RunnerStats(this.registry);
+
     this.reporter = new JmxReporter(this.registry);
     this.reporter.start();
   }
@@ -242,6 +333,21 @@ public class MetricsConnection {
     metaCacheMisses.inc();
   }
 
+  /** Records that a runner was dispatched without a client-side backoff delay. */
+  public void incrNormalRunners() {
+    runnerStats.incrNormalRunners();
+  }
+
+  /** Records that a runner was dispatched after a client-side backoff delay. */
+  public void incrDelayRunners() {
+    runnerStats.incrDelayRunners();
+  }
+
+  /**
+   * Records the backoff interval that was applied to a delayed runner.
+   *
+   * @param interval the delay; presumably milliseconds -- confirm against callers
+   */
+  public void updateDelayInterval(long interval) {
+    runnerStats.updateDelayInterval(interval);
+  }
+
   /**
    * Get a metric for {@code key} from {@code map}, or create it with {@code 
factory}.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/086bacd1/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
index 995e3e5..1efbe05 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
@@ -42,6 +42,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static 
org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
@@ -74,7 +75,7 @@ public class TestClientPushback {
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes);
     // ensure we block the flushes when we are double that flushsize
     conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 
HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
-
+    conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
     UTIL.startMiniCluster(1);
     UTIL.createTable(tableName, family);
   }
@@ -87,6 +88,7 @@ public class TestClientPushback {
   @Test(timeout=60000)
   public void testClientTracksServerPushback() throws Exception{
     Configuration conf = UTIL.getConfiguration();
+
     ClusterConnection conn = (ClusterConnection) 
ConnectionFactory.createConnection(conf);
     HTable table = (HTable) conn.getTable(tableName);
 
@@ -119,7 +121,6 @@ public class TestClientPushback {
     ServerStatistics.RegionStatistics regionStats = 
serverStats.getStatsForRegion(regionName);
     assertEquals("We did not find some load on the memstore", load,
       regionStats.getMemstoreLoadPercent());
-
     // check that the load reported produces a nonzero delay
     long backoffTime = backoffPolicy.getBackoffTime(server, regionName, 
serverStats);
     assertNotEquals("Reported load does not produce a backoff", backoffTime, 
0);
@@ -145,6 +146,21 @@ public class TestClientPushback {
     // produces a backoffTime of 151 milliseconds. This is long enough so the
     // wait and related checks below are reasonable. Revisit if the backoff
     // time reported by above debug logging has significantly deviated.
+    String name = server.getServerName() + "," + 
Bytes.toStringBinary(regionName);
+    MetricsConnection.RegionStats rsStats = conn.getConnectionMetrics().
+            serverStats.get(server).get(regionName);
+    assertEquals(name, rsStats.name);
+    assertEquals(rsStats.heapOccupancyHist.mean(),
+        (double)regionStats.getHeapOccupancyPercent(), 0.1 );
+    assertEquals(rsStats.memstoreLoadHist.mean(),
+        (double)regionStats.getMemstoreLoadPercent(), 0.1);
+
+    MetricsConnection.RunnerStats runnerStats = 
conn.getConnectionMetrics().runnerStats;
+
+    assertEquals(runnerStats.delayRunners.count(), 1);
+    assertEquals(runnerStats.normalRunners.count(), 1);
+    assertEquals("", runnerStats.delayIntevalHist.mean(), (double)backoffTime, 
0.1);
+
     latch.await(backoffTime * 2, TimeUnit.MILLISECONDS);
     assertNotEquals("AsyncProcess did not submit the work time", 
endTime.get(), 0);
     assertTrue("AsyncProcess did not delay long enough", endTime.get() - 
startTime >= backoffTime);

Reply via email to