STORM-1698 Asynchronous MetricsConsumerBolt

* add new option "max.retain.metrics.tuples" to topology metrics consumer
  * if count of pending tasks exceed this value, older metric tuples are 
discarded


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ed8d12b9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ed8d12b9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ed8d12b9

Branch: refs/heads/1.x-branch
Commit: ed8d12b97f6ed3674db0b2329153f4d77db94f74
Parents: aee772b
Author: Jungtaek Lim <[email protected]>
Authored: Tue May 3 15:42:10 2016 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Mon Jun 13 13:12:10 2016 +0900

----------------------------------------------------------------------
 conf/storm.yaml.example                                |  2 ++
 storm-core/src/clj/org/apache/storm/daemon/common.clj  |  7 ++++---
 .../org/apache/storm/metric/MetricsConsumerBolt.java   | 13 ++++++++++---
 3 files changed, 16 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ed8d12b9/conf/storm.yaml.example
----------------------------------------------------------------------
diff --git a/conf/storm.yaml.example b/conf/storm.yaml.example
index 7df3e9d..0e8b354 100644
--- a/conf/storm.yaml.example
+++ b/conf/storm.yaml.example
@@ -41,8 +41,10 @@
 ## Metrics Consumers
 # topology.metrics.consumer.register:
 #   - class: "org.apache.storm.metric.LoggingMetricsConsumer"
+#     max.retain.metric.tuples: 100
 #     parallelism.hint: 1
 #   - class: "org.mycompany.MyMetricsConsumer"
+#     max.retain.metric.tuples: 100
 #     parallelism.hint: 1
 #     argument:
 #       - endpoint: "metrics-collector.mycompany.org"

http://git-wip-us.apache.org/repos/asf/storm/blob/ed8d12b9/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj 
b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 55bc030..7fd19fe 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -298,17 +298,18 @@
                       {[comp-id METRICS-STREAM-ID] :shuffle})
                     (into {}))
         
-        mk-bolt-spec (fn [class arg p]
+        mk-bolt-spec (fn [class arg p max-retain-metric-tuples]
                        (thrift/mk-bolt-spec*
                         inputs
-                        (org.apache.storm.metric.MetricsConsumerBolt. class 
arg)
+                        (org.apache.storm.metric.MetricsConsumerBolt. class 
arg max-retain-metric-tuples)
                         {} :p p :conf {TOPOLOGY-TASKS p}))]
     
     (map
      (fn [component-id register]           
        [component-id (mk-bolt-spec (get register "class")
                                    (get register "argument")
-                                   (or (get register "parallelism.hint") 1))])
+                                   (or (get register "parallelism.hint") 1)
+                                   (or (get register 
"max.retain.metric.tuples") 100))])
      
      (metrics-consumer-register-ids storm-conf)
      (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))

http://git-wip-us.apache.org/repos/asf/storm/blob/ed8d12b9/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java 
b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
index 0aeab34..95f9137 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
@@ -26,11 +26,9 @@ import org.apache.storm.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 
 public class MetricsConsumerBolt implements IBolt {
@@ -40,14 +38,16 @@ public class MetricsConsumerBolt implements IBolt {
     String _consumerClassName;
     OutputCollector _collector;
     Object _registrationArgument;
+    private final int _maxRetainMetricTuples;
 
     private final BlockingQueue<MetricsTask> _taskQueue = new 
LinkedBlockingDeque<>();
     private Thread _taskExecuteThread;
     private volatile boolean _running = true;
 
-    public MetricsConsumerBolt(String consumerClassName, Object 
registrationArgument) {
+    public MetricsConsumerBolt(String consumerClassName, Object 
registrationArgument, int maxRetainMetricTuples) {
         _consumerClassName = consumerClassName;
         _registrationArgument = registrationArgument;
+        _maxRetainMetricTuples = maxRetainMetricTuples;
     }
 
     @Override
@@ -67,6 +67,13 @@ public class MetricsConsumerBolt implements IBolt {
     
     @Override
     public void execute(Tuple input) {
+        // remove older tasks if task queue exceeds the max size
+        if (_taskQueue.size() > _maxRetainMetricTuples) {
+            while (_taskQueue.size() - 1 > _maxRetainMetricTuples) {
+                _taskQueue.poll();
+            }
+        }
+
         _taskQueue.add(new 
MetricsTask((IMetricsConsumer.TaskInfo)input.getValue(0), 
(Collection)input.getValue(1)));
         _collector.ack(input);
     }

Reply via email to