STORM-2153: add taskId to disruptor metrics
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/427076eb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/427076eb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/427076eb

Branch: refs/heads/1.x-branch
Commit: 427076ebb6761e80f5ef71bbe6843f21854577c8
Parents: 1d42d8f
Author: P. Taylor Goetz <[email protected]>
Authored: Fri Jan 12 19:50:18 2018 -0500
Committer: P. Taylor Goetz <[email protected]>
Committed: Fri Jan 12 19:50:18 2018 -0500

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/executor.clj    |  2 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  4 +--
 .../src/clj/org/apache/storm/disruptor.clj      |  4 +--
 .../storm/metrics2/StormMetricRegistry.java     | 26 +++++++++++---------
 .../org/apache/storm/utils/DisruptorQueue.java  |  6 +++--
 .../utils/DisruptorQueueBackpressureTest.java   |  2 +-
 .../apache/storm/utils/DisruptorQueueTest.java  |  4 +--
 7 files changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 3af9b2c..ecbfb14 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -234,7 +234,7 @@
   (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
   (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
   (.getStormId worker-context)
-  component-id
+  (first task-ids) component-id
   (.getThisWorkerPort worker-context)
   :producer-type :multi-threaded
   :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/src/clj/org/apache/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index b52da52..dd11959 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -211,7 +211,7 @@ (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e) (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE) (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - storm-id worker-id port + storm-id (int -1) "__system" port :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))])) (into {}) @@ -256,7 +256,7 @@ executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)) transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - storm-id worker-id port + storm-id (int -1) "__system" port :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) executor-receive-queue-map (mk-receive-queue-map storm-conf executors storm-id worker-id port) http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/src/clj/org/apache/storm/disruptor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/disruptor.clj b/storm-core/src/clj/org/apache/storm/disruptor.clj index c23c505..6bbf0df 100644 --- a/storm-core/src/clj/org/apache/storm/disruptor.clj +++ b/storm-core/src/clj/org/apache/storm/disruptor.clj @@ -28,10 +28,10 @@ :single-threaded ProducerType/SINGLE}) (defnk disruptor-queue - [^String queue-name buffer-size timeout ^String 
storm-id ^String component-id ^Integer worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1] + [^String queue-name buffer-size timeout ^String storm-id ^Integer task-id ^String component-id ^Integer worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1] (DisruptorQueue. queue-name (PRODUCER-TYPE producer-type) buffer-size - timeout batch-size batch-timeout storm-id component-id worker-port)) + timeout batch-size batch-timeout storm-id component-id task-id worker-port)) (defn clojure-handler [afn] http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java index cfeb711..1a5bd45 100644 --- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java +++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java @@ -46,8 +46,8 @@ public class StormMetricRegistry { private static String hostName = null; - public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, String componentId, Integer port){ - String metricName = metricName(name, topologyId, componentId, port); + public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, String componentId, Integer taskId, Integer port){ + String metricName = metricName(name, topologyId, componentId, taskId, port); if(REGISTRY.getGauges().containsKey(metricName)){ return (SimpleGauge)REGISTRY.getGauges().get(metricName); } else { @@ -55,16 +55,16 @@ public class StormMetricRegistry { } } - public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){ + public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer 
taskId, Integer port){ return new DisruptorMetrics( - StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port), - StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port), - StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port), - StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port), - StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port), - StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port), - StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port), - StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port) + StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, taskId, port), + StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, taskId, port), + StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, taskId, port), + StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, taskId, port), + StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, taskId, port), + StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, taskId, port), + StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, taskId, port), + StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, taskId, port) ); } @@ -147,7 +147,7 @@ public class StormMetricRegistry { return sb.toString(); } - public static String metricName(String name, String stormId, String componentId, Integer workerPort) { + public static String metricName(String name, String stormId, String componentId, Integer taskId, Integer workerPort) { StringBuilder sb = new StringBuilder("storm.worker."); sb.append(stormId); sb.append("."); @@ -155,6 +155,8 @@ public class StormMetricRegistry { sb.append("."); 
sb.append(dotToUnderScore(componentId)); sb.append("."); + sb.append(taskId); + sb.append("."); sb.append(workerPort); sb.append("-"); sb.append(name); http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java index d7497d6..afa5158 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java @@ -418,7 +418,9 @@ public class DisruptorQueue implements IStatefulObject { private final AtomicLong tuplePopulation = new AtomicLong(0); private volatile boolean _throttleOn = false; - public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval, String topologyId, String componentId, int port) { + // [^String queue-name buffer-size timeout ^String storm-id ^String component-id ^Integer task-id ^Integer worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1] + + public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval, String topologyId, String componentId, Integer taskId, int port) { this._queueName = PREFIX + queueName; WaitStrategy wait; if (readTimeout <= 0) { @@ -432,7 +434,7 @@ public class DisruptorQueue implements IStatefulObject { _barrier = _buffer.newBarrier(); _buffer.addGatingSequences(_consumer); _metrics = new QueueMetrics(); - _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, port); + _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, taskId, port); //The batch size can be no larger than half the full queue size. //This is mostly to avoid contention issues. 
_inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2)); http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java index ba2b507..15eb8c4 100644 --- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java +++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java @@ -105,6 +105,6 @@ public class DisruptorQueueBackpressureTest extends TestCase { } private static DisruptorQueue createQueue(String name, int queueSize) { - return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", "test",1000); + return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", "test",1000, 1000); } } http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java index 59de55d..65d627c 100644 --- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java +++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java @@ -178,10 +178,10 @@ public class DisruptorQueueTest extends TestCase { } private static DisruptorQueue createQueue(String name, int queueSize) { - return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", "test", 1000); + return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", "test", 1000, 1000); } private static DisruptorQueue createQueue(String name, int batchSize, int queueSize) { - return new 
DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, batchSize, 1L, "test", "test", 1000); + return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, batchSize, 1L, "test", "test", 1000, 1000); } }
