This is an automated email from the ASF dual-hosted git repository.

busbey pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit d89e5270aa2a5ef50a472798c7b74ef22e8be07b
Author: Xu Cang <xc...@salesforce.com>
AuthorDate: Thu May 31 14:02:26 2018 -0700

    HBASE-19722 Meta query statistics metrics source
    
    Signed-off-by: Andrew Purtell <apurt...@apache.org>
    (cherry picked from commit 58ccd3dc7eb59ce41a2a48af792adb342520e626)
---
 .../java/org/apache/hadoop/hbase/HConstants.java   |   2 +
 .../hadoop/hbase/coprocessor/MetaTableMetrics.java | 335 +++++++++++++++++++++
 .../apache/hadoop/hbase/util/LossyCounting.java    | 135 +++++++++
 .../hbase/coprocessor/TestMetaTableMetrics.java    | 231 ++++++++++++++
 .../hadoop/hbase/util/TestLossyCounting.java       |  88 ++++++
 5 files changed, 791 insertions(+)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 42288c5..19581f5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1402,6 +1402,8 @@ public final class HConstants {
   public static final String DEFAULT_SNAPSHOT_RESTORE_FAILSAFE_NAME =
       "hbase-failsafe-{snapshot.name}-{restore.timestamp}";
 
+  public static final String DEFAULT_LOSSY_COUNTING_ERROR_RATE =
+      "hbase.util.default.lossycounting.errorrate";
   public static final String NOT_IMPLEMENTED = "Not implemented";
 
   private HConstants() {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
new file mode 100644
index 0000000..9bf35c0
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required by applicable
+ * law or agreed to in writing, software distributed under the License is 
distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. See the License
+ * for the specific language governing permissions and limitations under the 
License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.metrics.Meter;
+import org.apache.hadoop.hbase.metrics.Metric;
+import org.apache.hadoop.hbase.metrics.MetricRegistry;
+import org.apache.hadoop.hbase.util.LossyCounting;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+
+/**
+ * A coprocessor that collects metrics from meta table.
+ * <p>
+ * These metrics will be available through the regular Hadoop metrics2 sinks 
(ganglia, opentsdb,
+ * etc) as well as JMX output.
+ * </p>
+ * @see MetaTableMetrics
+ */
+
+@InterfaceAudience.Private
+public class MetaTableMetrics implements RegionCoprocessor {
+
+  private ExampleRegionObserverMeta observer;
+  private Map<String, Optional<Metric>> requestsMap;
+  private RegionCoprocessorEnvironment regionCoprocessorEnv;
+  private LossyCounting clientMetricsLossyCounting;
+  private boolean active = false;
+
+  enum MetaTableOps {
+    GET, PUT, DELETE;
+  }
+
+  private ImmutableMap<Class, MetaTableOps> opsNameMap =
+      ImmutableMap.<Class, MetaTableOps>builder()
+              .put(Put.class, MetaTableOps.PUT)
+              .put(Get.class, MetaTableOps.GET)
+              .put(Delete.class, MetaTableOps.DELETE)
+              .build();
+
+  class ExampleRegionObserverMeta implements RegionCoprocessor, RegionObserver 
{
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get 
get,
+        List<Cell> results) throws IOException {
+      if (!active || !isMetaTableOp(e)) {
+        return;
+      }
+      tableMetricRegisterAndMark(e, get);
+      clientMetricRegisterAndMark(e);
+      regionMetricRegisterAndMark(e, get);
+      opMetricRegisterAndMark(e, get);
+      opWithClientMetricRegisterAndMark(e, get);
+    }
+
+    @Override
+    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put 
put, WALEdit edit,
+        Durability durability) throws IOException {
+      if (!active || !isMetaTableOp(e)) {
+        return;
+      }
+      tableMetricRegisterAndMark(e, put);
+      clientMetricRegisterAndMark(e);
+      regionMetricRegisterAndMark(e, put);
+      opMetricRegisterAndMark(e, put);
+      opWithClientMetricRegisterAndMark(e, put);
+    }
+
+    @Override
+    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, 
Delete delete,
+        WALEdit edit, Durability durability) throws IOException {
+      if (!active || !isMetaTableOp(e)) {
+        return;
+      }
+      tableMetricRegisterAndMark(e, delete);
+      clientMetricRegisterAndMark(e);
+      regionMetricRegisterAndMark(e, delete);
+      opMetricRegisterAndMark(e, delete);
+      opWithClientMetricRegisterAndMark(e, delete);
+    }
+
+    /**
+     * Marks the meter stored in {@code requestsMap} under the given name, if there is one.
+     * <p>
+     * A name can be absent from {@code requestsMap}: a meter handled by lossy counting is not
+     * registered (or is removed again) when the sweep drops it. The map is therefore read once
+     * and a missing entry is treated as a no-op instead of being dereferenced.
+     * </p>
+     * @param requestMeter name of the meter to mark; an empty name is ignored
+     */
+    private void markMeterIfPresent(String requestMeter) {
+      if (requestMeter.isEmpty()) {
+        return;
+      }
+      Optional<Metric> metric = requestsMap.get(requestMeter);
+      if (metric != null && metric.isPresent()) {
+        ((Meter) metric.get()).mark();
+      }
+    }
+
+    /**
+     * Creates a meter with the given name in the registry returned by
+     * {@code getMetricRegistryForRegionServer()} and remembers it in {@code requestsMap}, unless
+     * the name is empty or already present in {@code requestsMap}.
+     * @param e observer context of the current operation (not read by this method)
+     * @param requestMeter name of the meter to register
+     */
+    private void registerMeterIfNotPresent(ObserverContext<RegionCoprocessorEnvironment> e,
+        String requestMeter) {
+      if (requestMeter.isEmpty() || requestsMap.containsKey(requestMeter)) {
+        return;
+      }
+      MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
+      registry.meter(requestMeter);
+      requestsMap.put(requestMeter, registry.get(requestMeter));
+    }
+
+    /**
+     * Counts the meter in the given lossy counting instance and keeps {@code requestsMap} and
+     * the metric registry in step with it: every meter swept by lossy counting is unregistered,
+     * and the current meter is registered unless it was swept itself.
+     * <p>
+     * NOTE(review): the previous comment gave the number of meters kept as at most "7 / e"
+     * (e is the error rate), while its own example (e = 0.02 by default, at most 50 client
+     * request meters) matches 1 / e; confirm the intended bound against LossyCounting.
+     * </p>
+     * @param e observer context of the current operation (not read by this method)
+     * @param requestMeter meter to be registered; an empty name is ignored
+     * @param lossyCounting lossyCounting object for one type of meters.
+     */
+    private void registerLossyCountingMeterIfNotPresent(
+        ObserverContext<RegionCoprocessorEnvironment> e,
+        String requestMeter, LossyCounting lossyCounting) {
+      if (requestMeter.isEmpty()) {
+        return;
+      }
+      Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
+      MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
+      // Clean up after every sweep, not only when the current meter is new and swept;
+      // otherwise meters that lossy counting already dropped stay registered.
+      for (String meter : metersToBeRemoved) {
+        requestsMap.remove(meter);
+        registry.remove(meter);
+      }
+      if (metersToBeRemoved.contains(requestMeter)) {
+        // the current meter is swept by lossy counting cleanup. No need to put it into requestsMap.
+        return;
+      }
+      if (!requestsMap.containsKey(requestMeter)) {
+        registry.meter(requestMeter);
+        requestsMap.put(requestMeter, registry.get(requestMeter));
+      }
+    }
+
+    /**
+     * Get table name from Ops such as: get, put, delete. The name is the first
+     * comma-separated field of the UTF-8 decoded row key.
+     * @param op such as get, put or delete.
+     * @return the first field, or null when the row key is empty or splits into no fields
+     */
+    private String getTableNameFromOp(Row op) {
+      String rowKey = new String(op.getRow(), StandardCharsets.UTF_8);
+      if (rowKey.isEmpty()) {
+        return null;
+      }
+      String[] fields = rowKey.split(",");
+      if (fields.length > 0) {
+        return fields[0];
+      }
+      return null;
+    }
+
+    /**
+     * Get regionId from Ops such as: get, put, delete. The id is the third
+     * comma-separated field of the UTF-8 decoded row key.
+     * @param op  such as get, put or delete.
+     * @return the third field, or null when the row key is empty or has fewer than three fields
+     */
+    private String getRegionIdFromOp(Row op) {
+      String rowKey = new String(op.getRow(), StandardCharsets.UTF_8);
+      if (rowKey.isEmpty()) {
+        return null;
+      }
+      String[] fields = rowKey.split(",");
+      if (fields.length > 2) {
+        return fields[2];
+      }
+      return null;
+    }
+
+    /**
+     * Tells whether the region behind the given context belongs to the meta table, by comparing
+     * the UTF-8 decoded table name of that region with {@code TableName.META_TABLE_NAME}.
+     */
+    private boolean isMetaTableOp(ObserverContext<RegionCoprocessorEnvironment> e) {
+      byte[] tableNameBytes = e.getEnvironment().getRegionInfo().getTable().getName();
+      String regionTableName = new String(tableNameBytes, StandardCharsets.UTF_8);
+      return TableName.META_TABLE_NAME.toString().equals(regionTableName);
+    }
+
+    private void 
clientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e) {
+      String clientIP = RpcServer.getRemoteIp() != null ? 
RpcServer.getRemoteIp().toString() : "";
+
+      String clientRequestMeter = clientRequestMeterName(clientIP);
+      registerLossyCountingMeterIfNotPresent(e, clientRequestMeter, 
clientMetricsLossyCounting);
+      markMeterIfPresent(clientRequestMeter);
+    }
+
+    private void 
tableMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
+        Row op) {
+      // Mark the meta table meter whenever the coprocessor is called
+      String tableName = getTableNameFromOp(op);
+      String tableRequestMeter = tableMeterName(tableName);
+      registerMeterIfNotPresent(e, tableRequestMeter);
+      markMeterIfPresent(tableRequestMeter);
+    }
+
+    private void 
regionMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
+        Row op) {
+      // Mark the meta table meter whenever the coprocessor is called
+      String regionId = getRegionIdFromOp(op);
+      String regionRequestMeter = regionMeterName(regionId);
+      registerMeterIfNotPresent(e, regionRequestMeter);
+      markMeterIfPresent(regionRequestMeter);
+    }
+
+    private void 
opMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
+        Row op) {
+      String opMeterName = opMeterName(op);
+      registerMeterIfNotPresent(e, opMeterName);
+      markMeterIfPresent(opMeterName);
+    }
+
+    private void 
opWithClientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> 
e,
+        Object op) {
+      String opWithClientMeterName = opWithClientMeterName(op);
+      registerMeterIfNotPresent(e, opWithClientMeterName);
+      markMeterIfPresent(opWithClientMeterName);
+    }
+
+    /**
+     * Builds the name of the per-client, per-operation meter for the given op.
+     * @param op the operation; its class is looked up in {@code opsNameMap}
+     * @return the meter name, or an empty string when the remote IP is not available or the op
+     *         class has no entry in {@code opsNameMap}
+     */
+    private String opWithClientMeterName(Object op) {
+      String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
+      if (clientIP.isEmpty()) {
+        return "";
+      }
+      MetaTableOps ops = opsNameMap.get(op.getClass());
+      if (ops == null) {
+        // Switching on null throws a NullPointerException; return the empty name that
+        // registerMeterIfNotPresent and markMeterIfPresent ignore.
+        return "";
+      }
+      String opWithClientMeterName = "";
+      switch (ops) {
+        case GET:
+          opWithClientMeterName = String.format("MetaTable_client_%s_get_request", clientIP);
+          break;
+        case PUT:
+          opWithClientMeterName = String.format("MetaTable_client_%s_put_request", clientIP);
+          break;
+        case DELETE:
+          opWithClientMeterName = String.format("MetaTable_client_%s_delete_request", clientIP);
+          break;
+        default:
+          break;
+      }
+      return opWithClientMeterName;
+    }
+
+    /**
+     * Builds the name of the per-operation meter for the given op.
+     * @param op the operation; its class is looked up in {@code opsNameMap}
+     * @return the meter name, or an empty string when the op class has no entry in
+     *         {@code opsNameMap}
+     */
+    private String opMeterName(Object op) {
+      MetaTableOps ops = opsNameMap.get(op.getClass());
+      if (ops == null) {
+        // Switching on null throws a NullPointerException; return the empty name that
+        // registerMeterIfNotPresent and markMeterIfPresent ignore.
+        return "";
+      }
+      String opMeterName = "";
+      switch (ops) {
+        case GET:
+          opMeterName = "MetaTable_get_request";
+          break;
+        case PUT:
+          opMeterName = "MetaTable_put_request";
+          break;
+        case DELETE:
+          opMeterName = "MetaTable_delete_request";
+          break;
+        default:
+          break;
+      }
+      return opMeterName;
+    }
+
+    private String tableMeterName(String tableName) {
+      return String.format("MetaTable_table_%s_request", tableName);
+    }
+
+    private String clientRequestMeterName(String clientIP) {
+      if (clientIP.isEmpty()) {
+        return "";
+      }
+      return String.format("MetaTable_client_%s_request", clientIP);
+    }
+
+    private String regionMeterName(String regionId) {
+      return String.format("MetaTable_region_%s_request", regionId);
+    }
+  }
+
+  @Override
+  public Optional<RegionObserver> getRegionObserver() {
+    return Optional.of(observer);
+  }
+
+  @Override
+  public void start(CoprocessorEnvironment env) throws IOException {
+    if (env instanceof RegionCoprocessorEnvironment
+        && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() != 
null
+        && ((RegionCoprocessorEnvironment) 
env).getRegionInfo().getTable().getName() != null
+        && new String(((RegionCoprocessorEnvironment) 
env).getRegionInfo().getTable().getName(),
+          
StandardCharsets.UTF_8).equals(TableName.META_TABLE_NAME.toString())) {
+      regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
+      observer = new ExampleRegionObserverMeta();
+      requestsMap = new ConcurrentHashMap<>();
+      clientMetricsLossyCounting = new LossyCounting();
+      // only be active mode when this region holds meta table.
+      active = true;
+    } else {
+      observer = new ExampleRegionObserverMeta();
+    }
+  }
+
+  /**
+   * Removes every meter recorded in {@code requestsMap} from the registry returned by
+   * {@code getMetricRegistryForRegionServer()}.
+   * <p>
+   * {@code requestsMap} and {@code regionCoprocessorEnv} are only assigned by {@code start} when
+   * the region holds the meta table; in any other case there is nothing to clean up and the
+   * method returns without touching them.
+   * </p>
+   */
+  @Override
+  public void stop(CoprocessorEnvironment e) throws IOException {
+    if (requestsMap == null || regionCoprocessorEnv == null) {
+      return;
+    }
+    // since meta region can move around, clear stale metrics when stop.
+    MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
+    for (String meterName : requestsMap.keySet()) {
+      registry.remove(meterName);
+    }
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
new file mode 100644
index 0000000..c0da303
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
@@ -0,0 +1,135 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * LossyCounting utility, bounded data structure that maintains approximate 
high frequency
+ * elements in data stream.
+ *
+ * Bucket size is 1 / error rate.  (Error rate is 0.02 by default)
+ * Lemma: if an element does not appear in the set, then its frequency is less than e * N
+ *       (N is total element counts until now.)
+ * Based on paper:
+ * http://www.vldb.org/conf/2002/S10P03.pdf
+ */
+
+@InterfaceAudience.Public
+public class LossyCounting {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LossyCounting.class);
+  private long bucketSize;
+  private long currentTerm;
+  private double errorRate;
+  private Map<String, Integer> data;
+  private long totalDataCount;
+
+  public LossyCounting(double errorRate) {
+    this.errorRate = errorRate;
+    if (errorRate < 0.0 || errorRate > 1.0) {
+      throw new IllegalArgumentException(" Lossy Counting error rate should be 
within range [0,1]");
+    }
+    this.bucketSize = (long) Math.ceil(1 / errorRate);
+    this.currentTerm = 1;
+    this.totalDataCount = 0;
+    this.errorRate = errorRate;
+    this.data = new ConcurrentHashMap<>();
+    calculateCurrentTerm();
+  }
+
+  public LossyCounting() {
+    Configuration conf = HBaseConfiguration.create();
+    this.errorRate = 
conf.getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02);
+    this.bucketSize = (long) Math.ceil(1.0 / errorRate);
+    this.currentTerm = 1;
+    this.totalDataCount = 0;
+    this.data = new ConcurrentHashMap<>();
+    calculateCurrentTerm();
+  }
+
+  /**
+   * Counts one occurrence of the key and, whenever the total count reaches a multiple of the
+   * bucket size, sweeps low frequency keys.
+   * @param key element to count
+   * @return names of the elements removed by the sweep triggered by this call; an empty set
+   *         when no sweep ran
+   */
+  public Set<String> addByOne(String key) {
+    // One map operation instead of containsKey/get/put; data is a ConcurrentHashMap, so the
+    // increment of a single key is applied atomically.
+    data.merge(key, 1, Integer::sum);
+    totalDataCount++;
+    calculateCurrentTerm();
+    if (totalDataCount % bucketSize == 0) {
+      return sweep();
+    }
+    return new HashSet<>();
+  }
+
+  /**
+   * Sweeps low frequency data: removes every element whose count plus the error rate is below
+   * the current term.
+   * @return Names of elements got swept
+   */
+  private Set<String> sweep() {
+    Set<String> dataToBeSwept = new HashSet<>();
+    for (Map.Entry<String, Integer> entry : data.entrySet()) {
+      if (entry.getValue() + errorRate < currentTerm) {
+        dataToBeSwept.add(entry.getKey());
+      }
+    }
+    for (String key : dataToBeSwept) {
+      data.remove(key);
+    }
+    // Parameterized logging: the message is only built when debug is enabled.
+    LOG.debug("Swept {} of elements.", dataToBeSwept.size());
+    return dataToBeSwept;
+  }
+
+  /**
+   * Calculate and set current term, i.e. ceil(totalDataCount / bucketSize).
+   */
+  private void calculateCurrentTerm() {
+    // currentTerm is a long; casting through int would cap it at Integer.MAX_VALUE.
+    this.currentTerm = (long) Math.ceil(1.0 * totalDataCount / bucketSize);
+  }
+
+  public long getBuketSize(){
+    return bucketSize;
+  }
+
+  public long getDataSize() {
+    return data.size();
+  }
+
+  public boolean contains(String key) {
+    return data.containsKey(key);
+  }
+
+  public long getCurrentTerm() {
+    return currentTerm;
+  }
+}
+
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
new file mode 100644
index 0000000..7c1c242
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required by applicable
+ * law or agreed to in writing, software distributed under the License is 
distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. See the License
+ * for the specific language governing permissions and limitations under the 
License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.JMXListener;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Category({ CoprocessorTests.class, MediumTests.class })
+public class TestMetaTableMetrics {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMetaTableMetrics.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestMetaTableMetrics.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static final TableName NAME1 = 
TableName.valueOf("TestExampleMetaTableMetricsOne");
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+  private static final byte[] QUALIFIER = Bytes.toBytes("q");
+  private static final ColumnFamilyDescriptor CFD =
+      ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).build();
+  private static final int NUM_ROWS = 5;
+  private static final String value = "foo";
+  private static Configuration conf = null;
+  private static int connectorPort = 61120;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+
+    conf = UTIL.getConfiguration();
+    // Set system coprocessor so it can be applied to meta regions
+    UTIL.getConfiguration().set("hbase.coprocessor.region.classes",
+      MetaTableMetrics.class.getName());
+    conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, 
JMXListener.class.getName());
+    Random rand = new Random();
+    for (int i = 0; i < 10; i++) {
+      do {
+        int sign = i % 2 == 0 ? 1 : -1;
+        connectorPort += sign * rand.nextInt(100);
+      } while (!HBaseTestingUtility.available(connectorPort));
+      try {
+        conf.setInt("regionserver.rmi.registry.port", connectorPort);
+        UTIL.startMiniCluster(1);
+        break;
+      } catch (Exception e) {
+        LOG.debug("Encountered exception when starting cluster. Trying port " 
+ connectorPort, e);
+        try {
+          // this is to avoid "IllegalStateException: A mini-cluster is 
already running"
+          UTIL.shutdownMiniCluster();
+        } catch (Exception ex) {
+          LOG.debug("Encountered exception shutting down cluster", ex);
+        }
+      }
+    }
+    UTIL.getAdmin()
+        .createTable(TableDescriptorBuilder.newBuilder(NAME1)
+            .setColumnFamily(CFD)
+            .build());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  private void writeData(Table t) throws IOException {
+    List<Put> puts = new ArrayList<>(NUM_ROWS);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      Put p = new Put(Bytes.toBytes(i + 1));
+      p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(value));
+      puts.add(p);
+    }
+    t.put(puts);
+  }
+
+  /**
+   * Calls {@code readJmxMetrics()} until it returns a non-null result, sleeping one second
+   * after each failed attempt.
+   * @return the metric names, or null when every attempt failed
+   * @throws IOException when fails to retrieve jmx metrics.
+   */
+  private Set<String> readJmxMetricsWithRetry() throws IOException {
+    // Number of attempts; also reported in the log message below.
+    final int count = 10;
+    for (int i = 0; i < count; i++) {
+      Set<String> metrics = readJmxMetrics();
+      if (metrics != null) {
+        return metrics;
+      }
+      LOG.warn("Failed to get jmxmetrics... sleeping, retrying; " + i + " of " + count + " times");
+      Threads.sleep(1000);
+    }
+    return null;
+  }
+
+  /**
+   * Read the attributes from Hadoop->HBase->RegionServer->MetaTableMetrics in JMX
+   * @return names of the attributes of the MetaTableMetrics bean, or null when the bean could
+   *         not be read (the MBeans that were found are logged in that case)
+   * @throws IOException when fails to retrieve jmx metrics.
+   */
+  // this method comes from this class: TestStochasticBalancerJmxMetrics with minor modifications.
+  private Set<String> readJmxMetrics() throws IOException {
+    JMXConnector connector = null;
+    ObjectName target = null;
+    MBeanServerConnection mb = null;
+    try {
+      connector =
+          JMXConnectorFactory.connect(JMXListener.buildJMXServiceURL(connectorPort, connectorPort));
+      mb = connector.getMBeanServerConnection();
+
+      @SuppressWarnings("JdkObsolete")
+      Hashtable<String, String> pairs = new Hashtable<>();
+      pairs.put("service", "HBase");
+      pairs.put("name", "RegionServer");
+      pairs.put("sub",
+        "Coprocessor.Region.CP_org.apache.hadoop.hbase.coprocessor"
+            + ".MetaTableMetrics");
+      target = new ObjectName("Hadoop", pairs);
+      MBeanInfo beanInfo = mb.getMBeanInfo(target);
+
+      Set<String> existingAttrs = new HashSet<>();
+      for (MBeanAttributeInfo attrInfo : beanInfo.getAttributes()) {
+        existingAttrs.add(attrInfo.getName());
+      }
+      return existingAttrs;
+    } catch (Exception e) {
+      LOG.warn("Failed to get bean." + target, e);
+      if (mb != null) {
+        // List what is registered to help diagnose a wrong or missing bean name.
+        Set<ObjectInstance> instances = mb.queryMBeans(null, null);
+        Iterator<ObjectInstance> iterator = instances.iterator();
+        LOG.warn("MBean Found:");
+        while (iterator.hasNext()) {
+          ObjectInstance instance = iterator.next();
+          LOG.warn("Class Name: " + instance.getClassName());
+          LOG.warn("Object Name: " + instance.getObjectName());
+        }
+      }
+    } finally {
+      if (connector != null) {
+        try {
+          connector.close();
+        } catch (Exception e) {
+          // Closing is best effort; report through the test logger rather than stderr.
+          LOG.warn("Failed to close JMX connector", e);
+        }
+      }
+    }
+    return null;
+  }
+
+  // verifies meta table metrics exist from jmx
+  // for one table, there should be 5 MetaTable_table_<TableName> metrics.
+  // such as:
+  // [Time-limited test] example.TestMetaTableMetrics(204): ==
+  //    MetaTable_table_TestExampleMetaTableMetricsOne_request_count
+  // [Time-limited test] example.TestMetaTableMetrics(204): ==
+  //    MetaTable_table_TestExampleMetaTableMetricsOne_request_mean_rate
+  // [Time-limited test] example.TestMetaTableMetrics(204): ==
+  //    MetaTable_table_TestExampleMetaTableMetricsOne_request_1min_rate
+  // [Time-limited test] example.TestMetaTableMetrics(204): ==
+  //    MetaTable_table_TestExampleMetaTableMetricsOne_request_5min_rate
+  // [Time-limited test] example.TestMetaTableMetrics(204): ==
+  // MetaTable_table_TestExampleMetaTableMetricsOne_request_15min_rate
+  @Test
+  public void test() throws IOException, InterruptedException {
+    try (Table t = UTIL.getConnection().getTable(NAME1)) {
+      writeData(t);
+      // Flush the data
+      UTIL.flush(NAME1);
+      // Issue a compaction
+      UTIL.compact(NAME1, true);
+      Thread.sleep(2000);
+    }
+    Set<String> jmxMetrics = readJmxMetricsWithRetry();
+    assertNotNull(jmxMetrics);
+    long name1TableMetricsCount =
+        jmxMetrics.stream().filter(metric -> 
metric.contains("MetaTable_table_" + NAME1)).count();
+    assertEquals(5L, name1TableMetricsCount);
+
+    String putWithClientMetricNameRegex = "MetaTable_client_.+_put_request.*";
+    long putWithClientMetricsCount =
+            jmxMetrics.stream().filter(metric -> 
metric.matches(putWithClientMetricNameRegex))
+                    .count();
+    assertEquals(5L, putWithClientMetricsCount);
+
+
+
+
+  }
+
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
new file mode 100644
index 0000000..a365740
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MiscTests.class, SmallTests.class})
+public class TestLossyCounting {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestLossyCounting.class);
+
+  @Test
+  public void testBucketSize() {
+    LossyCounting lossyCounting = new LossyCounting(0.01);
+    assertEquals(100L, lossyCounting.getBuketSize());
+    LossyCounting lossyCounting2 = new LossyCounting();
+    assertEquals(50L, lossyCounting2.getBuketSize());
+  }
+
+  @Test
+  public void testAddByOne() {
+    LossyCounting lossyCounting = new LossyCounting(0.01);
+    for(int i = 0; i < 100; i++){
+      String key = "" + i;
+      lossyCounting.addByOne(key);
+    }
+    assertEquals(100L, lossyCounting.getDataSize());
+    for(int i = 0; i < 100; i++){
+      String key = "" + i;
+      assertEquals(true, lossyCounting.contains(key));
+    }
+  }
+
+  @Test
+  public void testSweep1() {
+    LossyCounting lossyCounting = new LossyCounting(0.01);
+    for(int i = 0; i < 400; i++){
+      String key = "" + i;
+      lossyCounting.addByOne(key);
+    }
+    assertEquals(4L, lossyCounting.getCurrentTerm());
+    assertEquals(0L, lossyCounting.getDataSize());
+  }
+
+  @Test
+  public void testSweep2() {
+    LossyCounting lossyCounting = new LossyCounting(0.1);
+    for(int i = 0; i < 10; i++){
+      String key = "" + i;
+      lossyCounting.addByOne(key);
+    }
+    assertEquals(10L, lossyCounting.getDataSize());
+    for(int i = 0; i < 10; i++){
+      String key = "1";
+      lossyCounting.addByOne(key);
+    }
+    assertEquals(1L, lossyCounting.getDataSize());
+  }
+
+
+}
\ No newline at end of file

Reply via email to