Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.4 da2796ab9 -> 1c82e031d
PHOENIX-3655 Global Phoenix Client Metrics for PQS Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1c82e031 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1c82e031 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1c82e031 Branch: refs/heads/4.x-HBase-1.4 Commit: 1c82e031daa6e25129473ddc097ccd263c3ba48a Parents: da2796a Author: Karan Mehta <[email protected]> Authored: Fri Jul 27 01:57:33 2018 -0700 Committer: Karan Mehta <[email protected]> Committed: Thu Aug 2 15:36:16 2018 -0700 ---------------------------------------------------------------------- .../GlobalPhoenixMetricsTestSink.java | 60 +++++++ .../monitoring/PhoenixMetricsDisabledIT.java | 80 +++++++++ .../phoenix/monitoring/PhoenixMetricsIT.java | 75 +++++++- .../phoenix/monitoring/GlobalClientMetrics.java | 74 ++++++-- .../phoenix/monitoring/GlobalMetricImpl.java | 3 + .../GlobalMetricRegistriesAdapter.java | 170 +++++++++++++++++++ .../monitoring/NoOpGlobalMetricImpl.java | 72 ++++++++ .../org/apache/phoenix/query/QueryServices.java | 3 + .../phoenix/query/QueryServicesOptions.java | 7 + .../test/resources/hadoop-metrics2.properties | 9 +- 10 files changed, 537 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c82e031/phoenix-core/src/it/java/org/apache/phoenix/monitoring/GlobalPhoenixMetricsTestSink.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/GlobalPhoenixMetricsTestSink.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/GlobalPhoenixMetricsTestSink.java new file mode 100644 index 0000000..c4e677a --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/GlobalPhoenixMetricsTestSink.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import org.apache.commons.configuration.SubsetConfiguration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.MetricsSink; +import org.apache.phoenix.util.PhoenixRuntime; + +import java.util.Map; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class GlobalPhoenixMetricsTestSink implements MetricsSink { + + public static final String PHOENIX_METRICS_RECORD_NAME = "PHOENIX"; + // PhoenixMetricsIT tests verifies these metrics from this sink in a separate thread + // GlobalPhoenixMetricsTestSink is invoked based on time defined in hadoop-metrics2.properties + // This lock is to prevent concurrent access to metrics Iterable for these threads + static Object lock = new Object(); + static Iterable<AbstractMetric> metrics; + + @Override + public void putMetrics(MetricsRecord metricsRecord) { + if (metricsRecord.name().equals(PHOENIX_METRICS_RECORD_NAME)) { + synchronized (GlobalPhoenixMetricsTestSink.lock) { + GlobalPhoenixMetricsTestSink.metrics = metricsRecord.metrics(); + } + } + } + + @Override + public void flush() { + } + + @Override + public void init(SubsetConfiguration subsetConfiguration) { + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c82e031/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsDisabledIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsDisabledIT.java new file mode 100644 index 0000000..85cf1a3 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsDisabledIT.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.query.ConfigurationFactory; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.InstanceResolver; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.ReadOnlyProps; +import org.hamcrest.CoreMatchers; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.sql.DriverManager; +import java.util.Map; + +import static org.apache.phoenix.monitoring.NoOpGlobalMetricImpl.NO_SAMPLES; +import static org.apache.phoenix.monitoring.NoOpGlobalMetricImpl.NO_VALUE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +public class PhoenixMetricsDisabledIT extends BaseUniqueNamesOwnClusterIT { + + @BeforeClass + public static void doSetup() throws Exception { + final Configuration conf = HBaseConfiguration.create(); + conf.set(QueryServices.GLOBAL_METRICS_ENABLED, String.valueOf(false)); + conf.set(QueryServices.RENEW_LEASE_ENABLED, String.valueOf(false)); + // Clear the cached singletons so we can inject our own. + InstanceResolver.clearSingletons(); + // Make sure the ConnectionInfo doesn't try to pull a default Configuration + InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() { + @Override + public Configuration getConfiguration() { + return conf; + } + @Override + public Configuration getConfiguration(Configuration confToClone) { + Configuration copy = new Configuration(conf); + copy.addResource(confToClone); + return copy; + } + }); + + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + @Test + public void testResetGlobalPhoenixMetrics() { + for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) { + assertThat(m, CoreMatchers.<GlobalMetric>instanceOf(NoOpGlobalMetricImpl.class)); + assertEquals(NO_VALUE, m.getTotalSum()); + assertEquals(NO_SAMPLES, m.getNumberOfSamples()); + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c82e031/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java index f13391f..e0ddc7e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java @@ -40,6 +40,7 @@ import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME; import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -50,6 +51,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -61,6 +63,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.AbstractMetric; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; import org.apache.phoenix.exception.SQLExceptionCode; @@ -73,6 +78,7 @@ import org.apache.phoenix.log.LogLevel; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.ReadOnlyProps; +import org.hamcrest.CoreMatchers; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.internal.util.reflection.Whitebox; @@ -83,8 +89,18 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +/** + * Tests that + * 1. Phoenix Global metrics are exposed via + * a. PhoenixRuntime b. Hadoop-Metrics2 defined sinks + * 2. Phoenix Request level metrics are exposed via + * a. PhoenixRuntime + */ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { + private static final Log LOG = LogFactory.getLog(PhoenixMetricsIT.class); + private static final int MAX_RETRIES = 5; + private static final List<MetricType> mutationMetricsToSkip = Lists.newArrayList(MetricType.MUTATION_COMMIT_TIME); private static final List<MetricType> readMetricsToSkip = @@ -97,6 +113,7 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { @BeforeClass public static void doSetup() throws Exception { Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + // Phoenix Global client metrics are enabled by default // Enable request metric collection at the driver level props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true)); // disable renewing leases as this will force spooling to happen. @@ -104,15 +121,18 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); // need the non-test driver for some tests that check number of hconnections, etc. DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } @Test - public void testResetGlobalPhoenixMetrics() { + public void testResetGlobalPhoenixMetrics() throws Exception { resetGlobalMetrics(); for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) { + assertThat(m, CoreMatchers.<GlobalMetric>instanceOf(GlobalMetricImpl.class)); assertEquals(0, m.getTotalSum()); assertEquals(0, m.getNumberOfSamples()); } + assertTrue(verifyMetricsFromSink()); } @Test @@ -146,6 +166,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { assertTrue(GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS.getMetric().getTotalSum() > 0); assertTrue(GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS.getMetric().getTotalSum() > 0); assertTrue(GLOBAL_HBASE_COUNT_SCANNED_REGIONS.getMetric().getTotalSum() > 0); + + assertTrue(verifyMetricsFromSink()); } @Test @@ -163,6 +185,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { assertEquals(0, GLOBAL_FAILED_QUERY_COUNTER.getMetric().getTotalSum()); assertEquals(0, GLOBAL_SPOOL_FILE_COUNTER.getMetric().getTotalSum()); assertEquals(0, GLOBAL_MUTATION_BATCH_FAILED_COUNT.getMetric().getTotalSum()); + + assertTrue(verifyMetricsFromSink()); } @Test @@ -196,6 +220,8 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { assertTrue(GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS.getMetric().getTotalSum() > 0); assertTrue(GLOBAL_HBASE_COUNT_BYTES_REGION_SERVER_RESULTS.getMetric().getTotalSum() > 0); assertTrue(GLOBAL_HBASE_COUNT_SCANNED_REGIONS.getMetric().getTotalSum() > 0); + + assertTrue(verifyMetricsFromSink()); } private static void resetGlobalMetrics() { @@ -204,6 +230,53 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT { } } + // Phoenix Client Metrics are transported via Hadoop-metrics2 sink + // The test sink is defined at GlobalPhoenixMetricsTestSink + // Configuration for Hadoop-metrics2 comes from hadoop-metrics2.properties file located in test/resources + private boolean verifyMetricsFromSink() { + Map<String, Long> expectedMetrics = new HashMap<>(); + for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) { + expectedMetrics.put(m.getMetricType().name(), m.getTotalSum()); + } + + for (int i = 0; i < MAX_RETRIES; i++) { + LOG.info("Verifying Global Metrics from Hadoop Sink, Retry: " + (i + 1)); + if (verifyMetricsFromSinkOnce(expectedMetrics)) { + LOG.info("Values from Hadoop Metrics Sink match actual values"); + return true; + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + return false; + } + + private boolean verifyMetricsFromSinkOnce(Map<String, Long> expectedMetrics) { + synchronized (GlobalPhoenixMetricsTestSink.lock) { + for (AbstractMetric metric : GlobalPhoenixMetricsTestSink.metrics) { + if (expectedMetrics.containsKey(metric.name())) { + Long value = expectedMetrics.get(metric.name()); + if (value != null) { + long expectedValue = value; + long actualValue = metric.value().longValue(); + if (expectedValue != actualValue) { + LOG.warn("Metric from Hadoop Sink: " + metric.name() + " didn't match expected."); + return false; + } + expectedMetrics.remove(metric.name()); + } + } + } + } + assertTrue("Metric expected but not present in Hadoop Metrics Sink (GlobalPhoenixMetricsTestSink)", + expectedMetrics.size() == 0); + return true; + } + private static void createTableAndInsertValues(String tableName, boolean resetGlobalMetricsAfterTableCreate) throws Exception { String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c82e031/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java index 9c95895..e7c7bae 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java @@ -59,9 +59,15 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import org.apache.hadoop.hbase.metrics.Gauge; +import org.apache.hadoop.hbase.metrics.MetricRegistries; +import org.apache.hadoop.hbase.metrics.MetricRegistry; +import org.apache.hadoop.hbase.metrics.MetricRegistryInfo; import org.apache.phoenix.query.QueryServicesOptions; import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Central place where we keep track of all the global client phoenix metrics. These metrics are different from @@ -108,40 +114,88 @@ public enum GlobalClientMetrics { GLOBAL_HBASE_COUNT_ROWS_SCANNED(COUNT_ROWS_SCANNED), GLOBAL_HBASE_COUNT_ROWS_FILTERED(COUNT_ROWS_FILTERED); - + private static final Logger LOG = LoggerFactory.getLogger(GlobalClientMetrics.class); private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled(); + private MetricType metricType; private GlobalMetric metric; - public void update(long value) { + static { + initPhoenixGlobalClientMetrics(); if (isGlobalMetricsEnabled) { - metric.change(value); + MetricRegistry metricRegistry = createMetricRegistry(); + registerPhoenixMetricsToRegistry(metricRegistry); + GlobalMetricRegistriesAdapter.getInstance().registerMetricRegistry(metricRegistry); + } + } + + private static void initPhoenixGlobalClientMetrics() { + for (GlobalClientMetrics globalMetric : GlobalClientMetrics.values()) { + globalMetric.metric = isGlobalMetricsEnabled ? + new GlobalMetricImpl(globalMetric.metricType) : new NoOpGlobalMetricImpl(); + } + } + + private static void registerPhoenixMetricsToRegistry(MetricRegistry metricRegistry) { + for (GlobalClientMetrics globalMetric : GlobalClientMetrics.values()) { + metricRegistry.register(globalMetric.metricType.columnName(), + new PhoenixGlobalMetricGauge(globalMetric.metric)); + } + } + + private static MetricRegistry createMetricRegistry() { + LOG.info("Creating Metric Registry for Phoenix Global Metrics"); + MetricRegistryInfo registryInfo = new MetricRegistryInfo("PHOENIX", "Phoenix Client Metrics", + "phoenix", "Phoenix,sub=CLIENT", true); + return MetricRegistries.global().create(registryInfo); + } + + /** + * Class to convert Phoenix Metric objects into HBase Metric objects (Gauge) + */ + private static class PhoenixGlobalMetricGauge implements Gauge<Long> { + + private final GlobalMetric metric; + + public PhoenixGlobalMetricGauge(GlobalMetric metric) { + this.metric = metric; + } + + @Override + public Long getValue() { + return metric.getValue(); } } + public void update(long value) { + metric.change(value); + } + @VisibleForTesting public GlobalMetric getMetric() { return metric; } + @VisibleForTesting + public MetricType getMetricType() { + return metricType; + } + + @Override public String toString() { return metric.toString(); } private GlobalClientMetrics(MetricType metricType) { - this.metric = new GlobalMetricImpl(metricType); + this.metricType = metricType; } public void increment() { - if (isGlobalMetricsEnabled) { - metric.increment(); - } + metric.increment(); } public void decrement() { - if (isGlobalMetricsEnabled) { - metric.decrement(); - } + metric.decrement(); } public static Collection<GlobalMetric> getMetrics() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c82e031/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java index ce692f2..ba2d925 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricImpl.java @@ -11,6 +11,9 @@ package org.apache.phoenix.monitoring; import java.util.concurrent.atomic.AtomicLong; +/** + * Default implementation used when GlobalMetrics are enabled + */ public class GlobalMetricImpl implements GlobalMetric { private AtomicLong numberOfSamples = new AtomicLong(0); http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c82e031/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricRegistriesAdapter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricRegistriesAdapter.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricRegistriesAdapter.java new file mode 100644 index 0000000..381a757 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalMetricRegistriesAdapter.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.metrics.Counter; +import org.apache.hadoop.hbase.metrics.Gauge; +import org.apache.hadoop.hbase.metrics.Histogram; +import org.apache.hadoop.hbase.metrics.Meter; +import org.apache.hadoop.hbase.metrics.Metric; +import org.apache.hadoop.hbase.metrics.MetricRegistry; +import org.apache.hadoop.hbase.metrics.MetricRegistryInfo; +import org.apache.hadoop.hbase.metrics.Timer; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.Interns; +import org.apache.hadoop.metrics2.lib.MutableHistogram; +import org.apache.hadoop.metrics2.source.JvmMetrics; +import org.apache.phoenix.query.QueryServicesOptions; + +/** + * Contents mostly copied from GlobalMetricRegistriesAdapter class from hbase-hadoop2-compat + * The adapter attaches HBase's MetricRegistry to Hadoop's DefaultMetricsSystem + * Note: This DOES NOT handle dynamic attach/detach of registries + */ +public class GlobalMetricRegistriesAdapter { + + private static final Log LOG = LogFactory.getLog(GlobalMetricRegistriesAdapter.class); + private static GlobalMetricRegistriesAdapter INSTANCE = new GlobalMetricRegistriesAdapter(); + + private GlobalMetricRegistriesAdapter() { + DefaultMetricsSystem.initialize("Phoenix"); + JvmMetrics.initSingleton("Phoenix", ""); + } + + public static GlobalMetricRegistriesAdapter getInstance() { + return INSTANCE; + } + + public void registerMetricRegistry(MetricRegistry registry) { + if (registry == null) { + LOG.warn("Registry cannot be registered with Hadoop Metrics 2 since it is null."); + return; + } + + HBaseMetrics2HadoopMetricsAdapter adapter = new HBaseMetrics2HadoopMetricsAdapter(registry); + adapter.registerToDefaultMetricsSystem(); + } + + /** + * Class to convert HBase Metric Objects to Hadoop Metrics2 Metric Objects + */ + private static class HBaseMetrics2HadoopMetricsAdapter implements MetricsSource { + private static final Log LOG = LogFactory.getLog(HBaseMetrics2HadoopMetricsAdapter.class); + private final MetricRegistry registry; + private final String metricTag; + + private HBaseMetrics2HadoopMetricsAdapter(MetricRegistry registry) { + this.registry = registry; + metricTag = QueryServicesOptions.withDefaults().getClientMetricTag(); + } + + private void registerToDefaultMetricsSystem() { + MetricRegistryInfo info = registry.getMetricRegistryInfo(); + LOG.info("Registering " + info.getMetricsJmxContext() + " " + info.getMetricsDescription() + " into DefaultMetricsSystem"); + DefaultMetricsSystem.instance().register(info.getMetricsJmxContext(), info.getMetricsDescription(), this); + } + + private void snapshotAllMetrics(MetricRegistry metricRegistry, MetricsCollector collector) { + MetricRegistryInfo hbaseMetricRegistryInfo = metricRegistry.getMetricRegistryInfo(); + MetricsInfo hadoopMetricsInfo = Interns.info(hbaseMetricRegistryInfo.getMetricsName(), hbaseMetricRegistryInfo.getMetricsDescription()); + MetricsRecordBuilder builder = collector.addRecord(hadoopMetricsInfo); + builder.setContext(hbaseMetricRegistryInfo.getMetricsContext()); + builder.tag(hadoopMetricsInfo, metricTag); + this.snapshotAllMetrics(metricRegistry, builder); + } + + private void snapshotAllMetrics(MetricRegistry metricRegistry, MetricsRecordBuilder builder) { + Map<String, Metric> metrics = metricRegistry.getMetrics(); + Iterator iterator = metrics.entrySet().iterator(); + + while(iterator.hasNext()) { + Entry<String, Metric> e = (Entry)iterator.next(); + String name = StringUtils.capitalize(e.getKey()); + Metric metric = e.getValue(); + if (metric instanceof Gauge) { + this.addGauge(name, (Gauge)metric, builder); + } else if (metric instanceof Counter) { + this.addCounter(name, (Counter)metric, builder); + } else if (metric instanceof Histogram) { + this.addHistogram(name, (Histogram)metric, builder); + } else if (metric instanceof Meter) { + this.addMeter(name, (Meter)metric, builder); + } else if (metric instanceof Timer) { + this.addTimer(name, (Timer)metric, builder); + } else { + LOG.info("Ignoring unknown Metric class " + metric.getClass().getName()); + } + } + } + + private void addGauge(String name, Gauge<?> gauge, MetricsRecordBuilder builder) { + MetricsInfo info = Interns.info(name, ""); + Object o = gauge.getValue(); + if (o instanceof Integer) { + builder.addGauge(info, (Integer)o); + } else if (o instanceof Long) { + builder.addGauge(info, (Long)o); + } else if (o instanceof Float) { + builder.addGauge(info, (Float)o); + } else if (o instanceof Double) { + builder.addGauge(info, (Double)o); + } else { + LOG.warn("Ignoring Gauge (" + name + ") with unhandled type: " + o.getClass()); + } + + } + + private void addCounter(String name, Counter counter, MetricsRecordBuilder builder) { + MetricsInfo info = Interns.info(name, ""); + builder.addCounter(info, counter.getCount()); + } + + private void addHistogram(String name, Histogram histogram, MetricsRecordBuilder builder) { + MutableHistogram.snapshot(name, "", histogram, builder, true); + } + + private void addMeter(String name, Meter meter, MetricsRecordBuilder builder) { + builder.addGauge(Interns.info(name + "_count", ""), meter.getCount()); + builder.addGauge(Interns.info(name + "_mean_rate", ""), meter.getMeanRate()); + builder.addGauge(Interns.info(name + "_1min_rate", ""), meter.getOneMinuteRate()); + builder.addGauge(Interns.info(name + "_5min_rate", ""), meter.getFiveMinuteRate()); + builder.addGauge(Interns.info(name + "_15min_rate", ""), meter.getFifteenMinuteRate()); + } + + private void addTimer(String name, Timer timer, MetricsRecordBuilder builder) { + this.addMeter(name, timer.getMeter(), builder); + this.addHistogram(name, timer.getHistogram(), builder); + } + + @Override + public void getMetrics(MetricsCollector metricsCollector, boolean b) { + this.snapshotAllMetrics(this.registry, metricsCollector); + } + + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c82e031/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java new file mode 100644 index 0000000..87216a4 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/NoOpGlobalMetricImpl.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.monitoring; + +/** + * Implementation used when GlobalMetrics are disabled + */ +public class NoOpGlobalMetricImpl implements GlobalMetric { + + static long NO_SAMPLES = -1; + static long NO_VALUE = -1; + + @Override + public long getNumberOfSamples() { + return NO_SAMPLES; + } + + @Override + public long getTotalSum() { + return NO_VALUE; + } + + @Override + public MetricType getMetricType() { + return null; + } + + @Override + public long getValue() { + return NO_VALUE; + } + + @Override + public void change(long delta) { + + } + + @Override + public void increment() { + + } + + @Override + public void decrement() { + + } + + @Override + public String getCurrentMetricState() { + return null; + } + + @Override + public void reset() { + + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c82e031/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 2bb9350..d00f0d3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -210,6 +210,9 @@ public interface QueryServices extends SQLCloseable { public static final String DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB = "phoenix.table.istransactional.default"; public static final String DEFAULT_TRANSACTION_PROVIDER_ATTRIB = "phoenix.table.transaction.provider.default"; public static final String GLOBAL_METRICS_ENABLED = "phoenix.query.global.metrics.enabled"; + + // Tag Name to determine the Phoenix Client Type + public static final String CLIENT_METRICS_TAG = "phoenix.client.metrics.tag"; // Transaction related configs public static final String TRANSACTIONS_ENABLED = "phoenix.transactions.enabled"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c82e031/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 4be8f81..8e53a2a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -23,6 +23,7 @@ import static org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE import static org.apache.phoenix.query.QueryServices.AUTO_UPGRADE_ENABLED; import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME; import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB; +import static org.apache.phoenix.query.QueryServices.CLIENT_METRICS_TAG; import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS; import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC; import static org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED; @@ -270,6 +271,7 @@ public class QueryServicesOptions { public static final String DEFAULT_TRANSACTION_PROVIDER = TransactionFactory.Provider.getDefault().name(); public static final boolean DEFAULT_TRANSACTIONS_ENABLED = false; public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true; + public static final String DEFAULT_CLIENT_METRICS_TAG = "FAT_CLIENT"; public static final boolean DEFAULT_TRANSACTIONAL = false; public static final boolean DEFAULT_AUTO_FLUSH = false; @@ -445,6 +447,7 @@ public class QueryServicesOptions { .setIfUnset(LOG_LEVEL, DEFAULT_LOGGING_LEVEL) .setIfUnset(LOG_SAMPLE_RATE, DEFAULT_LOG_SAMPLE_RATE) .setIfUnset(TxConstants.TX_PRE_014_CHANGESET_KEY, Boolean.FALSE.toString()) + .setIfUnset(CLIENT_METRICS_TAG, DEFAULT_CLIENT_METRICS_TAG) ; // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set @@ -609,6 +612,10 @@ public class QueryServicesOptions { return config.getInt(MUTATE_BATCH_SIZE_ATTRIB, DEFAULT_MUTATE_BATCH_SIZE); } + public String getClientMetricTag() { + return config.get(QueryServices.CLIENT_METRICS_TAG, DEFAULT_CLIENT_METRICS_TAG); + } + public boolean isUseIndexes() { return config.getBoolean(USE_INDEXES_ATTRIB, DEFAULT_USE_INDEXES); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c82e031/phoenix-core/src/test/resources/hadoop-metrics2.properties ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/resources/hadoop-metrics2.properties b/phoenix-core/src/test/resources/hadoop-metrics2.properties index 3f227f6..27cadc9 100644 --- a/phoenix-core/src/test/resources/hadoop-metrics2.properties +++ b/phoenix-core/src/test/resources/hadoop-metrics2.properties @@ -32,10 +32,9 @@ # [prefix].[source|sink].[instance].[options] # See javadoc of package-info.java for org.apache.hadoop.metrics2 for detail - -# Don't attempt to start jmx mbeans for all sources. -# For right now, all metrics are exported to a phoenix table -*.source.start_mbeans=false +phoenix.source.start_mbeans=true +phoenix.sink.sink0.class=org.apache.phoenix.monitoring.GlobalPhoenixMetricsTestSink # Frequency, in seconds, of sampling from the sources -*.period=10 +# High Frequency for test purposes +*.period=1
