STORM-1698 Asynchronous MetricsConsumerBolt * add new option "max.retain.metrics.tuples" to topology metrics consumer * if count of pending tasks exceed this value, older metric tuples are discarded
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ed8d12b9 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ed8d12b9 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ed8d12b9 Branch: refs/heads/1.x-branch Commit: ed8d12b97f6ed3674db0b2329153f4d77db94f74 Parents: aee772b Author: Jungtaek Lim <[email protected]> Authored: Tue May 3 15:42:10 2016 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Mon Jun 13 13:12:10 2016 +0900 ---------------------------------------------------------------------- conf/storm.yaml.example | 2 ++ storm-core/src/clj/org/apache/storm/daemon/common.clj | 7 ++++--- .../org/apache/storm/metric/MetricsConsumerBolt.java | 13 ++++++++++--- 3 files changed, 16 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/ed8d12b9/conf/storm.yaml.example ---------------------------------------------------------------------- diff --git a/conf/storm.yaml.example b/conf/storm.yaml.example index 7df3e9d..0e8b354 100644 --- a/conf/storm.yaml.example +++ b/conf/storm.yaml.example @@ -41,8 +41,10 @@ ## Metrics Consumers # topology.metrics.consumer.register: # - class: "org.apache.storm.metric.LoggingMetricsConsumer" +# max.retain.metric.tuples: 100 # parallelism.hint: 1 # - class: "org.mycompany.MyMetricsConsumer" +# max.retain.metric.tuples: 100 # parallelism.hint: 1 # argument: # - endpoint: "metrics-collector.mycompany.org" http://git-wip-us.apache.org/repos/asf/storm/blob/ed8d12b9/storm-core/src/clj/org/apache/storm/daemon/common.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj index 55bc030..7fd19fe 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/common.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj @@ -298,17 +298,18 @@ {[comp-id METRICS-STREAM-ID] :shuffle}) (into {})) - mk-bolt-spec (fn [class arg p] + mk-bolt-spec (fn [class arg p max-retain-metric-tuples] (thrift/mk-bolt-spec* inputs - (org.apache.storm.metric.MetricsConsumerBolt. class arg) + (org.apache.storm.metric.MetricsConsumerBolt. class arg max-retain-metric-tuples) {} :p p :conf {TOPOLOGY-TASKS p}))] (map (fn [component-id register] [component-id (mk-bolt-spec (get register "class") (get register "argument") - (or (get register "parallelism.hint") 1))]) + (or (get register "parallelism.hint") 1) + (or (get register "max.retain.metric.tuples") 100))]) (metrics-consumer-register-ids storm-conf) (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)))) http://git-wip-us.apache.org/repos/asf/storm/blob/ed8d12b9/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java index 0aeab34..95f9137 100644 --- a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java +++ b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java @@ -26,11 +26,9 @@ import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; import java.util.Collection; import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingDeque; public class MetricsConsumerBolt implements IBolt { @@ -40,14 +38,16 @@ public class MetricsConsumerBolt implements IBolt { String _consumerClassName; OutputCollector _collector; Object _registrationArgument; + private final int _maxRetainMetricTuples; private final BlockingQueue<MetricsTask> _taskQueue = new LinkedBlockingDeque<>(); private Thread _taskExecuteThread; private volatile boolean _running = true; - public MetricsConsumerBolt(String consumerClassName, Object registrationArgument) { + public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples) { _consumerClassName = consumerClassName; _registrationArgument = registrationArgument; + _maxRetainMetricTuples = maxRetainMetricTuples; } @Override @@ -67,6 +67,13 @@ public class MetricsConsumerBolt implements IBolt { @Override public void execute(Tuple input) { + // remove older tasks if task queue exceeds the max size + if (_taskQueue.size() > _maxRetainMetricTuples) { + while (_taskQueue.size() - 1 > _maxRetainMetricTuples) { + _taskQueue.poll(); + } + } + _taskQueue.add(new MetricsTask((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1))); _collector.ack(input); }
