STORM-1698 Asynchronous MetricsConsumerBolt

* change MetricsConsumerBolt's behavior to asynchronus manner
  * to avoid bad side effect of topology
* for details please refer JIRA issue: 
https://issues.apache.org/jira/browse/STORM-1698


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aee772b1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aee772b1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aee772b1

Branch: refs/heads/1.x-branch
Commit: aee772b15ec7a03878acf4dfcfda6bde4981f08a
Parents: a59007d
Author: Jungtaek Lim <[email protected]>
Authored: Fri Apr 8 13:05:00 2016 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Mon Jun 13 13:12:10 2016 +0900

----------------------------------------------------------------------
 .../storm/metric/MetricsConsumerBolt.java       | 56 +++++++++++++++++++-
 1 file changed, 54 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/aee772b1/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java 
b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
index 1f3217f..0aeab34 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
@@ -23,15 +23,28 @@ import org.apache.storm.task.IBolt;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 
 public class MetricsConsumerBolt implements IBolt {
+    public static final Logger LOG = 
LoggerFactory.getLogger(MetricsConsumerBolt.class);
+
     IMetricsConsumer _metricsConsumer;
     String _consumerClassName;
     OutputCollector _collector;
     Object _registrationArgument;
 
+    private final BlockingQueue<MetricsTask> _taskQueue = new 
LinkedBlockingDeque<>();
+    private Thread _taskExecuteThread;
+    private volatile boolean _running = true;
+
     public MetricsConsumerBolt(String consumerClassName, Object 
registrationArgument) {
         _consumerClassName = consumerClassName;
         _registrationArgument = registrationArgument;
@@ -47,17 +60,56 @@ public class MetricsConsumerBolt implements IBolt {
         }
         _metricsConsumer.prepare(stormConf, _registrationArgument, context, 
collector);
         _collector = collector;
+        _taskExecuteThread = new Thread(new MetricsHandlerRunnable());
+        _taskExecuteThread.setDaemon(true);
+        _taskExecuteThread.start();
     }
     
     @Override
     public void execute(Tuple input) {
-        
_metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo)input.getValue(0), 
(Collection)input.getValue(1));
+        _taskQueue.add(new 
MetricsTask((IMetricsConsumer.TaskInfo)input.getValue(0), 
(Collection)input.getValue(1)));
         _collector.ack(input);
     }
 
     @Override
     public void cleanup() {
+        _running = false;
         _metricsConsumer.cleanup();
+        _taskExecuteThread.interrupt();
+    }
+
+    class MetricsTask {
+        private IMetricsConsumer.TaskInfo taskInfo;
+        private Collection<IMetricsConsumer.DataPoint> dataPoints;
+
+        public MetricsTask(IMetricsConsumer.TaskInfo taskInfo, 
Collection<IMetricsConsumer.DataPoint> dataPoints) {
+            this.taskInfo = taskInfo;
+            this.dataPoints = dataPoints;
+        }
+
+        public IMetricsConsumer.TaskInfo getTaskInfo() {
+            return taskInfo;
+        }
+
+        public Collection<IMetricsConsumer.DataPoint> getDataPoints() {
+            return dataPoints;
+        }
+    }
+
+    class MetricsHandlerRunnable implements Runnable {
+
+        @Override
+        public void run() {
+            while (_running) {
+                try {
+                    MetricsTask task = _taskQueue.take();
+                    _metricsConsumer.handleDataPoints(task.getTaskInfo(), 
task.getDataPoints());
+                } catch (InterruptedException e) {
+                    break;
+                } catch (Throwable t) {
+                    LOG.error("Exception occurred during handle metrics", t);
+                }
+            }
+        }
     }
-    
 }

Reply via email to