STORM-1698 Asynchronous MetricsConsumerBolt

* add new option "max.retain.metric.tuples" to topology metrics consumer
  * if the count of pending tasks exceeds this value, older metric tuples are discarded


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4dd6de9c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4dd6de9c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4dd6de9c

Branch: refs/heads/master
Commit: 4dd6de9c494df112d668d66d8c10288becc54495
Parents: 79be212
Author: Jungtaek Lim <[email protected]>
Authored: Tue May 3 15:12:38 2016 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Mon Jun 13 13:06:26 2016 +0900

----------------------------------------------------------------------
 conf/storm.yaml.example                                |  2 ++
 .../src/jvm/org/apache/storm/daemon/StormCommon.java   |  4 +++-
 .../org/apache/storm/metric/MetricsConsumerBolt.java   | 13 ++++++++++---
 3 files changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4dd6de9c/conf/storm.yaml.example
----------------------------------------------------------------------
diff --git a/conf/storm.yaml.example b/conf/storm.yaml.example
index 7df3e9d..0e8b354 100644
--- a/conf/storm.yaml.example
+++ b/conf/storm.yaml.example
@@ -41,8 +41,10 @@
 ## Metrics Consumers
 # topology.metrics.consumer.register:
 #   - class: "org.apache.storm.metric.LoggingMetricsConsumer"
+#     max.retain.metric.tuples: 100
 #     parallelism.hint: 1
 #   - class: "org.mycompany.MyMetricsConsumer"
+#     max.retain.metric.tuples: 100
 #     parallelism.hint: 1
 #     argument:
 #       - endpoint: "metrics-collector.mycompany.org"

http://git-wip-us.apache.org/repos/asf/storm/blob/4dd6de9c/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java 
b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index 7792052..0dbb9f2 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -383,10 +383,12 @@ public class StormCommon {
             for (Map<String, Object> info : registerInfo) {
                 String className = (String) info.get("class");
                 Object argument = info.get("argument");
+                Integer maxRetainMetricTuples = 
Utils.getInt(info.get("max.retain.metric.tuples"), 100);
                 Integer phintNum = Utils.getInt(info.get("parallelism.hint"), 
1);
                 Map<String, Object> metricsConsumerConf = new HashMap<String, 
Object>();
                 metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
-                Bolt metricsConsumerBolt = 
Thrift.prepareSerializedBoltDetails(inputs, new MetricsConsumerBolt(className, 
argument), null, phintNum, metricsConsumerConf);
+                Bolt metricsConsumerBolt = 
Thrift.prepareSerializedBoltDetails(inputs,
+                        new MetricsConsumerBolt(className, argument, 
maxRetainMetricTuples), null, phintNum, metricsConsumerConf);
 
                 String id = className;
                 if (classOccurrencesMap.containsKey(className)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/4dd6de9c/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java 
b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
index 0aeab34..95f9137 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
@@ -26,11 +26,9 @@ import org.apache.storm.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 
 public class MetricsConsumerBolt implements IBolt {
@@ -40,14 +38,16 @@ public class MetricsConsumerBolt implements IBolt {
     String _consumerClassName;
     OutputCollector _collector;
     Object _registrationArgument;
+    private final int _maxRetainMetricTuples;
 
     private final BlockingQueue<MetricsTask> _taskQueue = new 
LinkedBlockingDeque<>();
     private Thread _taskExecuteThread;
     private volatile boolean _running = true;
 
-    public MetricsConsumerBolt(String consumerClassName, Object 
registrationArgument) {
+    public MetricsConsumerBolt(String consumerClassName, Object 
registrationArgument, int maxRetainMetricTuples) {
         _consumerClassName = consumerClassName;
         _registrationArgument = registrationArgument;
+        _maxRetainMetricTuples = maxRetainMetricTuples;
     }
 
     @Override
@@ -67,6 +67,13 @@ public class MetricsConsumerBolt implements IBolt {
     
     @Override
     public void execute(Tuple input) {
+        // remove older tasks if task queue exceeds the max size
+        if (_taskQueue.size() > _maxRetainMetricTuples) {
+            while (_taskQueue.size() - 1 > _maxRetainMetricTuples) {
+                _taskQueue.poll();
+            }
+        }
+
         _taskQueue.add(new 
MetricsTask((IMetricsConsumer.TaskInfo)input.getValue(0), 
(Collection)input.getValue(1)));
         _collector.ack(input);
     }

Reply via email to