expand disruptor metrics name to include component/worker id

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6eaa1a85
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6eaa1a85
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6eaa1a85

Branch: refs/heads/metrics_v2
Commit: 6eaa1a85a340801d066192c118ba87281b2f2313
Parents: 0be278a
Author: P. Taylor Goetz <ptgo...@gmail.com>
Authored: Fri Aug 11 16:08:34 2017 -0400
Committer: P. Taylor Goetz <ptgo...@gmail.com>
Committed: Fri Aug 11 16:08:34 2017 -0400

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/executor.clj    |  1 +
 .../src/clj/org/apache/storm/daemon/worker.clj  |  8 +++----
 .../src/clj/org/apache/storm/disruptor.clj      |  4 ++--
 .../storm/metrics2/StormMetricRegistry.java     | 22 ++++++++++----------
 .../org/apache/storm/utils/DisruptorQueue.java  |  4 ++--
 .../utils/DisruptorQueueBackpressureTest.java   |  2 +-
 .../apache/storm/utils/DisruptorQueueTest.java  |  4 ++--
 7 files changed, 23 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6eaa1a85/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj 
b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 3e5dd20..95e43f6 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -234,6 +234,7 @@
                                   (storm-conf 
TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
                                   (storm-conf 
TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
                                   (.getStormId worker-context)
+                                  component-id
                                   (.getThisWorkerPort worker-context)
                                   :producer-type :single-threaded
                                   :batch-size (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-SIZE)

http://git-wip-us.apache.org/repos/asf/storm/blob/6eaa1a85/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj 
b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index b2810db..b52da52 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -205,13 +205,13 @@
           (transfer-fn serializer tuple-batch)))
       transfer-fn)))
 
-(defn- mk-receive-queue-map [storm-conf executors storm-id port]
+(defn- mk-receive-queue-map [storm-conf executors storm-id worker-id port]
   (->> executors
        ;; TODO: this depends on the type of executor
        (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
                                                   (storm-conf 
TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
                                                   (storm-conf 
TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
-                                                  storm-id port
+                                                  storm-id worker-id port
                                                   :batch-size (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-SIZE)
                                                   :batch-timeout (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))]))
        (into {})
@@ -256,10 +256,10 @@
         executors (set (read-worker-executors storm-conf storm-cluster-state 
storm-id assignment-id port assignment-versions))
         transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" 
(storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
                                                   (storm-conf 
TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
-                                                  storm-id port
+                                                  storm-id worker-id port
                                                   :batch-size (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-SIZE)
                                                   :batch-timeout (storm-conf 
TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
-        executor-receive-queue-map (mk-receive-queue-map storm-conf executors 
storm-id port)
+        executor-receive-queue-map (mk-receive-queue-map storm-conf executors 
storm-id worker-id port)
 
         receive-queue-map (->> executor-receive-queue-map
                                (mapcat (fn [[e queue]] (for [t 
(executor-id->tasks e)] [t queue])))

http://git-wip-us.apache.org/repos/asf/storm/blob/6eaa1a85/storm-core/src/clj/org/apache/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/disruptor.clj 
b/storm-core/src/clj/org/apache/storm/disruptor.clj
index 73a9d84..c23c505 100644
--- a/storm-core/src/clj/org/apache/storm/disruptor.clj
+++ b/storm-core/src/clj/org/apache/storm/disruptor.clj
@@ -28,10 +28,10 @@
    :single-threaded ProducerType/SINGLE})
 
 (defnk disruptor-queue
-  [^String queue-name buffer-size timeout ^String storm-id ^Integer 
worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
+  [^String queue-name buffer-size timeout ^String storm-id ^String 
component-id ^Integer worker-port :producer-type :multi-threaded :batch-size 
100 :batch-timeout 1]
   (DisruptorQueue. queue-name
                    (PRODUCER-TYPE producer-type) buffer-size
-                   timeout batch-size batch-timeout storm-id worker-port))
+                   timeout batch-size batch-timeout storm-id component-id 
worker-port))
 
 (defn clojure-handler
   [afn]

http://git-wip-us.apache.org/repos/asf/storm/blob/6eaa1a85/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java 
b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 845745f..4c975a3 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -43,9 +43,9 @@ public class StormMetricRegistry {
 
     private static String hostName = null;
 
-    public static <T> SimpleGauge<T>  gauge(T initialValue, String name, 
String topologyId, Integer port){
+    public static <T> SimpleGauge<T>  gauge(T initialValue, String name, 
String topologyId, String componentId, Integer port){
         SimpleGauge<T> gauge = new SimpleGauge<>(initialValue);
-        String metricName = metricName(name, topologyId, null, port);
+        String metricName = metricName(name, topologyId, componentId, port);
             if(REGISTRY.getGauges().containsKey(metricName)){
                 return (SimpleGauge)REGISTRY.getGauges().get(metricName);
         } else {
@@ -53,16 +53,16 @@ public class StormMetricRegistry {
         }
     }
 
-    public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, Integer port){
+    public static DisruptorMetrics disruptorMetrics(String name, String 
topologyId, String componentId, Integer port){
         return new DisruptorMetrics(
-                StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, 
port),
-                StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, port),
-                StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, port),
-                StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, port),
-                StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, port),
-                StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, port),
-                StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, 
port),
-                StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, port)
+                StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, 
componentId, port),
+                StormMetricRegistry.gauge(0L, name + "-population", 
topologyId, componentId, port),
+                StormMetricRegistry.gauge(0L, name + "-write-position", 
topologyId, componentId, port),
+                StormMetricRegistry.gauge(0L, name + "-read-position", 
topologyId, componentId, port),
+                StormMetricRegistry.gauge(0.0, name + "-arrival-rate", 
topologyId, componentId, port),
+                StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", 
topologyId, componentId, port),
+                StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, 
componentId, port),
+                StormMetricRegistry.gauge(0.0F, name + "-percent-full", 
topologyId, componentId, port)
         );
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/6eaa1a85/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java 
b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index 35bc83f..5c0a2fb 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -404,7 +404,7 @@ public class DisruptorQueue implements IStatefulObject {
     private final AtomicLong _overflowCount = new AtomicLong(0);
     private volatile boolean _throttleOn = false;
 
-    public DisruptorQueue(String queueName, ProducerType type, int size, long 
readTimeout, int inputBatchSize, long flushInterval, String topologyId, int 
port) {
+    public DisruptorQueue(String queueName, ProducerType type, int size, long 
readTimeout, int inputBatchSize, long flushInterval, String topologyId, String 
componentId, int port) {
         this._queueName = PREFIX + queueName;
         WaitStrategy wait;
         if (readTimeout <= 0) {
@@ -418,7 +418,7 @@ public class DisruptorQueue implements IStatefulObject {
         _barrier = _buffer.newBarrier();
         _buffer.addGatingSequences(_consumer);
         _metrics = new QueueMetrics();
-        _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, 
topologyId, port);
+        _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, 
topologyId, componentId, port);
         //The batch size can be no larger than half the full queue size.
         //This is mostly to avoid contention issues.
         _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2));

http://git-wip-us.apache.org/repos/asf/storm/blob/6eaa1a85/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
 
b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
index 110fe88..ba2b507 100644
--- 
a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
+++ 
b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
@@ -105,6 +105,6 @@ public class DisruptorQueueBackpressureTest extends 
TestCase {
     }
 
     private static DisruptorQueue createQueue(String name, int queueSize) {
-        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 
1L, "test", 1000);
+        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 
1L, "test", "test",1000);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/6eaa1a85/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java 
b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
index c834cbb..59de55d 100644
--- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
+++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
@@ -178,10 +178,10 @@ public class DisruptorQueueTest extends TestCase {
     }
 
     private static DisruptorQueue createQueue(String name, int queueSize) {
-        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 
1L, "test", 1000);
+        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 
1L, "test", "test", 1000);
     }
 
     private static DisruptorQueue createQueue(String name, int batchSize, int 
queueSize) {
-        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 
batchSize, 1L, "test", 1000);
+        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 
batchSize, 1L, "test", "test", 1000);
     }
 }

Reply via email to