Expand disruptor metric names to include the component/worker ID
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6eaa1a85 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6eaa1a85 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6eaa1a85 Branch: refs/heads/1.x-branch Commit: 6eaa1a85a340801d066192c118ba87281b2f2313 Parents: 0be278a Author: P. Taylor Goetz <[email protected]> Authored: Fri Aug 11 16:08:34 2017 -0400 Committer: P. Taylor Goetz <[email protected]> Committed: Fri Aug 11 16:08:34 2017 -0400 ---------------------------------------------------------------------- .../clj/org/apache/storm/daemon/executor.clj | 1 + .../src/clj/org/apache/storm/daemon/worker.clj | 8 +++---- .../src/clj/org/apache/storm/disruptor.clj | 4 ++-- .../storm/metrics2/StormMetricRegistry.java | 22 ++++++++++---------- .../org/apache/storm/utils/DisruptorQueue.java | 4 ++-- .../utils/DisruptorQueueBackpressureTest.java | 2 +- .../apache/storm/utils/DisruptorQueueTest.java | 4 ++-- 7 files changed, 23 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/6eaa1a85/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 3e5dd20..95e43f6 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -234,6 +234,7 @@ (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE) (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) (.getStormId worker-context) + component-id (.getThisWorkerPort worker-context) :producer-type :single-threaded :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) http://git-wip-us.apache.org/repos/asf/storm/blob/6eaa1a85/storm-core/src/clj/org/apache/storm/daemon/worker.clj 
---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index b2810db..b52da52 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -205,13 +205,13 @@ (transfer-fn serializer tuple-batch))) transfer-fn))) -(defn- mk-receive-queue-map [storm-conf executors storm-id port] +(defn- mk-receive-queue-map [storm-conf executors storm-id worker-id port] (->> executors ;; TODO: this depends on the type of executor (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e) (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE) (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - storm-id port + storm-id worker-id port :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))])) (into {}) @@ -256,10 +256,10 @@ executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)) transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - storm-id port + storm-id worker-id port :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) - executor-receive-queue-map (mk-receive-queue-map storm-conf executors storm-id port) + executor-receive-queue-map (mk-receive-queue-map storm-conf executors storm-id worker-id port) receive-queue-map (->> executor-receive-queue-map (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue]))) http://git-wip-us.apache.org/repos/asf/storm/blob/6eaa1a85/storm-core/src/clj/org/apache/storm/disruptor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/disruptor.clj 
b/storm-core/src/clj/org/apache/storm/disruptor.clj index 73a9d84..c23c505 100644 --- a/storm-core/src/clj/org/apache/storm/disruptor.clj +++ b/storm-core/src/clj/org/apache/storm/disruptor.clj @@ -28,10 +28,10 @@ :single-threaded ProducerType/SINGLE}) (defnk disruptor-queue - [^String queue-name buffer-size timeout ^String storm-id ^Integer worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1] + [^String queue-name buffer-size timeout ^String storm-id ^String component-id ^Integer worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1] (DisruptorQueue. queue-name (PRODUCER-TYPE producer-type) buffer-size - timeout batch-size batch-timeout storm-id worker-port)) + timeout batch-size batch-timeout storm-id component-id worker-port)) (defn clojure-handler [afn] http://git-wip-us.apache.org/repos/asf/storm/blob/6eaa1a85/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java index 845745f..4c975a3 100644 --- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java +++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java @@ -43,9 +43,9 @@ public class StormMetricRegistry { private static String hostName = null; - public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, Integer port){ + public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ SimpleGauge<T> gauge = new SimpleGauge<>(initialValue); - String metricName = metricName(name, topologyId, null, port); + String metricName = metricName(name, topologyId, componentId, port); if(REGISTRY.getGauges().containsKey(metricName)){ return (SimpleGauge)REGISTRY.getGauges().get(metricName); } else { @@ -53,16 +53,16 @@ 
public class StormMetricRegistry { } } - public static DisruptorMetrics disruptorMetrics(String name, String topologyId, Integer port){ + public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ return new DisruptorMetrics( - StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, port), - StormMetricRegistry.gauge(0L, name + "-population", topologyId, port), - StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, port), - StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, port), - StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, port), - StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, port), - StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, port), - StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, port) + StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), + StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), + StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), + StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), + StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), + StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), + StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), + StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) ); } http://git-wip-us.apache.org/repos/asf/storm/blob/6eaa1a85/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java index 35bc83f..5c0a2fb 100644 --- 
a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java @@ -404,7 +404,7 @@ public class DisruptorQueue implements IStatefulObject { private final AtomicLong _overflowCount = new AtomicLong(0); private volatile boolean _throttleOn = false; - public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval, String topologyId, int port) { + public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval, String topologyId, String componentId, int port) { this._queueName = PREFIX + queueName; WaitStrategy wait; if (readTimeout <= 0) { @@ -418,7 +418,7 @@ public class DisruptorQueue implements IStatefulObject { _barrier = _buffer.newBarrier(); _buffer.addGatingSequences(_consumer); _metrics = new QueueMetrics(); - _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, port); + _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, port); //The batch size can be no larger than half the full queue size. //This is mostly to avoid contention issues. 
_inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2)); http://git-wip-us.apache.org/repos/asf/storm/blob/6eaa1a85/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java index 110fe88..ba2b507 100644 --- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java +++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java @@ -105,6 +105,6 @@ public class DisruptorQueueBackpressureTest extends TestCase { } private static DisruptorQueue createQueue(String name, int queueSize) { - return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", 1000); + return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", "test",1000); } } http://git-wip-us.apache.org/repos/asf/storm/blob/6eaa1a85/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java index c834cbb..59de55d 100644 --- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java +++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java @@ -178,10 +178,10 @@ public class DisruptorQueueTest extends TestCase { } private static DisruptorQueue createQueue(String name, int queueSize) { - return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", 1000); + return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", "test", 1000); } private static DisruptorQueue createQueue(String name, int batchSize, int queueSize) { - return new DisruptorQueue(name, ProducerType.MULTI, 
queueSize, 0L, batchSize, 1L, "test", 1000); + return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, batchSize, 1L, "test", "test", 1000); } }
