STORM-1698 Asynchronous MetricsConsumerBolt * change MetricsConsumerBolt's behavior to asynchronus manner * to avoid bad side effect of topology * for details please refer JIRA issue: https://issues.apache.org/jira/browse/STORM-1698
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aee772b1 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aee772b1 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aee772b1 Branch: refs/heads/1.x-branch Commit: aee772b15ec7a03878acf4dfcfda6bde4981f08a Parents: a59007d Author: Jungtaek Lim <[email protected]> Authored: Fri Apr 8 13:05:00 2016 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Mon Jun 13 13:12:10 2016 +0900 ---------------------------------------------------------------------- .../storm/metric/MetricsConsumerBolt.java | 56 +++++++++++++++++++- 1 file changed, 54 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/aee772b1/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java index 1f3217f..0aeab34 100644 --- a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java +++ b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java @@ -23,15 +23,28 @@ import org.apache.storm.task.IBolt; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; import java.util.Collection; import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingDeque; public class MetricsConsumerBolt implements IBolt { + public static final Logger LOG = LoggerFactory.getLogger(MetricsConsumerBolt.class); + IMetricsConsumer _metricsConsumer; String _consumerClassName; OutputCollector _collector; Object _registrationArgument; + private final BlockingQueue<MetricsTask> _taskQueue = new LinkedBlockingDeque<>(); + private Thread _taskExecuteThread; + private volatile boolean _running = true; + public MetricsConsumerBolt(String consumerClassName, Object registrationArgument) { _consumerClassName = consumerClassName; _registrationArgument = registrationArgument; @@ -47,17 +60,56 @@ public class MetricsConsumerBolt implements IBolt { } _metricsConsumer.prepare(stormConf, _registrationArgument, context, collector); _collector = collector; + _taskExecuteThread = new Thread(new MetricsHandlerRunnable()); + _taskExecuteThread.setDaemon(true); + _taskExecuteThread.start(); } @Override public void execute(Tuple input) { - _metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1)); + _taskQueue.add(new MetricsTask((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1))); _collector.ack(input); } @Override public void cleanup() { + _running = false; _metricsConsumer.cleanup(); + _taskExecuteThread.interrupt(); + } + + class MetricsTask { + private IMetricsConsumer.TaskInfo taskInfo; + private Collection<IMetricsConsumer.DataPoint> dataPoints; + + public MetricsTask(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints) { + this.taskInfo = taskInfo; + this.dataPoints = dataPoints; + } + + public IMetricsConsumer.TaskInfo getTaskInfo() { + return taskInfo; + } + + public Collection<IMetricsConsumer.DataPoint> getDataPoints() { + return dataPoints; + } + } + + class MetricsHandlerRunnable implements Runnable { + + @Override + public void run() { + while (_running) { + try { + MetricsTask task = _taskQueue.take(); + _metricsConsumer.handleDataPoints(task.getTaskInfo(), task.getDataPoints()); + } catch (InterruptedException e) { + break; + } catch (Throwable t) { + LOG.error("Exception occurred during handle metrics", t); + } + } + } } - }
