[
https://issues.apache.org/jira/browse/STORM-1700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15270424#comment-15270424
]
ASF GitHub Bot commented on STORM-1700:
---------------------------------------
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/1324#discussion_r62015528
--- Diff:
storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java ---
@@ -47,17 +68,71 @@ public void prepare(Map stormConf, TopologyContext
context, OutputCollector coll
}
_metricsConsumer.prepare(stormConf, _registrationArgument,
context, collector);
_collector = collector;
+ _taskExecuteThread = new Thread(new MetricsHandlerRunnable());
+ _taskExecuteThread.setDaemon(true);
+ _taskExecuteThread.start();
}
@Override
public void execute(Tuple input) {
-
_metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo)input.getValue(0),
(Collection)input.getValue(1));
+ // remove older tasks if task queue exceeds the max size
+ if (_taskQueue.size() > _maxRetainMetricTuples) {
+ while (_taskQueue.size() - 1 > _maxRetainMetricTuples) {
+ _taskQueue.poll();
+ }
+ }
+
+ IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo)
input.getValue(0);
+ Collection<IMetricsConsumer.DataPoint> dataPoints =
(Collection)input.getValue(1);
+ List<IMetricsConsumer.DataPoint> filteredDataPoints =
getFilteredDataPoints(dataPoints);
+ _taskQueue.add(new MetricsTask(taskInfo, filteredDataPoints));
_collector.ack(input);
}
+ private List<IMetricsConsumer.DataPoint>
getFilteredDataPoints(Collection<IMetricsConsumer.DataPoint> dataPoints) {
+ return Lists.newArrayList(Iterables.filter(dataPoints,
_filterPredicate));
+ }
+
@Override
public void cleanup() {
+ _running = false;
_metricsConsumer.cleanup();
+ _taskExecuteThread.interrupt();
}
-
+
+ class MetricsTask {
--- End diff --
Yes I think that would be clearer. I'll address it.
> Introduce 'whitelist' / 'blacklist' option to MetricsConsumer
> -------------------------------------------------------------
>
> Key: STORM-1700
> URL: https://issues.apache.org/jira/browse/STORM-1700
> Project: Apache Storm
> Issue Type: Sub-task
> Components: storm-core
> Affects Versions: 1.0.0, 2.0.0
> Reporter: Jungtaek Lim
> Assignee: Jungtaek Lim
>
> Storm provides various metrics by default, and so on some external modules
> (storm-kafka).
> When we register MetricsConsumer, MetricsConsumer should handle all of
> metrics. If MetricsConsumer cannot keep up with these metrics, only way to
> keep up is increasing parallelism, which seems limited. Furthermore, some
> users don't want to care about some metrics since unintended metrics will
> fill external storage.
> Though MetricsConsumer itself can filter metrics by name, it would be better
> to support filter by Storm side. It will reduce the redundant works for Storm
> community.
> If we provide filter options, it would be great.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)