STORM-1698 Asynchronous MetricsConsumerBolt * add new option "max.retain.metrics.tuples" to topology metrics consumer * if count of pending tasks exceed this value, older metric tuples are discarded
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4dd6de9c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4dd6de9c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4dd6de9c Branch: refs/heads/master Commit: 4dd6de9c494df112d668d66d8c10288becc54495 Parents: 79be212 Author: Jungtaek Lim <[email protected]> Authored: Tue May 3 15:12:38 2016 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Mon Jun 13 13:06:26 2016 +0900 ---------------------------------------------------------------------- conf/storm.yaml.example | 2 ++ .../src/jvm/org/apache/storm/daemon/StormCommon.java | 4 +++- .../org/apache/storm/metric/MetricsConsumerBolt.java | 13 ++++++++++--- 3 files changed, 15 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/4dd6de9c/conf/storm.yaml.example ---------------------------------------------------------------------- diff --git a/conf/storm.yaml.example b/conf/storm.yaml.example index 7df3e9d..0e8b354 100644 --- a/conf/storm.yaml.example +++ b/conf/storm.yaml.example @@ -41,8 +41,10 @@ ## Metrics Consumers # topology.metrics.consumer.register: # - class: "org.apache.storm.metric.LoggingMetricsConsumer" +# max.retain.metric.tuples: 100 # parallelism.hint: 1 # - class: "org.mycompany.MyMetricsConsumer" +# max.retain.metric.tuples: 100 # parallelism.hint: 1 # argument: # - endpoint: "metrics-collector.mycompany.org" http://git-wip-us.apache.org/repos/asf/storm/blob/4dd6de9c/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java index 7792052..0dbb9f2 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java @@ -383,10 +383,12 @@ public class StormCommon { for (Map<String, Object> info : registerInfo) { String className = (String) info.get("class"); Object argument = info.get("argument"); + Integer maxRetainMetricTuples = Utils.getInt(info.get("max.retain.metric.tuples"), 100); Integer phintNum = Utils.getInt(info.get("parallelism.hint"), 1); Map<String, Object> metricsConsumerConf = new HashMap<String, Object>(); metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum); - Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs, new MetricsConsumerBolt(className, argument), null, phintNum, metricsConsumerConf); + Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs, + new MetricsConsumerBolt(className, argument, maxRetainMetricTuples), null, phintNum, metricsConsumerConf); String id = className; if (classOccurrencesMap.containsKey(className)) { http://git-wip-us.apache.org/repos/asf/storm/blob/4dd6de9c/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java index 0aeab34..95f9137 100644 --- a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java +++ b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java @@ -26,11 +26,9 @@ import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; import java.util.Collection; import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingDeque; public class MetricsConsumerBolt implements IBolt { @@ -40,14 +38,16 @@ public class MetricsConsumerBolt implements IBolt { String _consumerClassName; OutputCollector _collector; Object _registrationArgument; + private final int _maxRetainMetricTuples; private final BlockingQueue<MetricsTask> _taskQueue = new LinkedBlockingDeque<>(); private Thread _taskExecuteThread; private volatile boolean _running = true; - public MetricsConsumerBolt(String consumerClassName, Object registrationArgument) { + public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples) { _consumerClassName = consumerClassName; _registrationArgument = registrationArgument; + _maxRetainMetricTuples = maxRetainMetricTuples; } @Override @@ -67,6 +67,13 @@ public class MetricsConsumerBolt implements IBolt { @Override public void execute(Tuple input) { + // remove older tasks if task queue exceeds the max size + if (_taskQueue.size() > _maxRetainMetricTuples) { + while (_taskQueue.size() - 1 > _maxRetainMetricTuples) { + _taskQueue.poll(); + } + } + _taskQueue.add(new MetricsTask((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1))); _collector.ack(input); }
