Repository: phoenix
Updated Branches:
  refs/heads/master 05edcd137 -> bf24e3824


PHOENIX-3655 Global Phoenix Client Metrics for PQS


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bf24e382
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bf24e382
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bf24e382

Branch: refs/heads/master
Commit: bf24e3824e894532e833ac1d840a3507a3c36a3d
Parents: 05edcd1
Author: Karan Mehta <[email protected]>
Authored: Fri Jul 27 01:57:33 2018 -0700
Committer: Karan Mehta <[email protected]>
Committed: Mon Aug 20 14:01:21 2018 -0700

----------------------------------------------------------------------
 .../GlobalPhoenixMetricsTestSink.java           |  51 ++++++
 .../monitoring/PhoenixMetricsDisabledIT.java    |  80 +++++++++
 .../phoenix/monitoring/PhoenixMetricsIT.java    |  75 +++++++-
 .../phoenix/monitoring/GlobalClientMetrics.java |  74 ++++++--
 .../phoenix/monitoring/GlobalMetricImpl.java    |   3 +
 .../GlobalMetricRegistriesAdapter.java          | 170 +++++++++++++++++++
 .../monitoring/NoOpGlobalMetricImpl.java        |  72 ++++++++
 .../org/apache/phoenix/query/QueryServices.java |   3 +
 .../phoenix/query/QueryServicesOptions.java     |  13 +-
 .../test/resources/hadoop-metrics2.properties   |   9 +-
 10 files changed, 532 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf24e382/phoenix-core/src/it/java/org/apache/phoenix/monitoring/GlobalPhoenixMetricsTestSink.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/GlobalPhoenixMetricsTestSink.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/GlobalPhoenixMetricsTestSink.java
new file mode 100644
index 0000000..8234c1c
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/GlobalPhoenixMetricsTestSink.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import org.apache.commons.configuration2.SubsetConfiguration;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSink;
+
+public class GlobalPhoenixMetricsTestSink implements MetricsSink {
+
+    public static final String PHOENIX_METRICS_RECORD_NAME = "PHOENIX";
+    // PhoenixMetricsIT tests verifies these metrics from this sink in a separate thread
+    // GlobalPhoenixMetricsTestSink is invoked based on time defined in hadoop-metrics2.properties
+    // This lock is to prevent concurrent access to metrics Iterable for these threads
+    static Object lock = new Object();
+    static Iterable<AbstractMetric> metrics;
+
+    @Override
+    public void putMetrics(MetricsRecord metricsRecord) {
+        if (metricsRecord.name().equals(PHOENIX_METRICS_RECORD_NAME)) {
+            synchronized (GlobalPhoenixMetricsTestSink.lock) {
+                GlobalPhoenixMetricsTestSink.metrics = metricsRecord.metrics();
+            }
+        }
+    }
+
+    @Override
+    public void flush() {
+    }
+
+    @Override
+    public void init(SubsetConfiguration subsetConfiguration) {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf24e382/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsDisabledIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsDisabledIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsDisabledIT.java
new file mode 100644
index 0000000..85cf1a3
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsDisabledIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.query.ConfigurationFactory;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.InstanceResolver;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.hamcrest.CoreMatchers;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.DriverManager;
+import java.util.Map;
+
+import static org.apache.phoenix.monitoring.NoOpGlobalMetricImpl.NO_SAMPLES;
+import static org.apache.phoenix.monitoring.NoOpGlobalMetricImpl.NO_VALUE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+public class PhoenixMetricsDisabledIT extends BaseUniqueNamesOwnClusterIT {
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        final Configuration conf = HBaseConfiguration.create();
+        conf.set(QueryServices.GLOBAL_METRICS_ENABLED, String.valueOf(false));
+        conf.set(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false));
+        // Clear the cached singletons so we can inject our own.
+        InstanceResolver.clearSingletons();
+        // Make sure the ConnectionInfo doesn't try to pull a default Configuration
+        InstanceResolver.getSingleton(ConfigurationFactory.class, new 
ConfigurationFactory() {
+            @Override
+            public Configuration getConfiguration() {
+                return conf;
+            }
+            @Override
+            public Configuration getConfiguration(Configuration confToClone) {
+                Configuration copy = new Configuration(conf);
+                copy.addResource(confToClone);
+                return copy;
+            }
+        });
+
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+    }
+
+    @Test
+    public void testResetGlobalPhoenixMetrics() {
+        for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
+            assertThat(m, 
CoreMatchers.<GlobalMetric>instanceOf(NoOpGlobalMetricImpl.class));
+            assertEquals(NO_VALUE, m.getTotalSum());
+            assertEquals(NO_SAMPLES, m.getNumberOfSamples());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf24e382/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 4c5c592..9d8812a 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -40,6 +40,7 @@ import static 
org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -50,6 +51,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -61,6 +63,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -73,6 +78,7 @@ import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.hamcrest.CoreMatchers;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
@@ -82,8 +88,18 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+/**
+ * Tests that
+ * 1. Phoenix Global metrics are exposed via
+ *   a. PhoenixRuntime b. Hadoop-Metrics2 defined sinks
+ * 2. Phoenix Request level metrics are exposed via
+ *   a. PhoenixRuntime
+ */
 public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
 
+    private static final Log LOG = LogFactory.getLog(PhoenixMetricsIT.class);
+    private static final int MAX_RETRIES = 5;
+
     private static final List<MetricType> mutationMetricsToSkip =
             Lists.newArrayList(MetricType.MUTATION_COMMIT_TIME);
     private static final List<MetricType> readMetricsToSkip =
@@ -96,6 +112,7 @@ public class PhoenixMetricsIT extends 
BaseUniqueNamesOwnClusterIT {
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        // Phoenix Global client metrics are enabled by default
         // Enable request metric collection at the driver level
         props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, 
String.valueOf(true));
         // disable renewing leases as this will force spooling to happen.
@@ -103,15 +120,18 @@ public class PhoenixMetricsIT extends 
BaseUniqueNamesOwnClusterIT {
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
         // need the non-test driver for some tests that check number of hconnections, etc.
         DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+
     }
 
     @Test
-    public void testResetGlobalPhoenixMetrics() {
+    public void testResetGlobalPhoenixMetrics() throws Exception {
         resetGlobalMetrics();
         for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
+            assertThat(m, 
CoreMatchers.<GlobalMetric>instanceOf(GlobalMetricImpl.class));
             assertEquals(0, m.getTotalSum());
             assertEquals(0, m.getNumberOfSamples());
         }
+        assertTrue(verifyMetricsFromSink());
     }
 
     @Test
@@ -145,6 +165,8 @@ public class PhoenixMetricsIT extends 
BaseUniqueNamesOwnClusterIT {
         
assertTrue(GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS.getMetric().getTotalSum() > 
0);
         
assertTrue(GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS.getMetric().getTotalSum()
 > 0);
         
assertTrue(GLOBAL_HBASE_COUNT_SCANNED_REGIONS.getMetric().getTotalSum() > 0);
+
+        assertTrue(verifyMetricsFromSink());
     }
 
     @Test
@@ -162,6 +184,8 @@ public class PhoenixMetricsIT extends 
BaseUniqueNamesOwnClusterIT {
         assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum());
         assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum());
         assertEquals(0, 
GLOBAL_MUTATION_BATCH_FAILED_COUNT.getMetric().getTotalSum());
+
+        assertTrue(verifyMetricsFromSink());
     }
 
     @Test
@@ -195,6 +219,8 @@ public class PhoenixMetricsIT extends 
BaseUniqueNamesOwnClusterIT {
         
assertTrue(GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS.getMetric().getTotalSum() > 
0);
         
assertTrue(GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS.getMetric().getTotalSum()
 > 0);
         
assertTrue(GLOBAL_HBASE_COUNT_SCANNED_REGIONS.getMetric().getTotalSum() > 0);
+
+        assertTrue(verifyMetricsFromSink());
     }
 
     private static void resetGlobalMetrics() {
@@ -203,6 +229,53 @@ public class PhoenixMetricsIT extends 
BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    // Phoenix Client Metrics are transported via Hadoop-metrics2 sink
+    // The test sink is defined at GlobalPhoenixMetricsTestSink
+    // Configuration for Hadoop-metrics2 comes from hadoop-metrics2.properties file located in test/resources
+    private boolean verifyMetricsFromSink() {
+        Map<String, Long> expectedMetrics = new HashMap<>();
+        for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
+            expectedMetrics.put(m.getMetricType().name(), m.getTotalSum());
+        }
+
+        for (int i = 0; i < MAX_RETRIES; i++) {
+            LOG.info("Verifying Global Metrics from Hadoop Sink, Retry: " + (i 
+ 1));
+            if (verifyMetricsFromSinkOnce(expectedMetrics)) {
+                LOG.info("Values from Hadoop Metrics Sink match actual 
values");
+                return true;
+            }
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+        return false;
+    }
+
+    private boolean verifyMetricsFromSinkOnce(Map<String, Long> 
expectedMetrics) {
+        synchronized (GlobalPhoenixMetricsTestSink.lock) {
+            for (AbstractMetric metric : GlobalPhoenixMetricsTestSink.metrics) 
{
+                if (expectedMetrics.containsKey(metric.name())) {
+                    Long value = expectedMetrics.get(metric.name());
+                    if (value != null) {
+                        long expectedValue = value;
+                        long actualValue = metric.value().longValue();
+                        if (expectedValue != actualValue) {
+                            LOG.warn("Metric from Hadoop Sink: " + 
metric.name() + " didn't match expected.");
+                            return false;
+                        }
+                        expectedMetrics.remove(metric.name());
+                    }
+                }
+            }
+        }
+        assertTrue("Metric expected but not present in Hadoop Metrics Sink 
(GlobalPhoenixMetricsTestSink)",
+                expectedMetrics.size() == 0);
+        return true;
+    }
+
     private static void createTableAndInsertValues(String tableName, boolean 
resetGlobalMetricsAfterTableCreate)
             throws Exception {
         String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL 
PRIMARY KEY, V VARCHAR)";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf24e382/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
index 9c95895..e7c7bae 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java
@@ -59,9 +59,15 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.hadoop.hbase.metrics.Gauge;
+import org.apache.hadoop.hbase.metrics.MetricRegistries;
+import org.apache.hadoop.hbase.metrics.MetricRegistry;
+import org.apache.hadoop.hbase.metrics.MetricRegistryInfo;
 import org.apache.phoenix.query.QueryServicesOptions;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Central place where we keep track of all the global client phoenix metrics. These metrics are different from
@@ -108,40 +114,88 @@ public enum GlobalClientMetrics {
     GLOBAL_HBASE_COUNT_ROWS_SCANNED(COUNT_ROWS_SCANNED),
     GLOBAL_HBASE_COUNT_ROWS_FILTERED(COUNT_ROWS_FILTERED);
 
-    
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlobalClientMetrics.class);
     private static final boolean isGlobalMetricsEnabled = 
QueryServicesOptions.withDefaults().isGlobalMetricsEnabled();
+    private MetricType metricType;
     private GlobalMetric metric;
 
-    public void update(long value) {
+    static {
+        initPhoenixGlobalClientMetrics();
         if (isGlobalMetricsEnabled) {
-            metric.change(value);
+            MetricRegistry metricRegistry = createMetricRegistry();
+            registerPhoenixMetricsToRegistry(metricRegistry);
+            
GlobalMetricRegistriesAdapter.getInstance().registerMetricRegistry(metricRegistry);
+        }
+    }
+
+    private static void initPhoenixGlobalClientMetrics() {
+        for (GlobalClientMetrics globalMetric : GlobalClientMetrics.values()) {
+            globalMetric.metric = isGlobalMetricsEnabled ?
+                    new GlobalMetricImpl(globalMetric.metricType) : new 
NoOpGlobalMetricImpl();
+        }
+    }
+
+    private static void registerPhoenixMetricsToRegistry(MetricRegistry 
metricRegistry) {
+        for (GlobalClientMetrics globalMetric : GlobalClientMetrics.values()) {
+            metricRegistry.register(globalMetric.metricType.columnName(),
+                    new PhoenixGlobalMetricGauge(globalMetric.metric));
+        }
+    }
+
+    private static MetricRegistry createMetricRegistry() {
+        LOG.info("Creating Metric Registry for Phoenix Global Metrics");
+        MetricRegistryInfo registryInfo = new MetricRegistryInfo("PHOENIX", 
"Phoenix Client Metrics",
+                "phoenix", "Phoenix,sub=CLIENT", true);
+        return MetricRegistries.global().create(registryInfo);
+    }
+
+    /**
+     * Class to convert Phoenix Metric objects into HBase Metric objects (Gauge)
+     */
+    private static class PhoenixGlobalMetricGauge implements Gauge<Long> {
+
+        private final GlobalMetric metric;
+
+        public PhoenixGlobalMetricGauge(GlobalMetric metric) {
+            this.metric = metric;
+        }
+
+        @Override
+        public Long getValue() {
+            return metric.getValue();
         }
     }
 
+    public void update(long value) {
+        metric.change(value);
+    }
+
     @VisibleForTesting
     public GlobalMetric getMetric() {
         return metric;
     }
 
+    @VisibleForTesting
+    public MetricType getMetricType() {
+        return metricType;
+    }
+
+
     @Override
     public String toString() {
         return metric.toString();
     }
 
     private GlobalClientMetrics(MetricType metricType) {
-        this.metric = new GlobalMetricImpl(metricType);
+        this.metricType = metricType;
     }
 
     public void increment() {
-        if (isGlobalMetricsEnabled) {
-            metric.increment();
-        }
+        metric.increment();
     }
     
     public void decrement() {
-        if (isGlobalMetricsEnabled) {
-            metric.decrement();
-        }
+        metric.decrement();
     }
 
     public static Collection<GlobalMetric> getMetrics() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf24e382/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
index ce692f2..ba2d925 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java
@@ -11,6 +11,9 @@ package org.apache.phoenix.monitoring;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+/**
+ * Default implementation used when GlobalMetrics are enabled
+ */
 public class GlobalMetricImpl implements GlobalMetric {
 
     private AtomicLong numberOfSamples = new AtomicLong(0);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf24e382/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricRegistriesAdapter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricRegistriesAdapter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricRegistriesAdapter.java
new file mode 100644
index 0000000..381a757
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricRegistriesAdapter.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.metrics.Counter;
+import org.apache.hadoop.hbase.metrics.Gauge;
+import org.apache.hadoop.hbase.metrics.Histogram;
+import org.apache.hadoop.hbase.metrics.Meter;
+import org.apache.hadoop.hbase.metrics.Metric;
+import org.apache.hadoop.hbase.metrics.MetricRegistry;
+import org.apache.hadoop.hbase.metrics.MetricRegistryInfo;
+import org.apache.hadoop.hbase.metrics.Timer;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.Interns;
+import org.apache.hadoop.metrics2.lib.MutableHistogram;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.phoenix.query.QueryServicesOptions;
+
+/**
+ * Contents mostly copied from GlobalMetricRegistriesAdapter class from hbase-hadoop2-compat
+ * The adapter attaches HBase's MetricRegistry to Hadoop's DefaultMetricsSystem
+ * Note: This DOES NOT handle dynamic attach/detach of registries
+ */
+public class GlobalMetricRegistriesAdapter {
+
+    private static final Log LOG = 
LogFactory.getLog(GlobalMetricRegistriesAdapter.class);
+    private static GlobalMetricRegistriesAdapter INSTANCE = new 
GlobalMetricRegistriesAdapter();
+
+    private GlobalMetricRegistriesAdapter() {
+        DefaultMetricsSystem.initialize("Phoenix");
+        JvmMetrics.initSingleton("Phoenix", "");
+    }
+
+    public static GlobalMetricRegistriesAdapter getInstance() {
+        return INSTANCE;
+    }
+
+    public void registerMetricRegistry(MetricRegistry registry) {
+        if (registry == null) {
+            LOG.warn("Registry cannot be registered with Hadoop Metrics 2 
since it is null.");
+            return;
+        }
+
+        HBaseMetrics2HadoopMetricsAdapter adapter = new 
HBaseMetrics2HadoopMetricsAdapter(registry);
+        adapter.registerToDefaultMetricsSystem();
+    }
+
+    /**
+     * Class to convert HBase Metric Objects to Hadoop Metrics2 Metric Objects
+     */
+    private static class HBaseMetrics2HadoopMetricsAdapter implements 
MetricsSource {
+        private static final Log LOG = 
LogFactory.getLog(HBaseMetrics2HadoopMetricsAdapter.class);
+        private final MetricRegistry registry;
+        private final String metricTag;
+
+        private HBaseMetrics2HadoopMetricsAdapter(MetricRegistry registry) {
+            this.registry = registry;
+            metricTag = 
QueryServicesOptions.withDefaults().getClientMetricTag();
+        }
+
+        private void registerToDefaultMetricsSystem() {
+            MetricRegistryInfo info = registry.getMetricRegistryInfo();
+            LOG.info("Registering " + info.getMetricsJmxContext() + " " + 
info.getMetricsDescription() + " into DefaultMetricsSystem");
+            
DefaultMetricsSystem.instance().register(info.getMetricsJmxContext(), 
info.getMetricsDescription(), this);
+        }
+
+        private void snapshotAllMetrics(MetricRegistry metricRegistry, 
MetricsCollector collector) {
+            MetricRegistryInfo hbaseMetricRegistryInfo = 
metricRegistry.getMetricRegistryInfo();
+            MetricsInfo hadoopMetricsInfo = 
Interns.info(hbaseMetricRegistryInfo.getMetricsName(), 
hbaseMetricRegistryInfo.getMetricsDescription());
+            MetricsRecordBuilder builder = 
collector.addRecord(hadoopMetricsInfo);
+            builder.setContext(hbaseMetricRegistryInfo.getMetricsContext());
+            builder.tag(hadoopMetricsInfo, metricTag);
+            this.snapshotAllMetrics(metricRegistry, builder);
+        }
+
+        private void snapshotAllMetrics(MetricRegistry metricRegistry, 
MetricsRecordBuilder builder) {
+            Map<String, Metric> metrics = metricRegistry.getMetrics();
+            Iterator iterator = metrics.entrySet().iterator();
+
+            while(iterator.hasNext()) {
+                Entry<String, Metric> e = (Entry)iterator.next();
+                String name = StringUtils.capitalize(e.getKey());
+                Metric metric = e.getValue();
+                if (metric instanceof Gauge) {
+                    this.addGauge(name, (Gauge)metric, builder);
+                } else if (metric instanceof Counter) {
+                    this.addCounter(name, (Counter)metric, builder);
+                } else if (metric instanceof Histogram) {
+                    this.addHistogram(name, (Histogram)metric, builder);
+                } else if (metric instanceof Meter) {
+                    this.addMeter(name, (Meter)metric, builder);
+                } else if (metric instanceof Timer) {
+                    this.addTimer(name, (Timer)metric, builder);
+                } else {
+                    LOG.info("Ignoring unknown Metric class " + 
metric.getClass().getName());
+                }
+            }
+        }
+
+        private void addGauge(String name, Gauge<?> gauge, 
MetricsRecordBuilder builder) {
+            MetricsInfo info = Interns.info(name, "");
+            Object o = gauge.getValue();
+            if (o instanceof Integer) {
+                builder.addGauge(info, (Integer)o);
+            } else if (o instanceof Long) {
+                builder.addGauge(info, (Long)o);
+            } else if (o instanceof Float) {
+                builder.addGauge(info, (Float)o);
+            } else if (o instanceof Double) {
+                builder.addGauge(info, (Double)o);
+            } else {
+                LOG.warn("Ignoring Gauge (" + name + ") with unhandled type: " 
+ o.getClass());
+            }
+
+        }
+
+        private void addCounter(String name, Counter counter, 
MetricsRecordBuilder builder) {
+            MetricsInfo info = Interns.info(name, "");
+            builder.addCounter(info, counter.getCount());
+        }
+
+        private void addHistogram(String name, Histogram histogram, 
MetricsRecordBuilder builder) {
+            MutableHistogram.snapshot(name, "", histogram, builder, true);
+        }
+
+        private void addMeter(String name, Meter meter, MetricsRecordBuilder 
builder) {
+            builder.addGauge(Interns.info(name + "_count", ""), 
meter.getCount());
+            builder.addGauge(Interns.info(name + "_mean_rate", ""), 
meter.getMeanRate());
+            builder.addGauge(Interns.info(name + "_1min_rate", ""), 
meter.getOneMinuteRate());
+            builder.addGauge(Interns.info(name + "_5min_rate", ""), 
meter.getFiveMinuteRate());
+            builder.addGauge(Interns.info(name + "_15min_rate", ""), 
meter.getFifteenMinuteRate());
+        }
+
+        private void addTimer(String name, Timer timer, MetricsRecordBuilder 
builder) {
+            this.addMeter(name, timer.getMeter(), builder);
+            this.addHistogram(name, timer.getHistogram(), builder);
+        }
+
+        @Override
+        public void getMetrics(MetricsCollector metricsCollector, boolean b) {
+            this.snapshotAllMetrics(this.registry, metricsCollector);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf24e382/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java
new file mode 100644
index 0000000..87216a4
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+/**
+ * No-op {@link GlobalMetric} used when GlobalMetrics are disabled: updates are dropped, reads return sentinels.
+ */
+public class NoOpGlobalMetricImpl implements GlobalMetric {
+
+    static final long NO_SAMPLES = -1; // sentinel returned by getNumberOfSamples()
+    static final long NO_VALUE = -1; // sentinel returned by getTotalSum() and getValue()
+
+    @Override
+    public long getNumberOfSamples() {
+        return NO_SAMPLES;
+    }
+
+    @Override
+    public long getTotalSum() {
+        return NO_VALUE;
+    }
+
+    @Override
+    public MetricType getMetricType() {
+        return null; // this implementation reports no metric type
+    }
+
+    @Override
+    public long getValue() {
+        return NO_VALUE;
+    }
+
+    @Override
+    public void change(long delta) {
+        // Intentionally empty: the delta is discarded.
+    }
+
+    @Override
+    public void increment() {
+        // Intentionally empty: nothing is counted.
+    }
+
+    @Override
+    public void decrement() {
+        // Intentionally empty: nothing is counted.
+    }
+
+    @Override
+    public String getCurrentMetricState() {
+        return null; // there is no state to describe
+    }
+
+    @Override
+    public void reset() {
+        // Intentionally empty: there is no state to clear.
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf24e382/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index d290174..a2e3a33 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -210,6 +210,9 @@ public interface QueryServices extends SQLCloseable {
     public static final String DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB = 
"phoenix.table.istransactional.default";
     public static final String DEFAULT_TRANSACTION_PROVIDER_ATTRIB = 
"phoenix.table.transaction.provider.default";
     public static final String GLOBAL_METRICS_ENABLED = 
"phoenix.query.global.metrics.enabled";
+
+    // Tag Name to determine the Phoenix Client Type
+    public static final String CLIENT_METRICS_TAG = 
"phoenix.client.metrics.tag";
     
     // Transaction related configs
     public static final String TRANSACTIONS_ENABLED = 
"phoenix.transactions.enabled";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf24e382/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 6968965..36ae599 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -23,6 +23,7 @@ import static 
org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE
 import static org.apache.phoenix.query.QueryServices.AUTO_UPGRADE_ENABLED;
 import static 
org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
 import static 
org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.CLIENT_METRICS_TAG;
 import static 
org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
 import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC;
 import static 
org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED;
@@ -117,7 +118,7 @@ import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
-
+import org.apache.tephra.TxConstants;
 
 
 /**
@@ -269,6 +270,7 @@ public class QueryServicesOptions {
     public static final String DEFAULT_TRANSACTION_PROVIDER = 
TransactionFactory.Provider.getDefault().name();
     public static final boolean DEFAULT_TRANSACTIONS_ENABLED = false;
     public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true;
+    public static final String DEFAULT_CLIENT_METRICS_TAG = "FAT_CLIENT";
 
     public static final boolean DEFAULT_TRANSACTIONAL = false;
     public static final boolean DEFAULT_AUTO_FLUSH = false;
@@ -444,7 +446,10 @@ public class QueryServicesOptions {
             .setIfUnset(COST_BASED_OPTIMIZER_ENABLED, 
DEFAULT_COST_BASED_OPTIMIZER_ENABLED)
             .setIfUnset(PHOENIX_ACLS_ENABLED,  DEFAULT_PHOENIX_ACLS_ENABLED)
             .setIfUnset(LOG_LEVEL,  DEFAULT_LOGGING_LEVEL)
-            .setIfUnset(LOG_SAMPLE_RATE,  DEFAULT_LOG_SAMPLE_RATE);
+            .setIfUnset(LOG_SAMPLE_RATE,  DEFAULT_LOG_SAMPLE_RATE)
+            .setIfUnset(TxConstants.TX_PRE_014_CHANGESET_KEY, 
Boolean.FALSE.toString())
+            .setIfUnset(CLIENT_METRICS_TAG, DEFAULT_CLIENT_METRICS_TAG)
+            ;
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user 
set
         // it to 1, so we'll change it.
@@ -608,6 +613,10 @@ public class QueryServicesOptions {
         return config.getInt(MUTATE_BATCH_SIZE_ATTRIB, 
DEFAULT_MUTATE_BATCH_SIZE);
     }
 
+    /**
+     * Looks up {@link QueryServices#CLIENT_METRICS_TAG} in the configuration,
+     * passing {@link #DEFAULT_CLIENT_METRICS_TAG} as the fallback value.
+     */
+    public String getClientMetricTag() {
+        return config.get(CLIENT_METRICS_TAG, DEFAULT_CLIENT_METRICS_TAG);
+    }
+
     public boolean isUseIndexes() {
         return config.getBoolean(USE_INDEXES_ATTRIB, DEFAULT_USE_INDEXES);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bf24e382/phoenix-core/src/test/resources/hadoop-metrics2.properties
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/resources/hadoop-metrics2.properties 
b/phoenix-core/src/test/resources/hadoop-metrics2.properties
index 3f227f6..27cadc9 100644
--- a/phoenix-core/src/test/resources/hadoop-metrics2.properties
+++ b/phoenix-core/src/test/resources/hadoop-metrics2.properties
@@ -32,10 +32,9 @@
 #    [prefix].[source|sink].[instance].[options]
 # See javadoc of package-info.java for org.apache.hadoop.metrics2 for detail
 
-
-# Don't attempt to start jmx mbeans for all sources.
-#  For right now, all metrics are exported to a phoenix table
-*.source.start_mbeans=false
+phoenix.source.start_mbeans=true
+phoenix.sink.sink0.class=org.apache.phoenix.monitoring.GlobalPhoenixMetricsTestSink
 
 # Frequency, in seconds, of sampling from the sources
-*.period=10
+# High Frequency for test purposes
+*.period=1

Reply via email to