HBASE-18060 Backport to branch-1 HBASE-9774 HBase native metrics and metric collection for coprocessors
Signed-off-by: Andrew Purtell <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a3c3f101 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a3c3f101 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a3c3f101 Branch: refs/heads/branch-1 Commit: a3c3f1012d0eae531c8f38ac5c7e7b81402d8a84 Parents: f2ba52a Author: Vincent <[email protected]> Authored: Thu May 18 15:39:29 2017 -0700 Committer: Andrew Purtell <[email protected]> Committed: Wed May 24 13:20:44 2017 -0700 ---------------------------------------------------------------------- hbase-assembly/pom.xml | 10 + .../hadoop/hbase/util/FastLongHistogram.java | 318 ----------- .../org/apache/hadoop/hbase/util/LongAdder.java | 224 ++++++++ .../org/apache/hadoop/hbase/util/Striped64.java | 356 ++++++++++++ .../hbase/util/TestFastLongHistogram.java | 132 ----- .../ExampleMasterObserverWithMetrics.java | 148 +++++ .../ExampleRegionObserverWithMetrics.java | 121 ++++ hbase-hadoop-compat/pom.xml | 4 + .../apache/hadoop/hbase/metrics/BaseSource.java | 2 + hbase-hadoop2-compat/pom.xml | 4 + .../hadoop/hbase/metrics/BaseSourceImpl.java | 52 ++ .../impl/GlobalMetricRegistriesAdapter.java | 233 ++++++++ .../impl/HBaseMetrics2HadoopMetricsAdapter.java | 169 ++++++ .../MetricsRegionServerSourceImpl.java | 6 + .../MetricsReplicationGlobalSourceSource.java | 7 + .../MetricsReplicationSourceSourceImpl.java | 7 + .../lib/DefaultMetricsSystemHelper.java | 50 +- .../hadoop/metrics2/lib/MutableHistogram.java | 111 ++-- .../metrics2/lib/MutableRangeHistogram.java | 14 +- .../impl/TestGlobalMetricRegistriesAdapter.java | 86 +++ hbase-metrics-api/README.txt | 78 +++ hbase-metrics-api/pom.xml | 112 ++++ .../apache/hadoop/hbase/metrics/Counter.java | 60 ++ .../org/apache/hadoop/hbase/metrics/Gauge.java | 35 ++ .../apache/hadoop/hbase/metrics/Histogram.java | 58 ++ .../org/apache/hadoop/hbase/metrics/Meter.java | 90 +++ 
.../org/apache/hadoop/hbase/metrics/Metric.java | 30 + .../hadoop/hbase/metrics/MetricRegistries.java | 90 +++ .../hbase/metrics/MetricRegistriesLoader.java | 96 ++++ .../hadoop/hbase/metrics/MetricRegistry.java | 112 ++++ .../hbase/metrics/MetricRegistryFactory.java | 36 ++ .../hbase/metrics/MetricRegistryInfo.java | 112 ++++ .../apache/hadoop/hbase/metrics/MetricSet.java | 41 ++ .../hadoop/hbase/metrics/PackageMarker.java | 38 ++ .../apache/hadoop/hbase/metrics/Snapshot.java | 136 +++++ .../org/apache/hadoop/hbase/metrics/Timer.java | 62 +++ .../hadoop/hbase/metrics/package-info.java | 25 + .../metrics/TestMetricRegistriesLoader.java | 56 ++ hbase-metrics/README.txt | 1 + hbase-metrics/pom.xml | 136 +++++ .../hadoop/hbase/metrics/impl/CounterImpl.java | 60 ++ .../hbase/metrics/impl/DropwizardMeter.java | 74 +++ .../hbase/metrics/impl/FastLongHistogram.java | 406 ++++++++++++++ .../hbase/metrics/impl/HistogramImpl.java | 81 +++ .../metrics/impl/MetricRegistriesImpl.java | 82 +++ .../metrics/impl/MetricRegistryFactoryImpl.java | 34 ++ .../hbase/metrics/impl/MetricRegistryImpl.java | 183 +++++++ .../hbase/metrics/impl/RefCountingMap.java | 108 ++++ .../hadoop/hbase/metrics/impl/TimerImpl.java | 74 +++ .../hadoop/hbase/metrics/impl/package-info.java | 25 + ...apache.hadoop.hbase.metrics.MetricRegistries | 18 + .../hbase/metrics/impl/TestCounterImpl.java | 59 ++ .../hbase/metrics/impl/TestDropwizardMeter.java | 51 ++ .../metrics/impl/TestFastLongHistogram.java | 132 +++++ .../hadoop/hbase/metrics/impl/TestGauge.java | 61 +++ .../hbase/metrics/impl/TestHistogramImpl.java | 103 ++++ .../metrics/impl/TestMetricRegistryImpl.java | 164 ++++++ .../hbase/metrics/impl/TestRefCountingMap.java | 284 ++++++++++ .../hbase/metrics/impl/TestTimerImpl.java | 53 ++ .../src/main/resources/supplemental-models.xml | 14 + hbase-server/pom.xml | 8 + .../MasterCoprocessorEnvironment.java | 10 + .../hbase/coprocessor/MetricsCoprocessor.java | 136 +++++ .../RegionCoprocessorEnvironment.java 
| 24 +- .../RegionServerCoprocessorEnvironment.java | 10 + .../coprocessor/WALCoprocessorEnvironment.java | 10 + .../hadoop/hbase/io/hfile/AgeSnapshot.java | 2 +- .../hadoop/hbase/io/hfile/BlockCacheUtil.java | 2 +- .../hadoop/hbase/io/hfile/CacheStats.java | 2 +- .../hbase/master/MasterCoprocessorHost.java | 25 +- .../hbase/regionserver/MetricsRegionServer.java | 21 +- .../hbase/regionserver/RSRpcServices.java | 8 + .../regionserver/RegionCoprocessorHost.java | 20 +- .../RegionServerCoprocessorHost.java | 29 +- .../regionserver/wal/WALCoprocessorHost.java | 19 + .../replication/regionserver/MetricsSource.java | 7 + .../apache/hadoop/hbase/MiniHBaseCluster.java | 10 + .../coprocessor/TestCoprocessorMetrics.java | 549 +++++++++++++++++++ .../security/token/TestTokenAuthentication.java | 6 + pom.xml | 31 ++ 80 files changed, 5904 insertions(+), 549 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-assembly/pom.xml b/hbase-assembly/pom.xml index 2c78516..a8ac79d 100644 --- a/hbase-assembly/pom.xml +++ b/hbase-assembly/pom.xml @@ -205,6 +205,16 @@ </dependency> <dependency> <groupId>org.apache.hbase</groupId> + <artifactId>hbase-metrics-api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-metrics</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> <artifactId>hbase-resource-bundle</artifactId> <version>${project.version}</version> <optional>true</optional> http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FastLongHistogram.java ---------------------------------------------------------------------- diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FastLongHistogram.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FastLongHistogram.java deleted file mode 100644 index 3c4eccc..0000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FastLongHistogram.java +++ /dev/null @@ -1,318 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.util; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -/** - * FastLongHistogram is a thread-safe class that estimate distribution of data and computes the - * quantiles. - */ [email protected] [email protected] -public class FastLongHistogram { - - /** - * Default number of bins. - */ - public static final int DEFAULT_NBINS = 255; - - public static final double[] DEFAULT_QUANTILES = - new double[]{0.25, 0.5, 0.75, 0.90, 0.95, 0.98, 0.99, 0.999}; - - /** - * Bins is a class containing a list of buckets(or bins) for estimation histogram of some data. 
- */ - private static class Bins { - private final Counter[] counts; - // inclusive - private final long binsMin; - // exclusive - private final long binsMax; - private final long bins10XMax; - private final AtomicLong min = new AtomicLong(Long.MAX_VALUE); - private final AtomicLong max = new AtomicLong(0L); - - private final Counter count = new Counter(0); - private final Counter total = new Counter(0); - - // set to true when any of data has been inserted to the Bins. It is set after the counts are - // updated. - private final AtomicBoolean hasData = new AtomicBoolean(false); - - /** - * The constructor for creating a Bins without any prior data. - */ - public Bins(int numBins) { - counts = createCounters(numBins + 3); - this.binsMin = 1L; - - // These two numbers are total guesses - // and should be treated as highly suspect. - this.binsMax = 1000; - this.bins10XMax = binsMax * 10; - } - - /** - * The constructor for creating a Bins with last Bins. - */ - public Bins(Bins last, int numOfBins, double minQ, double maxQ) { - long[] values = last.getQuantiles(new double[] { minQ, maxQ }); - long wd = values[1] - values[0] + 1; - // expand minQ and maxQ in two ends back assuming uniform distribution - this.binsMin = Math.max(0L, (long) (values[0] - wd * minQ)); - long binsMax = (long) (values[1] + wd * (1 - maxQ)) + 1; - // make sure each of bins is at least of width 1 - this.binsMax = Math.max(binsMax, this.binsMin + numOfBins); - this.bins10XMax = Math.max((long) (values[1] + (binsMax - 1) * 9), this.binsMax + 1); - - this.counts = createCounters(numOfBins + 3); - } - - private Counter[] createCounters(int num) { - Counter[] counters = new Counter[num]; - for (int i = 0; i < num; i++) { - counters[i] = new Counter(); - } - return counters; - } - - private int getIndex(long value) { - if (value < this.binsMin) { - return 0; - } else if (value > this.bins10XMax) { - return this.counts.length - 1; - } else if (value >= this.binsMax) { - return this.counts.length - 2; 
- } - // compute the position - return 1 + (int) ((value - this.binsMin) * (this.counts.length - 3) / - (this.binsMax - this.binsMin)); - - } - - /** - * Adds a value to the histogram. - */ - public void add(long value, long count) { - if (value < 0) { - // The whole computation is completely thrown off if there are negative numbers - // - // Normally we would throw an IllegalArgumentException however this is the metrics - // system and it should be completely safe at all times. - // So silently throw it away. - return; - } - AtomicUtils.updateMin(min, value); - AtomicUtils.updateMax(max, value); - - this.count.add(count); - this.total.add(value * count); - - int pos = getIndex(value); - this.counts[pos].add(count); - - // hasData needs to be updated as last - this.hasData.set(true); - } - - /** - * Computes the quantiles give the ratios. - */ - public long[] getQuantiles(double[] quantiles) { - if (!this.hasData.get()) { - // No data yet. - return new long[quantiles.length]; - } - - // Make a snapshot of lowerCounter, higherCounter and bins.counts to counts. - // This is not synchronized, but since the counter are accumulating, the result is a good - // estimation of a snapshot. 
- long[] counts = new long[this.counts.length]; - long total = 0L; - for (int i = 0; i < this.counts.length; i++) { - counts[i] = this.counts[i].get(); - total += counts[i]; - } - - int rIndex = 0; - double qCount = total * quantiles[0]; - long cum = 0L; - - long[] res = new long[quantiles.length]; - countsLoop: for (int i = 0; i < counts.length; i++) { - // mn and mx define a value range - long mn, mx; - if (i == 0) { - mn = this.min.get(); - mx = this.binsMin; - } else if (i == counts.length - 1) { - mn = this.bins10XMax; - mx = this.max.get(); - } else if (i == counts.length - 2) { - mn = this.binsMax; - mx = this.bins10XMax; - } else { - mn = this.binsMin + (i - 1) * (this.binsMax - this.binsMin) / (this.counts.length - 3); - mx = this.binsMin + i * (this.binsMax - this.binsMin) / (this.counts.length - 3); - } - - if (mx < this.min.get()) { - continue; - } - if (mn > this.max.get()) { - break; - } - mn = Math.max(mn, this.min.get()); - mx = Math.min(mx, this.max.get()); - - // lastCum/cum are the corresponding counts to mn/mx - double lastCum = cum; - cum += counts[i]; - - // fill the results for qCount is within current range. - while (qCount <= cum) { - if (cum == lastCum) { - res[rIndex] = mn; - } else { - res[rIndex] = (long) ((qCount - lastCum) * (mx - mn) / (cum - lastCum) + mn); - } - - // move to next quantile - rIndex++; - if (rIndex >= quantiles.length) { - break countsLoop; - } - qCount = total * quantiles[rIndex]; - } - } - // In case quantiles contains values >= 100% - for (; rIndex < quantiles.length; rIndex++) { - res[rIndex] = this.max.get(); - } - - return res; - } - - - long getNumAtOrBelow(long val) { - final int targetIndex = getIndex(val); - long totalToCurrentIndex = 0; - for (int i = 0; i <= targetIndex; i++) { - totalToCurrentIndex += this.counts[i].get(); - } - return totalToCurrentIndex; - } - } - - // The bins counting values. It is replaced with a new one in calling of reset(). - private volatile Bins bins; - - /** - * Constructor. 
- */ - public FastLongHistogram() { - this(DEFAULT_NBINS); - } - - /** - * Constructor. - * @param numOfBins the number of bins for the histogram. A larger value results in more precise - * results but with lower efficiency, and vice versus. - */ - public FastLongHistogram(int numOfBins) { - this.bins = new Bins(numOfBins); - } - - /** - * Constructor setting the bins assuming a uniform distribution within a range. - * @param numOfBins the number of bins for the histogram. A larger value results in more precise - * results but with lower efficiency, and vice versus. - * @param min lower bound of the region, inclusive. - * @param max higher bound of the region, inclusive. - */ - public FastLongHistogram(int numOfBins, long min, long max) { - this(numOfBins); - Bins bins = new Bins(numOfBins); - bins.add(min, 1); - bins.add(max, 1); - this.bins = new Bins(bins, numOfBins, 0.01, 0.999); - } - - private FastLongHistogram(Bins bins) { - this.bins = bins; - } - - /** - * Adds a value to the histogram. - */ - public void add(long value, long count) { - this.bins.add(value, count); - } - - /** - * Computes the quantiles give the ratios. - */ - public long[] getQuantiles(double[] quantiles) { - return this.bins.getQuantiles(quantiles); - } - - public long[] getQuantiles() { - return this.bins.getQuantiles(DEFAULT_QUANTILES); - } - - public long getMin() { - long min = this.bins.min.get(); - return min == Long.MAX_VALUE ? 0 : min; // in case it is not initialized - } - - public long getMax() { - return this.bins.max.get(); - } - - public long getCount() { - return this.bins.count.get(); - } - - public long getMean() { - Bins bins = this.bins; - long count = bins.count.get(); - long total = bins.total.get(); - if (count == 0) { - return 0; - } - return total / count; - } - - public long getNumAtOrBelow(long value) { - return this.bins.getNumAtOrBelow(value); - } - - /** - * Resets the histogram for new counting. 
- */ - public FastLongHistogram reset() { - Bins oldBins = this.bins; - this.bins = new Bins(this.bins, this.bins.counts.length - 3, 0.01, 0.99); - return new FastLongHistogram(oldBins); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-common/src/main/java/org/apache/hadoop/hbase/util/LongAdder.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/LongAdder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/LongAdder.java new file mode 100644 index 0000000..9bdb829 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/LongAdder.java @@ -0,0 +1,224 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * One or more variables that together maintain an initially zero + * {@code long} sum. When updates (method {@link #add}) are contended + * across threads, the set of variables may grow dynamically to reduce + * contention. Method {@link #sum} (or, equivalently, {@link + * #longValue}) returns the current total combined across the + * variables maintaining the sum. + * + * <p>This class is usually preferable to {@link AtomicLong} when + * multiple threads update a common sum that is used for purposes such + * as collecting statistics, not for fine-grained synchronization + * control. Under low update contention, the two classes have similar + * characteristics. But under high contention, expected throughput of + * this class is significantly higher, at the expense of higher space + * consumption. + * + * <p>This class extends {@link Number}, but does <em>not</em> define + * methods such as {@code equals}, {@code hashCode} and {@code + * compareTo} because instances are expected to be mutated, and so are + * not useful as collection keys. 
+ * + * <p><em>jsr166e note: This class is targeted to be placed in + * java.util.concurrent.atomic.</em> + * + * @since 1.8 + */ [email protected] [email protected] +public class LongAdder extends Striped64 implements Serializable { + private static final long serialVersionUID = 7249069246863182397L; + + /** + * Version of plus for use in retryUpdate + */ + final long fn(long v, long x) { return v + x; } + + /** + * Creates a new adder with initial sum of zero. + */ + public LongAdder() { + } + + /** + * Adds the given value. + * + * @param x the value to add + */ + public void add(long x) { + Cell[] as; long b, v; int[] hc; Cell a; int n; + if ((as = cells) != null || !casBase(b = base, b + x)) { + boolean uncontended = true; + if ((hc = threadHashCode.get()) == null || + as == null || (n = as.length) < 1 || + (a = as[(n - 1) & hc[0]]) == null || + !(uncontended = a.cas(v = a.value, v + x))) + retryUpdate(x, hc, uncontended); + } + } + + /** + * Equivalent to {@code add(1)}. + */ + public void increment() { + add(1L); + } + + /** + * Equivalent to {@code add(-1)}. + */ + public void decrement() { + add(-1L); + } + + /** + * Returns the current sum. The returned value is <em>NOT</em> an + * atomic snapshot; invocation in the absence of concurrent + * updates returns an accurate result, but concurrent updates that + * occur while the sum is being calculated might not be + * incorporated. + * + * @return the sum + */ + public long sum() { + long sum = base; + Cell[] as = cells; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) + sum += a.value; + } + } + return sum; + } + + /** + * Resets variables maintaining the sum to zero. This method may + * be a useful alternative to creating a new adder, but is only + * effective if there are no concurrent updates. Because this + * method is intrinsically racy, it should only be used when it is + * known that no threads are concurrently updating. 
+ */ + public void reset() { + internalReset(0L); + } + + /** + * Equivalent in effect to {@link #sum} followed by {@link + * #reset}. This method may apply for example during quiescent + * points between multithreaded computations. If there are + * updates concurrent with this method, the returned value is + * <em>not</em> guaranteed to be the final value occurring before + * the reset. + * + * @return the sum + */ + public long sumThenReset() { + long sum = base; + Cell[] as = cells; + base = 0L; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) { + sum += a.value; + a.value = 0L; + } + } + } + return sum; + } + + /** + * Returns the String representation of the {@link #sum}. + * @return the String representation of the {@link #sum} + */ + public String toString() { + return Long.toString(sum()); + } + + /** + * Equivalent to {@link #sum}. + * + * @return the sum + */ + public long longValue() { + return sum(); + } + + /** + * Returns the {@link #sum} as an {@code int} after a narrowing + * primitive conversion. + */ + public int intValue() { + return (int)sum(); + } + + /** + * Returns the {@link #sum} as a {@code float} + * after a widening primitive conversion. + */ + public float floatValue() { + return (float)sum(); + } + + /** + * Returns the {@link #sum} as a {@code double} after a widening + * primitive conversion. 
+ */ + public double doubleValue() { + return (double)sum(); + } + + private void writeObject(ObjectOutputStream s) throws IOException { + s.defaultWriteObject(); + s.writeLong(sum()); + } + + private void readObject(ObjectInputStream s) + throws IOException, ClassNotFoundException { + s.defaultReadObject(); + busy = 0; + cells = null; + base = s.readLong(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Striped64.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Striped64.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Striped64.java new file mode 100644 index 0000000..36f2fce --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Striped64.java @@ -0,0 +1,356 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package org.apache.hadoop.hbase.util; + +import java.util.Random; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A package-local class holding common representation and mechanics + * for classes supporting dynamic striping on 64bit values. The class + * extends Number so that concrete subclasses must publicly do so. + */ +@InterfaceAudience.Private +abstract class Striped64 extends Number { + /* + * This class maintains a lazily-initialized table of atomically + * updated variables, plus an extra "base" field. The table size + * is a power of two. Indexing uses masked per-thread hash codes. + * Nearly all declarations in this class are package-private, + * accessed directly by subclasses. + * + * Table entries are of class Cell; a variant of AtomicLong padded + * to reduce cache contention on most processors. Padding is + * overkill for most Atomics because they are usually irregularly + * scattered in memory and thus don't interfere much with each + * other. But Atomic objects residing in arrays will tend to be + * placed adjacent to each other, and so will most often share + * cache lines (with a huge negative performance impact) without + * this precaution. + * + * In part because Cells are relatively large, we avoid creating + * them until they are needed. When there is no contention, all + * updates are made to the base field. Upon first contention (a + * failed CAS on base update), the table is initialized to size 2. + * The table size is doubled upon further contention until + * reaching the nearest power of two greater than or equal to the + * number of CPUS. Table slots remain empty (null) until they are + * needed. 
+ * + * A single spinlock ("busy") is used for initializing and + * resizing the table, as well as populating slots with new Cells. + * There is no need for a blocking lock; when the lock is not + * available, threads try other slots (or the base). During these + * retries, there is increased contention and reduced locality, + * which is still better than alternatives. + * + * Per-thread hash codes are initialized to random values. + * Contention and/or table collisions are indicated by failed + * CASes when performing an update operation (see method + * retryUpdate). Upon a collision, if the table size is less than + * the capacity, it is doubled in size unless some other thread + * holds the lock. If a hashed slot is empty, and lock is + * available, a new Cell is created. Otherwise, if the slot + * exists, a CAS is tried. Retries proceed by "double hashing", + * using a secondary hash (Marsaglia XorShift) to try to find a + * free slot. + * + * The table size is capped because, when there are more threads + * than CPUs, supposing that each thread were bound to a CPU, + * there would exist a perfect hash function mapping threads to + * slots that eliminates collisions. When we reach capacity, we + * search for this mapping by randomly varying the hash codes of + * colliding threads. Because search is random, and collisions + * only become known via CAS failures, convergence can be slow, + * and because threads are typically not bound to CPUS forever, + * may not occur at all. However, despite these limitations, + * observed contention rates are typically low in these cases. + * + * It is possible for a Cell to become unused when threads that + * once hashed to it terminate, as well as in the case where + * doubling the table causes no thread to hash to it under + * expanded mask. 
We do not try to detect or remove such cells, + * under the assumption that for long-running instances, observed + * contention levels will recur, so the cells will eventually be + * needed again; and for short-lived ones, it does not matter. + */ + + /** + * Padded variant of AtomicLong supporting only raw accesses plus CAS. + * The value field is placed between pads, hoping that the JVM doesn't + * reorder them. + * + * JVM intrinsics note: It would be possible to use a release-only + * form of CAS here, if it were provided. + */ + static final class Cell { + volatile long p0, p1, p2, p3, p4, p5, p6; + volatile long value; + volatile long q0, q1, q2, q3, q4, q5, q6; + Cell(long x) { value = x; } + + final boolean cas(long cmp, long val) { + return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); + } + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long valueOffset; + static { + try { + UNSAFE = getUnsafe(); + Class<?> ak = Cell.class; + valueOffset = UNSAFE.objectFieldOffset + (ak.getDeclaredField("value")); + } catch (Exception e) { + throw new Error(e); + } + } + + } + + /** + * ThreadLocal holding a single-slot int array holding hash code. + * Unlike the JDK8 version of this class, we use a suboptimal + * int[] representation to avoid introducing a new type that can + * impede class-unloading when ThreadLocals are not removed. + */ + static final ThreadLocal<int[]> threadHashCode = new ThreadLocal<int[]>(); + + /** + * Generator of new random hash codes + */ + static final Random rng = new Random(); + + /** Number of CPUS, to place bound on table size */ + static final int NCPU = Runtime.getRuntime().availableProcessors(); + + /** + * Table of cells. When non-null, size is a power of 2. + */ + transient volatile Cell[] cells; + + /** + * Base value, used mainly when there is no contention, but also as + * a fallback during table initialization races. Updated via CAS. 
+ */ + transient volatile long base; + + /** + * Spinlock (locked via CAS) used when resizing and/or creating Cells. + */ + transient volatile int busy; + + /** + * Package-private default constructor + */ + Striped64() { + } + + /** + * CASes the base field. + */ + final boolean casBase(long cmp, long val) { + return UNSAFE.compareAndSwapLong(this, baseOffset, cmp, val); + } + + /** + * CASes the busy field from 0 to 1 to acquire lock. + */ + final boolean casBusy() { + return UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1); + } + + /** + * Computes the function of current and new value. Subclasses + * should open-code this update function for most uses, but the + * virtualized form is needed within retryUpdate. + * + * @param currentValue the current value (of either base or a cell) + * @param newValue the argument from a user update call + * @return result of the update function + */ + abstract long fn(long currentValue, long newValue); + + /** + * Handles cases of updates involving initialization, resizing, + * creating new Cells, and/or contention. See above for + * explanation. This method suffers the usual non-modularity + * problems of optimistic retry code, relying on rechecked sets of + * reads. + * + * @param x the value + * @param hc the hash code holder + * @param wasUncontended false if CAS failed before call + */ + final void retryUpdate(long x, int[] hc, boolean wasUncontended) { + int h; + if (hc == null) { + threadHashCode.set(hc = new int[1]); // Initialize randomly + int r = rng.nextInt(); // Avoid zero to allow xorShift rehash + h = hc[0] = (r == 0) ? 
1 : r; + } + else + h = hc[0]; + boolean collide = false; // True if last slot nonempty + for (;;) { + Cell[] as; Cell a; int n; long v; + if ((as = cells) != null && (n = as.length) > 0) { + if ((a = as[(n - 1) & h]) == null) { + if (busy == 0) { // Try to attach new Cell + Cell r = new Cell(x); // Optimistically create + if (busy == 0 && casBusy()) { + boolean created = false; + try { // Recheck under lock + Cell[] rs; int m, j; + if ((rs = cells) != null && + (m = rs.length) > 0 && + rs[j = (m - 1) & h] == null) { + rs[j] = r; + created = true; + } + } finally { + busy = 0; + } + if (created) + break; + continue; // Slot is now non-empty + } + } + collide = false; + } + else if (!wasUncontended) // CAS already known to fail + wasUncontended = true; // Continue after rehash + else if (a.cas(v = a.value, fn(v, x))) + break; + else if (n >= NCPU || cells != as) + collide = false; // At max size or stale + else if (!collide) + collide = true; + else if (busy == 0 && casBusy()) { + try { + if (cells == as) { // Expand table unless stale + Cell[] rs = new Cell[n << 1]; + for (int i = 0; i < n; ++i) + rs[i] = as[i]; + cells = rs; + } + } finally { + busy = 0; + } + collide = false; + continue; // Retry with expanded table + } + h ^= h << 13; // Rehash + h ^= h >>> 17; + h ^= h << 5; + hc[0] = h; // Record index for next time + } + else if (busy == 0 && cells == as && casBusy()) { + boolean init = false; + try { // Initialize table + if (cells == as) { + Cell[] rs = new Cell[2]; + rs[h & 1] = new Cell(x); + cells = rs; + init = true; + } + } finally { + busy = 0; + } + if (init) + break; + } + else if (casBase(v = base, fn(v, x))) + break; // Fall back on using base + } + } + + + /** + * Sets base and all cells to the given value. 
+ */ + final void internalReset(long initialValue) { + Cell[] as = cells; + base = initialValue; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) + a.value = initialValue; + } + } + } + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long baseOffset; + private static final long busyOffset; + static { + try { + UNSAFE = getUnsafe(); + Class<?> sk = Striped64.class; + baseOffset = UNSAFE.objectFieldOffset + (sk.getDeclaredField("base")); + busyOffset = UNSAFE.objectFieldOffset + (sk.getDeclaredField("busy")); + } catch (Exception e) { + throw new Error(e); + } + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException tryReflectionInstead) {} + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() { + public sun.misc.Unsafe run() throws Exception { + Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class; + for (java.lang.reflect.Field f : k.getDeclaredFields()) { + f.setAccessible(true); + Object x = f.get(null); + if (k.isInstance(x)) + return k.cast(x); + } + throw new NoSuchFieldError("the Unsafe"); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestFastLongHistogram.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestFastLongHistogram.java 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestFastLongHistogram.java deleted file mode 100644 index 7304e2d..0000000 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestFastLongHistogram.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.util; - -import java.util.Arrays; -import java.util.Random; - - -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import static org.junit.Assert.assertEquals; - -/** - * Testcases for FastLongHistogram. 
- */ -@Category(SmallTests.class) -public class TestFastLongHistogram { - - private static void doTestUniform(FastLongHistogram hist) { - long[] VALUES = { 0, 10, 20, 30, 40, 50 }; - double[] qs = new double[VALUES.length]; - for (int i = 0; i < qs.length; i++) { - qs[i] = (double) VALUES[i] / VALUES[VALUES.length - 1]; - } - - for (int i = 0; i < 10; i++) { - for (long v : VALUES) { - hist.add(v, 1); - } - long[] vals = hist.getQuantiles(qs); - System.out.println(Arrays.toString(vals)); - for (int j = 0; j < qs.length; j++) { - Assert.assertTrue(j + "-th element org: " + VALUES[j] + ", act: " + vals[j], - Math.abs(vals[j] - VALUES[j]) <= 10); - } - hist.reset(); - } - } - - @Test - public void testUniform() { - FastLongHistogram hist = new FastLongHistogram(100, 0, 50); - doTestUniform(hist); - } - - @Test - public void testAdaptionOfChange() { - // assumes the uniform distribution - FastLongHistogram hist = new FastLongHistogram(100, 0, 100); - - Random rand = new Random(); - - for (int n = 0; n < 10; n++) { - for (int i = 0; i < 900; i++) { - hist.add(rand.nextInt(100), 1); - } - - // add 10% outliers, this breaks the assumption, hope bin10xMax works - for (int i = 0; i < 100; i++) { - hist.add(1000 + rand.nextInt(100), 1); - } - - long[] vals = hist.getQuantiles(new double[] { 0.25, 0.75, 0.95 }); - System.out.println(Arrays.toString(vals)); - if (n == 0) { - Assert.assertTrue("Out of possible value", vals[0] >= 0 && vals[0] <= 50); - Assert.assertTrue("Out of possible value", vals[1] >= 50 && vals[1] <= 100); - Assert.assertTrue("Out of possible value", vals[2] >= 900 && vals[2] <= 1100); - } - - hist.reset(); - } - } - - - @Test - public void testGetNumAtOrBelow() { - long[] VALUES = { 1, 10, 20, 30, 40, 50 }; - - FastLongHistogram h = new FastLongHistogram(); - for (long v : VALUES) { - for (int i = 0; i < 100; i++) { - h.add(v, 1); - } - } - - h.add(Integer.MAX_VALUE, 1); - - h.reset(); - - for (long v : VALUES) { - for (int i = 0; i < 100; i++) { - 
h.add(v, 1); - } - } - // Add something way out there to make sure it doesn't throw off the counts. - h.add(Integer.MAX_VALUE, 1); - - assertEquals(100, h.getNumAtOrBelow(1)); - assertEquals(200, h.getNumAtOrBelow(11)); - assertEquals(601, h.getNumAtOrBelow(Long.MAX_VALUE)); - } - - - @Test - public void testSameValues() { - FastLongHistogram hist = new FastLongHistogram(100); - - hist.add(50, 100); - - hist.reset(); - doTestUniform(hist); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleMasterObserverWithMetrics.java ---------------------------------------------------------------------- diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleMasterObserverWithMetrics.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleMasterObserverWithMetrics.java new file mode 100644 index 0000000..b86856a --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleMasterObserverWithMetrics.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.coprocessor.example; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.metrics.Counter; +import org.apache.hadoop.hbase.metrics.Gauge; +import org.apache.hadoop.hbase.metrics.MetricRegistry; +import org.apache.hadoop.hbase.metrics.Timer; + +/** + * An example coprocessor that collects some metrics to demonstrate the usage of exporting custom + * metrics from the coprocessor. + * + * <p> + * These metrics will be available through the regular Hadoop metrics2 sinks (ganglia, opentsdb, + * etc) as well as JMX output. You can view a snapshot of the metrics by going to the http web UI + * of the master page, something like http://mymasterhost:16010/jmx + * </p> + * @see ExampleRegionObserverWithMetrics + */ +public class ExampleMasterObserverWithMetrics extends BaseMasterObserver { + + private static final Log LOG = LogFactory.getLog(ExampleMasterObserverWithMetrics.class); + + /** This is the Timer metric object to keep track of the current count across invocations */ + private Timer createTableTimer; + private long createTableStartTime = Long.MIN_VALUE; + + /** This is a Counter object to keep track of disableTable operations */ + private Counter disableTableCounter; + + /** Returns the total memory of the process. We will use this to define a gauge metric */ + private long getTotalMemory() { + return Runtime.getRuntime().totalMemory(); + } + + /** Returns the max memory of the process. 
We will use this to define a gauge metric */ + private long getMaxMemory() { + return Runtime.getRuntime().maxMemory(); + } + + @Override + public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx, + HTableDescriptor desc, HRegionInfo[] regions) throws IOException { + super.preCreateTable(ctx, desc, regions); + // we rely on the fact that there is only 1 instance of our MasterObserver. We keep track of + // when the operation starts before the operation is executing. + this.createTableStartTime = System.currentTimeMillis(); + } + + @Override + public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx, + HTableDescriptor desc, HRegionInfo[] regions) throws IOException { + super.postCreateTable(ctx, desc, regions); + if (this.createTableStartTime > 0) { + long time = System.currentTimeMillis() - this.createTableStartTime; + LOG.info("Create table took: " + time); + + // Update the timer metric for the create table operation duration. + createTableTimer.updateMillis(time); + } + } + + @Override + public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> ctx, TableName tableName) throws IOException { + super.preDisableTable(ctx, tableName); + + // Increment the Counter for disable table operations + this.disableTableCounter.increment(); + } + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + super.start(env); + + // start for the MasterObserver will be called only once in the lifetime of the + // server. We will construct and register all metrics that we will track across method + // invocations. + + if (env instanceof MasterCoprocessorEnvironment) { + // Obtain the MetricRegistry for the Master. Metrics from this registry will be reported + // at the master level per-server. + MetricRegistry registry = + ((MasterCoprocessorEnvironment) env).getMetricRegistryForMaster(); + + if (createTableTimer == null) { + // Create a new Counter, or get the already registered counter. 
+ // It is much better to only call this once and save the Counter as a class field instead + // of creating the counter every time a coprocessor method is invoked. This will negate + // any performance bottleneck coming from map lookups tracking metrics in the registry. + createTableTimer = registry.timer("CreateTable"); + + // on stop(), we can remove these registered metrics via calling registry.remove(). But + // it is not needed for coprocessors at the master level. If coprocessor is stopped, + // the server is stopping anyway, so there will not be any resource leaks. + } + + if (disableTableCounter == null) { + disableTableCounter = registry.counter("DisableTable"); + } + + // Register a custom gauge. The Gauge object will be registered in the metrics registry and + // periodically the getValue() is invoked to obtain the snapshot. + registry.register("totalMemory", new Gauge<Long>() { + @Override + public Long getValue() { + return getTotalMemory(); + } + }); + + // Register a custom gauge (Supplier converted into Gauge) + registry.register("maxMemory", new Gauge<Long>() { + @Override + public Long getValue() { + return getMaxMemory(); + } + + }); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleRegionObserverWithMetrics.java ---------------------------------------------------------------------- diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleRegionObserverWithMetrics.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleRegionObserverWithMetrics.java new file mode 100644 index 0000000..7606b05 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ExampleRegionObserverWithMetrics.java @@ -0,0 +1,121 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hbase.coprocessor.example; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.metrics.Counter; +import org.apache.hadoop.hbase.metrics.MetricRegistry; +import org.apache.hadoop.hbase.metrics.Timer; + +/** + * An example coprocessor that collects some metrics to demonstrate the usage of exporting custom + * metrics from the coprocessor. + * <p> + * These metrics will be available through the regular Hadoop metrics2 sinks (ganglia, opentsdb, + * etc) as well as JMX output. 
You can view a snapshot of the metrics by going to the http web UI + * of the regionserver page, something like http://myregionserverhost:16030/jmx + * </p> + * + * @see ExampleMasterObserverWithMetrics + */ +public class ExampleRegionObserverWithMetrics extends BaseRegionObserver { + + private Counter preGetCounter; + private Timer costlyOperationTimer; + + @Override + public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) + throws IOException { + super.preGetOp(e, get, results); + + // Increment the Counter whenever the coprocessor is called + preGetCounter.increment(); + } + + @Override + public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, + List<Cell> results) throws IOException { + super.postGetOp(e, get, results); + + // do a costly (high latency) operation which we want to measure how long it takes by + // using a Timer (which is a Meter and a Histogram). + long start = System.nanoTime(); + try { + performCostlyOperation(); + } finally { + costlyOperationTimer.updateNanos(System.nanoTime() - start); + } + } + + private void performCostlyOperation() { + try { + // simulate the operation by sleeping. + Thread.sleep(ThreadLocalRandom.current().nextLong(100)); + } catch (InterruptedException ignore) {} + } + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + super.start(env); + + // start for the RegionServerObserver will be called only once in the lifetime of the + // server. We will construct and register all metrics that we will track across method + // invocations. + + if (env instanceof RegionCoprocessorEnvironment) { + // Obtain the MetricRegistry for the RegionServer. Metrics from this registry will be reported + // at the region server level per-regionserver. 
+ MetricRegistry registry = + ((RegionCoprocessorEnvironment) env).getMetricRegistryForRegionServer(); + + if (preGetCounter == null) { + // Create a new Counter, or get the already registered counter. + // It is much better to only call this once and save the Counter as a class field instead + // of creating the counter every time a coprocessor method is invoked. This will negate + // any performance bottleneck coming from map lookups tracking metrics in the registry. + // Returned counter instance is shared by all coprocessors of the same class in the same + // region server. + preGetCounter = registry.counter("preGetRequests"); + } + + if (costlyOperationTimer == null) { + // Create a Timer to track execution times for the costly operation. + costlyOperationTimer = registry.timer("costlyOperation"); + } + } + } + + @Override + public void stop(CoprocessorEnvironment e) throws IOException { + // we should NOT remove / deregister the metrics in stop(). The whole registry will be + // removed when the last region of the table is closed. 
+ super.stop(e); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-hadoop-compat/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-hadoop-compat/pom.xml b/hbase-hadoop-compat/pom.xml index 102c8b6..bdb5d0a 100644 --- a/hbase-hadoop-compat/pom.xml +++ b/hbase-hadoop-compat/pom.xml @@ -122,6 +122,10 @@ <groupId>org.apache.commons</groupId> <artifactId>commons-math</artifactId> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-metrics-api</artifactId> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSource.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSource.java index f79aa9f..0350bff 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSource.java @@ -104,4 +104,6 @@ public interface BaseSource { */ String getMetricsName(); + MetricRegistryInfo getMetricRegistryInfo(); + } http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-hadoop2-compat/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-hadoop2-compat/pom.xml b/hbase-hadoop2-compat/pom.xml index 6462c61..54a31b6 100644 --- a/hbase-hadoop2-compat/pom.xml +++ b/hbase-hadoop2-compat/pom.xml @@ -166,6 +166,10 @@ limitations under the License. 
</dependency> <dependency> <groupId>org.apache.hbase</groupId> + <artifactId>hbase-metrics</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop-compat</artifactId> <version>${project.version}</version> <type>test-jar</type> http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSourceImpl.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSourceImpl.java index f843ec2..a112e9d 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/BaseSourceImpl.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.metrics; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.metrics.impl.GlobalMetricRegistriesAdapter; +import org.apache.hadoop.hbase.metrics.impl.HBaseMetrics2HadoopMetricsAdapter; +import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSourceImpl; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsSource; import org.apache.hadoop.metrics2.impl.JmxCacheBuster; @@ -47,15 +50,53 @@ public class BaseSourceImpl implements BaseSource, MetricsSource { inited = true; DefaultMetricsSystem.initialize(HBASE_METRICS_SYSTEM_NAME); JvmMetrics.initSingleton(name, ""); + // initialize hbase-metrics module based metric system as well. GlobalMetricRegistriesSource + // initialization depends on the metric system being already initialized, that is why we are + // doing it here. Once BaseSourceSourceImpl is removed, we should do the initialization of + // these elsewhere. 
+ GlobalMetricRegistriesAdapter.init(); } } + /** + * @deprecated Use hbase-metrics/hbase-metrics-api module interfaces for new metrics. + * Defining BaseSources for new metric groups (WAL, RPC, etc) is not needed anymore, however, + * for existing BaseSource implementations, please use the field named "registry" which is a + * MetricRegistry instance together with the HBaseMetrics2HadoopMetricsAdapter. + */ + @Deprecated protected final DynamicMetricsRegistry metricsRegistry; protected final String metricsName; protected final String metricsDescription; protected final String metricsContext; protected final String metricsJmxContext; + /** + * Note that there are at least 4 MetricRegistry definitions in the source code. The first one is + * Hadoop Metrics2 MetricRegistry, second one is DynamicMetricsRegistry which is HBase's fork + * of the Hadoop metrics2 class. The third one is the dropwizard metrics implementation of + * MetricRegistry, and finally a new API abstraction in HBase that is the + * o.a.h.h.metrics.MetricRegistry class. This last one is the new way to use metrics within the + * HBase code. However, the others are in play because existing metrics2 based code still + * needs to coexist until we get rid of all of our BaseSource and convert them to the new + * framework. Until that happens, new metrics can use the new API, but will be collected + * through the HBaseMetrics2HadoopMetricsAdapter class. + * + * BaseSourceImpl has two MetricRegistries. metricsRegistry is for hadoop Metrics2 based + * metrics, while the registry is for hbase-metrics based metrics. + */ + protected final MetricRegistry registry; + + /** + * The adapter from hbase-metrics module to metrics2. This adapter is the connection between the + * Metrics in the MetricRegistry and the Hadoop Metrics2 system. 
Using this adapter, existing + * BaseSource implementations can define new metrics using the hbase-metrics/hbase-metrics-api + * module interfaces and still be able to make use of metrics2 sinks (including JMX). Existing + * BaseSources should call metricsAdapter.snapshotAllMetrics() in getMetrics() method. See + * {@link MetricsRegionServerSourceImpl}. + */ + protected final HBaseMetrics2HadoopMetricsAdapter metricsAdapter; + public BaseSourceImpl( String metricsName, String metricsDescription, @@ -72,6 +113,11 @@ public class BaseSourceImpl implements BaseSource, MetricsSource { //Register this instance. DefaultMetricsSystem.instance().register(metricsJmxContext, metricsDescription, this); + + // hbase-metrics module based metrics are registered in the hbase MetricsRegistry. + registry = MetricRegistries.global().create(this.getMetricRegistryInfo()); + metricsAdapter = new HBaseMetrics2HadoopMetricsAdapter(); + init(); } @@ -166,4 +212,10 @@ public class BaseSourceImpl implements BaseSource, MetricsSource { return metricsName; } + @Override + public MetricRegistryInfo getMetricRegistryInfo() { + return new MetricRegistryInfo(getMetricsName(), getMetricsDescription(), + getMetricsContext(), getMetricsJmxContext(), true); + } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/impl/GlobalMetricRegistriesAdapter.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/impl/GlobalMetricRegistriesAdapter.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/impl/GlobalMetricRegistriesAdapter.java new file mode 100644 index 0000000..40a358f --- /dev/null +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/impl/GlobalMetricRegistriesAdapter.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.metrics.impl; + + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.lang.reflect.FieldUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.metrics.MetricRegistries; +import org.apache.hadoop.hbase.metrics.MetricRegistry; +import org.apache.hadoop.hbase.metrics.MetricRegistryInfo; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsExecutor; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.impl.JmxCacheBuster; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystemHelper; +import org.apache.hadoop.metrics2.lib.MetricsExecutorImpl; + +import com.google.common.annotations.VisibleForTesting; +import 
com.google.common.base.Optional; + +/** + * This class acts as an adapter to export the MetricRegistry instances in the global registry. Each + * MetricRegistry will be registered or unregistered from the metrics2 system. The collection will + * be performed via the MetricsSourceAdapter and the MetricRegistry will be collected like a + * BaseSource instance for a group of metrics (like WAL, RPC, etc) with the MetricRegistryInfo's + * JMX context. + * + * <p>Developer note: + * Unlike the current metrics2 based approach, the new metrics approach + * (hbase-metrics-api and hbase-metrics modules) works by having different MetricRegistries that are + * initialized and used from the code that lives in their respective modules (hbase-server, etc). + * There is no need to define BaseSource classes and do a lot of indirection. The MetricRegistries + * will be in the global MetricRegistriesImpl, and this class will iterate over + * MetricRegistries.global() and register adapters to the metrics2 subsystem. These adapters then + * report the actual values by delegating to + * {@link HBaseMetrics2HadoopMetricsAdapter#snapshotAllMetrics(MetricRegistry, MetricsCollector)}. + * + * We do not initialize the Hadoop Metrics2 system assuming that other BaseSources already do so + * (see BaseSourceImpl). Once the last BaseSource is moved to the new system, the metrics2 + * initialization should be moved here. 
+ * </p> + */ +public class GlobalMetricRegistriesAdapter { + + private static final Log LOG = LogFactory.getLog(GlobalMetricRegistriesAdapter.class); + + private class MetricsSourceAdapter implements MetricsSource { + private final MetricRegistry registry; + MetricsSourceAdapter(MetricRegistry registry) { + this.registry = registry; + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + metricsAdapter.snapshotAllMetrics(registry, collector); + } + } + + private final MetricsExecutor executor; + private final AtomicBoolean stopped; + private final DefaultMetricsSystemHelper helper; + private final HBaseMetrics2HadoopMetricsAdapter metricsAdapter; + private final HashMap<MetricRegistryInfo, MetricsSourceAdapter> registeredSources; + + private GlobalMetricRegistriesAdapter() { + this.executor = new MetricsExecutorImpl(); + this.stopped = new AtomicBoolean(false); + this.metricsAdapter = new HBaseMetrics2HadoopMetricsAdapter(); + this.registeredSources = new HashMap<>(); + this.helper = new DefaultMetricsSystemHelper(); + executor.getExecutor().scheduleAtFixedRate(new Runnable(){ + @Override + public void run() { + doRun(); + + }}, 10, 10, TimeUnit.SECONDS); + } + + /** + * Make sure that this global MetricSource for hbase-metrics module based metrics is initialized. + * This should be called only once. 
+   */
+  public static GlobalMetricRegistriesAdapter init() {
+    return new GlobalMetricRegistriesAdapter();
+  }
+
+  /**
+   * Requests shutdown by setting the {@code stopped} flag. The flag is checked at the start of
+   * {@code doRun()}, which then stops the executor and returns without touching any source.
+   */
+  @VisibleForTesting
+  public void stop() {
+    stopped.set(true);
+  }
+
+  /**
+   * Reconciles {@code registeredSources} with the global {@code MetricRegistries}:
+   * <ol>
+   * <li>if {@code stopped} is set, stops the executor and returns;</li>
+   * <li>registers a new {@code MetricsSourceAdapter} with the Hadoop metrics system for every
+   * global registry that is not an existing source and is not registered yet;</li>
+   * <li>unregisters every registered source whose registry is no longer present globally and,
+   * if anything was removed, clears the JMX cache.</li>
+   * </ol>
+   */
+  private void doRun() {
+    if (stopped.get()) {
+      executor.stop();
+      return;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("doRun called: " + registeredSources);
+    }
+
+    Collection<MetricRegistry> registries = MetricRegistries.global().getMetricRegistries();
+    for (MetricRegistry registry : registries) {
+      MetricRegistryInfo info = registry.getMetricRegistryInfo();
+
+      if (info.isExistingSource()) {
+        // If there is an already existing BaseSource for this MetricRegistry, skip it here. These
+        // types of registries are there only due to existing BaseSource implementations in the
+        // source code (like MetricsRegionServer, etc). This is to make sure that we can transition
+        // iteratively to the new hbase-metrics system. These type of MetricRegistry metrics will be
+        // exported from the BaseSource.getMetrics() call directly because there is already a
+        // MetricRecordBuilder there (see MetricsRegionServerSourceImpl).
+        continue;
+      }
+
+      if (!registeredSources.containsKey(info)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Registering adapter for the MetricRegistry: " + info.getMetricsJmxContext());
+        }
+        // register this as a MetricSource under different JMX Context'es.
+        MetricsSourceAdapter adapter = new MetricsSourceAdapter(registry);
+        LOG.info("Registering " + info.getMetricsJmxContext() + " " + info.getMetricsDescription());
+        DefaultMetricsSystem.instance().register(info.getMetricsJmxContext(),
+            info.getMetricsDescription(), adapter);
+        registeredSources.put(info, adapter);
+        // next collection will collect the newly registered MetricSource. Doing this here leads to
+        // ConcurrentModificationException.
+      }
+    }
+
+    boolean removed = false;
+    // Remove registered sources if it is removed from the global registry
+    for (Iterator<Entry<MetricRegistryInfo, MetricsSourceAdapter>> it =
+         registeredSources.entrySet().iterator(); it.hasNext();) {
+      Entry<MetricRegistryInfo, MetricsSourceAdapter> entry = it.next();
+      MetricRegistryInfo info = entry.getKey();
+      Optional<MetricRegistry> found = MetricRegistries.global().get(info);
+      if (!found.isPresent()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removing adapter for the MetricRegistry: " + info.getMetricsJmxContext());
+        }
+        synchronized(DefaultMetricsSystem.instance()) {
+          unregisterSource(info);
+          helper.removeSourceName(info.getMetricsJmxContext());
+          helper.removeObjectName(info.getMetricsJmxContext());
+          // Remove through the iterator so the map is not modified behind the iteration's back.
+          it.remove();
+          removed = true;
+        }
+      }
+    }
+    if (removed) {
+      JmxCacheBuster.clearJmxCache();
+    }
+  }
+
+  /**
+   * Use reflection to unregister the Hadoop metric source, since MetricsSystem#unregisterSource()
+   * is only available in Hadoop 2.6+ (HADOOP-10839). Does nothing unless the default metrics
+   * system is a {@code MetricsSystemImpl}. Failures to find or invoke the adapter's {@code stop}
+   * method and {@code IllegalAccessException}s are logged, not rethrown.
+   *
+   * @param info registry info whose JMX context is the source name to remove
+   */
+  @VisibleForTesting
+  protected void unregisterSource(MetricRegistryInfo info) {
+    // MetricsSystem#unregisterSource is only available in Hadoop 2.6+ (HADOOP-10839), so the
+    // private "sources" and "allSources" maps of MetricsSystemImpl are edited directly instead.
+    MetricsSystem metricsSystem = DefaultMetricsSystem.instance();
+    if (metricsSystem instanceof MetricsSystemImpl) {
+      try {
+        // it's actually a Map<String, MetricsSourceAdapter> , but MetricsSourceAdapter isn't
+        // accessible
+        @SuppressWarnings("unchecked")
+        Map<String, Object> sources =
+            (Map<String, Object>) FieldUtils.readField(metricsSystem, "sources", true);
+        String sourceName = info.getMetricsJmxContext();
+        if (sources.containsKey(sourceName)) {
+          Object sourceAdapter = sources.get(sourceName);
+          Method method = null;
+          try {
+            method = sourceAdapter.getClass().getDeclaredMethod("stop");
+          } catch (NoSuchMethodException e) {
+            LOG.info("Stop method not found on MetricsSourceAdapter");
+          } catch (SecurityException e) {
+            LOG.info("Don't have access to call stop method on MetricsSourceAdapter", e);
+          }
+          if (method != null) {
+            method.setAccessible(true);
+            try {
+              method.invoke(sourceAdapter);
+            } catch (IllegalArgumentException | InvocationTargetException e) {
+              // Hand the exception to the logger instead of printing it to stderr.
+              LOG.warn("Couldn't invoke stop on metrics source adapter: " + sourceName, e);
+            }
+          }
+          sources.remove(sourceName);
+        }
+        @SuppressWarnings("unchecked")
+        Map<String, MetricsSource> allSources =
+            (Map<String, MetricsSource>) FieldUtils.readField(metricsSystem, "allSources", true);
+        allSources.remove(sourceName);
+      } catch (IllegalAccessException e) {
+        LOG.warn("Error unregistering metric source " + info.getMetricsJmxContext(), e);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/impl/HBaseMetrics2HadoopMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/impl/HBaseMetrics2HadoopMetricsAdapter.java
b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/impl/HBaseMetrics2HadoopMetricsAdapter.java new file mode 100644 index 0000000..ec4a1a7 --- /dev/null +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/metrics/impl/HBaseMetrics2HadoopMetricsAdapter.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Copyright 2016 Josh Elser + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.metrics.impl;
+
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.metrics.Counter;
+import org.apache.hadoop.hbase.metrics.Gauge;
+import org.apache.hadoop.hbase.metrics.Histogram;
+import org.apache.hadoop.hbase.metrics.Meter;
+import org.apache.hadoop.hbase.metrics.Metric;
+import org.apache.hadoop.hbase.metrics.MetricRegistry;
+import org.apache.hadoop.hbase.metrics.MetricRegistryInfo;
+import org.apache.hadoop.hbase.metrics.Timer;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.lib.Interns;
+import org.apache.hadoop.metrics2.lib.MutableHistogram;
+
+/**
+ * This is the adapter from "HBase Metrics Framework", implemented in hbase-metrics-api and
+ * hbase-metrics modules to the Hadoop Metrics2 framework. This adapter is not a metric source,
+ * but a helper to be able to collect all of the Metric's in the MetricRegistry using the
+ * MetricsCollector and MetricsRecordBuilder.
+ *
+ * Some of the code is forked from https://github.com/joshelser/dropwizard-hadoop-metrics2.
+ */
+public class HBaseMetrics2HadoopMetricsAdapter {
+  private static final Log LOG
+      = LogFactory.getLog(HBaseMetrics2HadoopMetricsAdapter.class);
+  private static final String EMPTY_STRING = "";
+
+  public HBaseMetrics2HadoopMetricsAdapter() {
+  }
+
+  /**
+   * Adds a new record to the {@code collector}, named and described by the registry's
+   * {@link MetricRegistryInfo} and tagged with its metrics context, then snapshots every metric
+   * of the registry into that record.
+   *
+   * @param metricRegistry The registry whose metrics are collected
+   * @param collector A metrics collector
+   */
+  public void snapshotAllMetrics(MetricRegistry metricRegistry,
+      MetricsCollector collector) {
+    MetricRegistryInfo info = metricRegistry.getMetricRegistryInfo();
+    MetricsRecordBuilder builder = collector.addRecord(Interns.info(info.getMetricsName(),
+        info.getMetricsDescription()));
+    builder.setContext(info.getMetricsContext());
+
+    snapshotAllMetrics(metricRegistry, builder);
+  }
+
+  /**
+   * Iterates over the MetricRegistry and adds its metrics to the {@code builder}. Gauges,
+   * counters, histograms, meters and timers are exported; any other Metric type is logged and
+   * skipped.
+   *
+   * @param metricRegistry The registry whose metrics are collected
+   * @param builder A record builder
+   */
+  public void snapshotAllMetrics(MetricRegistry metricRegistry, MetricsRecordBuilder builder) {
+    Map<String, Metric> metrics = metricRegistry.getMetrics();
+
+    for (Map.Entry<String, Metric> e: metrics.entrySet()) {
+      // Always capitalize the name
+      String name = StringUtils.capitalize(e.getKey());
+      Metric metric = e.getValue();
+
+      if (metric instanceof Gauge) {
+        addGauge(name, (Gauge<?>) metric, builder);
+      } else if (metric instanceof Counter) {
+        addCounter(name, (Counter)metric, builder);
+      } else if (metric instanceof Histogram) {
+        addHistogram(name, (Histogram)metric, builder);
+      } else if (metric instanceof Meter) {
+        addMeter(name, (Meter)metric, builder);
+      } else if (metric instanceof Timer) {
+        addTimer(name, (Timer)metric, builder);
+      } else {
+        LOG.info("Ignoring unknown Metric class " + metric.getClass().getName());
+      }
+    }
+  }
+
+  /**
+   * Adds the gauge's current value to the record. Only Integer, Long, Float and Double values
+   * are exported; a null value or any other type is logged at WARN and skipped.
+   *
+   * @param name The metric name.
+   * @param gauge The gauge to read.
+   * @param builder A Hadoop-Metrics2 record builder.
+   */
+  private void addGauge(String name, Gauge<?> gauge, MetricsRecordBuilder builder) {
+    final MetricsInfo info = Interns.info(name, EMPTY_STRING);
+    final Object o = gauge.getValue();
+
+    // Figure out which gauge types metrics2 supports and call the right method
+    if (o instanceof Integer) {
+      builder.addGauge(info, (int) o);
+    } else if (o instanceof Long) {
+      builder.addGauge(info, (long) o);
+    } else if (o instanceof Float) {
+      builder.addGauge(info, (float) o);
+    } else if (o instanceof Double) {
+      builder.addGauge(info, (double) o);
+    } else if (o == null) {
+      // Without this branch o.getClass() below would throw a NullPointerException and abort the
+      // snapshot of every remaining metric in the registry.
+      LOG.warn("Ignoring Gauge (" + name + ") with null value");
+    } else {
+      LOG.warn("Ignoring Gauge (" + name + ") with unhandled type: " + o.getClass());
+    }
+  }
+
+  /**
+   * Adds the counter's current count to the record.
+   *
+   * @param name The metric name.
+   * @param counter The counter to read.
+   * @param builder A Hadoop-Metrics2 record builder.
+   */
+  private void addCounter(String name, Counter counter, MetricsRecordBuilder builder) {
+    MetricsInfo info = Interns.info(name, EMPTY_STRING);
+    builder.addCounter(info, counter.getCount());
+  }
+
+  /**
+   * Add Histogram value-distribution data to a Hadoop-Metrics2 record builder by delegating to
+   * {@link MutableHistogram#snapshot}, with an empty description.
+   *
+   * @param name A base name for this record.
+   * @param histogram The histogram holding the distribution of measured values.
+   * @param builder A Hadoop-Metrics2 record builder.
+   */
+  private void addHistogram(String name, Histogram histogram, MetricsRecordBuilder builder) {
+    MutableHistogram.snapshot(name, EMPTY_STRING, histogram, builder, true);
+  }
+
+  /**
+   * Add a meter's count and its mean, 1-minute, 5-minute and 15-minute rates to a
+   * Hadoop-Metrics2 record builder as gauges. The values are exported as returned by the meter,
+   * under {@code name} plus the suffixes {@code _count}, {@code _mean_rate}, {@code _1min_rate},
+   * {@code _5min_rate} and {@code _15min_rate}.
+   *
+   * @param name A base name for this record.
+   * @param meter The meter to read.
+   * @param builder A Hadoop-Metrics2 record builder.
+   */
+  private void addMeter(String name, Meter meter, MetricsRecordBuilder builder) {
+    builder.addGauge(Interns.info(name + "_count", EMPTY_STRING), meter.getCount());
+    builder.addGauge(Interns.info(name + "_mean_rate", EMPTY_STRING), meter.getMeanRate());
+    builder.addGauge(Interns.info(name + "_1min_rate", EMPTY_STRING), meter.getOneMinuteRate());
+    builder.addGauge(Interns.info(name + "_5min_rate", EMPTY_STRING), meter.getFiveMinuteRate());
+    builder.addGauge(Interns.info(name + "_15min_rate", EMPTY_STRING),
+        meter.getFifteenMinuteRate());
+  }
+
+  /**
+   * Adds a timer to the record as the combination of its meter and its histogram, both under the
+   * same base {@code name}.
+   *
+   * @param name A base name for this record.
+   * @param timer The timer to read.
+   * @param builder A Hadoop-Metrics2 record builder.
+   */
+  private void addTimer(String name, Timer timer, MetricsRecordBuilder builder) {
+    addMeter(name, timer.getMeter(), builder);
+    addHistogram(name, timer.getHistogram(), builder);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
----------------------------------------------------------------------
diff --git
a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java index f1bb0da..d3329db 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java @@ -473,6 +473,12 @@ public class MetricsRegionServerSourceImpl } metricsRegistry.snapshot(mrb, all); + + // source is registered in supers constructor, sometimes called before the whole initialization. + if (metricsAdapter != null) { + // snapshot MetricRegistry as well + metricsAdapter.snapshotAllMetrics(registry, mrb); + } } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java index 7a34e45..0e5c07f 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication.regionserver; +import org.apache.hadoop.hbase.metrics.MetricRegistryInfo; import org.apache.hadoop.metrics2.lib.MutableFastCounter; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableHistogram; @@ -243,4 +244,10 @@ public class 
MetricsReplicationGlobalSourceSource implements MetricsReplicationS public String getMetricsName() { return rms.getMetricsName(); } + + @Override + public MetricRegistryInfo getMetricRegistryInfo() { + return new MetricRegistryInfo(getMetricsName(), getMetricsDescription(), + getMetricsContext(), getMetricsJmxContext(), true); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java ---------------------------------------------------------------------- diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index 0b6a1e1..9838e42 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import org.apache.hadoop.hbase.metrics.MetricRegistryInfo; import org.apache.hadoop.metrics2.lib.MutableFastCounter; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableHistogram; @@ -306,4 +307,10 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou public String getMetricsName() { return rms.getMetricsName(); } + + @Override + public MetricRegistryInfo getMetricRegistryInfo() { + return new MetricRegistryInfo(getMetricsName(), getMetricsDescription(), + getMetricsContext(), getMetricsJmxContext(), true); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c3f101/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystemHelper.java 
---------------------------------------------------------------------- diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystemHelper.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystemHelper.java index 832e220..eb465c3 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystemHelper.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/DefaultMetricsSystemHelper.java @@ -17,26 +17,44 @@ */ package org.apache.hadoop.metrics2.lib; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.HashMap; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.lang.reflect.Method; - public class DefaultMetricsSystemHelper { private static final Log LOG = LogFactory.getLog(DefaultMetricsSystemHelper.class); private final Method removeObjectMethod; + private final Field sourceNamesField; + private final Field mapField; public DefaultMetricsSystemHelper() { + Class<? extends DefaultMetricsSystem> clazz = DefaultMetricsSystem.INSTANCE.getClass(); Method m; try { - Class<? 
extends DefaultMetricsSystem> clazz = DefaultMetricsSystem.INSTANCE.getClass(); m = clazz.getDeclaredMethod("removeObjectName", String.class); m.setAccessible(true); } catch (NoSuchMethodException e) { m = null; } removeObjectMethod = m; + + Field f1, f2; + try { + f1 = clazz.getDeclaredField("sourceNames"); + f1.setAccessible(true); + f2 = UniqueNames.class.getDeclaredField("map"); + f2.setAccessible(true); + } catch (NoSuchFieldException e) { + LOG.trace(e); + f1 = null; + f2 = null; + } + sourceNamesField = f1; + mapField = f2; } public boolean removeObjectName(final String name) { @@ -52,4 +70,30 @@ public class DefaultMetricsSystemHelper { } return false; } + + /** + * Unfortunately Hadoop tries to be too-clever and permanently keeps track of all names registered + * so far as a Source, thus preventing further re-registration of the source with the same name. + * In case of dynamic metrics tied to region-lifecycles, this becomes a problem because we would + * like to be able to re-register and remove with the same name. Otherwise, it is resource leak. + * This ugly code manually removes the name from the UniqueNames map. + * TODO: May not be needed for Hadoop versions after YARN-5190. + */ + public void removeSourceName(String name) { + if (sourceNamesField == null || mapField == null) { + return; + } + try { + Object sourceNames = sourceNamesField.get(DefaultMetricsSystem.INSTANCE); + HashMap map = (HashMap) mapField.get(sourceNames); + synchronized (sourceNames) { + map.remove(name); + } + } catch (Exception ex) { + if (LOG.isTraceEnabled()) { + LOG.trace("Received exception while trying to access Hadoop Metrics classes via reflection.", + ex); + } + } + } }
