This is an automated email from the ASF dual-hosted git repository.
cliang pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new d3b06d1 HADOOP-15481. Emit FairCallQueue stats as metrics.
Contributed by Christopher Gregorian.
d3b06d1 is described below
commit d3b06d179dc1c1ad624756b195f243b50b9d04c5
Author: Chen Liang <[email protected]>
AuthorDate: Thu Jan 17 13:30:42 2019 -0800
HADOOP-15481. Emit FairCallQueue stats as metrics. Contributed by
Christopher Gregorian.
---
.../java/org/apache/hadoop/ipc/FairCallQueue.java | 32 +++++++++++++++++--
.../hadoop-common/src/site/markdown/Metrics.md | 10 ++++++
.../org/apache/hadoop/ipc/TestFairCallQueue.java | 36 ++++++++++++++++++++++
3 files changed, 76 insertions(+), 2 deletions(-)
diff --git
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
index 6d9ea3e..1431254 100644
---
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
+++
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
@@ -35,6 +35,11 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.util.MBeans;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +48,7 @@ import org.slf4j.LoggerFactory;
* A queue with multiple levels for each priority.
*/
public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
- implements BlockingQueue<E> {
+ implements BlockingQueue<E> {
@Deprecated
public static final int IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT = 4;
@Deprecated
@@ -335,7 +340,8 @@ public class FairCallQueue<E extends Schedulable> extends
AbstractQueue<E>
* MetricsProxy is a singleton because we may init multiple
* FairCallQueues, but the metrics system cannot unregister beans cleanly.
*/
- private static final class MetricsProxy implements FairCallQueueMXBean {
+ private static final class MetricsProxy implements FairCallQueueMXBean,
+ MetricsSource {
// One singleton per namespace
private static final HashMap<String, MetricsProxy> INSTANCES =
new HashMap<String, MetricsProxy>();
@@ -346,8 +352,13 @@ public class FairCallQueue<E extends Schedulable> extends
AbstractQueue<E>
// Keep track of how many objects we registered
private int revisionNumber = 0;
+ private String namespace;
+
private MetricsProxy(String namespace) {
+ this.namespace = namespace;
MBeans.register(namespace, "FairCallQueue", this);
+ final String name = namespace + ".FairCallQueue";
+ DefaultMetricsSystem.instance().register(name, name, this);
}
public static synchronized MetricsProxy getInstance(String namespace) {
@@ -389,6 +400,23 @@ public class FairCallQueue<E extends Schedulable> extends
AbstractQueue<E>
@Override public int getRevision() {
return revisionNumber;
}
+
+ @Override
+ public void getMetrics(MetricsCollector collector, boolean all) {
+ MetricsRecordBuilder rb = collector.addRecord("FairCallQueue")
+ .setContext("rpc")
+ .tag(Interns.info("namespace", "Namespace"), namespace);
+
+ final int[] currentQueueSizes = getQueueSizes();
+ final long[] currentOverflowedCalls = getOverflowedCalls();
+
+ for (int i = 0; i < currentQueueSizes.length; i++) {
+ rb.addGauge(Interns.info("FairCallQueueSize_p" + i, "FCQ Queue Size"),
+ currentQueueSizes[i]);
+ rb.addCounter(Interns.info("FairCallQueueOverflowedCalls_p" + i,
+ "FCQ Overflowed Calls"), currentOverflowedCalls[i]);
+ }
+ }
}
// FairCallQueueMXBean
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 9dcd903..3d62868 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -104,6 +104,16 @@ RetryCache metrics is useful to monitor NameNode
fail-over. Each metrics record
| `CacheCleared` | Total number of RetryCache cleared |
| `CacheUpdated` | Total number of RetryCache updated |
+FairCallQueue
+-------------
+
+FairCallQueue metrics will only exist if FairCallQueue is enabled. Each metric
exists for each level of priority.
+
+| Name | Description |
+|:---- |:---- |
+| `FairCallQueueSize_p`*Priority* | Current number of calls in priority queue |
+| `FairCallQueueOverflowedCalls_p`*Priority* | Total number of overflowed
calls in priority queue |
+
rpcdetailed context
===================
diff --git
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
index caa9c86..4645e95 100644
---
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
+++
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.ipc;
+import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doThrow;
@@ -612,4 +615,37 @@ public class TestFairCallQueue extends TestCase {
assertEquals(0, queueSizes[0]);
assertEquals(0, queueSizes[1]);
}
+
+ @Test
+ public void testFairCallQueueMetrics() throws Exception {
+ final String fcqMetrics = "ns.FairCallQueue";
+ Schedulable p0 = mockCall("a", 0);
+ Schedulable p1 = mockCall("b", 1);
+
+ assertGauge("FairCallQueueSize_p0", 0, getMetrics(fcqMetrics));
+ assertGauge("FairCallQueueSize_p1", 0, getMetrics(fcqMetrics));
+ assertCounter("FairCallQueueOverflowedCalls_p0", 0L,
+ getMetrics(fcqMetrics));
+ assertCounter("FairCallQueueOverflowedCalls_p1", 0L,
+ getMetrics(fcqMetrics));
+
+ for (int i = 0; i < 5; i++) {
+ fcq.add(p0);
+ fcq.add(p1);
+ }
+
+ try {
+ fcq.add(p1);
+ fail("didn't overflow");
+ } catch (IllegalStateException ise) {
+ // Expected exception
+ }
+
+ assertGauge("FairCallQueueSize_p0", 5, getMetrics(fcqMetrics));
+ assertGauge("FairCallQueueSize_p1", 5, getMetrics(fcqMetrics));
+ assertCounter("FairCallQueueOverflowedCalls_p0", 0L,
+ getMetrics(fcqMetrics));
+ assertCounter("FairCallQueueOverflowedCalls_p1", 1L,
+ getMetrics(fcqMetrics));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]