Repository: storm Updated Branches: refs/heads/1.x-branch a59007db5 -> 38c7ff866
STORM-1700 Introduce 'whitelist' / 'blacklist' option to MetricsConsumer * Users can set a whitelist or a blacklist to filter metrics by name * if neither is specified (the default), no metrics are filtered out * how to match: substring match with a regular expression * use both ^ and $ when you want a strict (full string) match * cache whether each metric name is filtered in or out * the set of metric names does not change during the lifecycle of the topology * so applying the regexes to every metric on every call would waste CPU * added unit tests Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5f4e37e8 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5f4e37e8 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5f4e37e8 Branch: refs/heads/1.x-branch Commit: 5f4e37e88e7797307edf2937831791d65bb53c06 Parents: ed8d12b Author: Jungtaek Lim <[email protected]> Authored: Sat Apr 9 14:11:20 2016 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Mon Jun 13 13:12:10 2016 +0900 ---------------------------------------------------------------------- conf/storm.yaml.example | 9 ++ .../src/clj/org/apache/storm/daemon/common.clj | 17 +-- .../storm/metric/MetricsConsumerBolt.java | 47 +++++--- .../storm/metric/filter/FilterByMetricName.java | 111 +++++++++++++++++++ .../storm/metric/filter/MetricsFilter.java | 26 +++++ .../metric/filter/FilterByMetricNameTest.java | 95 ++++++++++++++++ 6 files changed, 284 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/5f4e37e8/conf/storm.yaml.example ---------------------------------------------------------------------- diff --git a/conf/storm.yaml.example b/conf/storm.yaml.example index 0e8b354..c6d9544 100644 --- a/conf/storm.yaml.example +++ b/conf/storm.yaml.example @@ -39,12 +39,21 @@ # - "server2" ## Metrics Consumers +## max.retain.metric.tuples +## - task 
queue will be unbounded when max.retain.metric.tuples is less than or equal to 0. +## whitelist / blacklist +## - when no metric filter is configured, all metrics are passed through ('pass all'). +## - you may specify either a whitelist or a blacklist, or neither. You can't specify both. +## - each whitelist / blacklist can contain multiple regular expressions; they are matched as substrings, so anchor with ^ and $ for a full-name match # topology.metrics.consumer.register: # - class: "org.apache.storm.metric.LoggingMetricsConsumer" # max.retain.metric.tuples: 100 # parallelism.hint: 1 # - class: "org.mycompany.MyMetricsConsumer" # max.retain.metric.tuples: 100 +# whitelist: +# - "execute.*" +# - "^__complete-latency$" # parallelism.hint: 1 # argument: # - endpoint: "metrics-collector.mycompany.org" http://git-wip-us.apache.org/repos/asf/storm/blob/5f4e37e8/storm-core/src/clj/org/apache/storm/daemon/common.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj index 7fd19fe..68f58b0 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/common.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj @@ -25,9 +25,10 @@ (:import [org.apache.storm.task WorkerTopologyContext]) (:import [org.apache.storm Constants]) (:import [org.apache.storm.metric SystemBolt]) - (:import [org.apache.storm.metric EventLoggerBolt]) - (:import [org.apache.storm.security.auth IAuthorizer]) - (:import [java.io InterruptedIOException]) + (:import [org.apache.storm.metric EventLoggerBolt MetricsConsumerBolt]) + (:import [org.apache.storm.security.auth IAuthorizer]) + (:import [java.io InterruptedIOException] + (org.apache.storm.metric.filter FilterByMetricName)) (:require [clojure.set :as set]) (:require [org.apache.storm.daemon.acker :as acker]) (:require [org.apache.storm.thrift :as thrift]) @@ -298,10 +299,10 @@ {[comp-id METRICS-STREAM-ID] :shuffle}) (into {})) - mk-bolt-spec (fn [class arg p 
max-retain-metric-tuples] + mk-bolt-spec (fn [class arg p max-retain-metric-tuples whitelist blacklist] (thrift/mk-bolt-spec* inputs - (org.apache.storm.metric.MetricsConsumerBolt. class arg max-retain-metric-tuples) + (org.apache.storm.metric.MetricsConsumerBolt. class arg max-retain-metric-tuples (FilterByMetricName. whitelist blacklist)) {} :p p :conf {TOPOLOGY-TASKS p}))] (map @@ -309,8 +310,10 @@ [component-id (mk-bolt-spec (get register "class") (get register "argument") (or (get register "parallelism.hint") 1) - (or (get register "max.retain.metric.tuples") 100))]) - + (or (get register "max.retain.metric.tuples") 100) + (get register "whitelist") + (get register "blacklist"))]) + (metrics-consumer-register-ids storm-conf) (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)))) http://git-wip-us.apache.org/repos/asf/storm/blob/5f4e37e8/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java index 95f9137..6d4e844 100644 --- a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java +++ b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -17,6 +17,9 @@ */ package org.apache.storm.metric; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import org.apache.storm.Config; import org.apache.storm.metric.api.IMetricsConsumer; import org.apache.storm.task.IBolt; @@ -27,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; @@ -38,25 +42,34 @@ public class MetricsConsumerBolt implements IBolt { String _consumerClassName; OutputCollector _collector; Object _registrationArgument; + private Predicate<IMetricsConsumer.DataPoint> _filterPredicate; private final int _maxRetainMetricTuples; - private final BlockingQueue<MetricsTask> _taskQueue = new LinkedBlockingDeque<>(); + private final BlockingQueue<MetricsTask> _taskQueue; private Thread _taskExecuteThread; private volatile boolean _running = true; - public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples) { + public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples, + Predicate<IMetricsConsumer.DataPoint> filterPredicate) { _consumerClassName = consumerClassName; _registrationArgument = registrationArgument; _maxRetainMetricTuples = maxRetainMetricTuples; + _filterPredicate = filterPredicate; + + if (_maxRetainMetricTuples > 0) { + _taskQueue = new LinkedBlockingDeque<>(_maxRetainMetricTuples); + } else { + _taskQueue = new LinkedBlockingDeque<>(); + } } @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { try { - _metricsConsumer = (IMetricsConsumer)Class.forName(_consumerClassName).newInstance(); + _metricsConsumer = (IMetricsConsumer) Class.forName(_consumerClassName).newInstance(); } catch (Exception e) { throw new RuntimeException("Could not 
instantiate a class listed in config under section " + - Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e); + Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e); } _metricsConsumer.prepare(stormConf, _registrationArgument, context, collector); _collector = collector; @@ -64,20 +77,25 @@ public class MetricsConsumerBolt implements IBolt { _taskExecuteThread.setDaemon(true); _taskExecuteThread.start(); } - + @Override public void execute(Tuple input) { - // remove older tasks if task queue exceeds the max size - if (_taskQueue.size() > _maxRetainMetricTuples) { - while (_taskQueue.size() - 1 > _maxRetainMetricTuples) { - _taskQueue.poll(); - } + IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo) input.getValue(0); + Collection<IMetricsConsumer.DataPoint> dataPoints = (Collection) input.getValue(1); + List<IMetricsConsumer.DataPoint> filteredDataPoints = getFilteredDataPoints(dataPoints); + MetricsTask metricsTask = new MetricsTask(taskInfo, filteredDataPoints); + + while (!_taskQueue.offer(metricsTask)) { + _taskQueue.poll(); } - _taskQueue.add(new MetricsTask((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1))); _collector.ack(input); } + private List<IMetricsConsumer.DataPoint> getFilteredDataPoints(Collection<IMetricsConsumer.DataPoint> dataPoints) { + return Lists.newArrayList(Iterables.filter(dataPoints, _filterPredicate)); + } + @Override public void cleanup() { _running = false; @@ -85,7 +103,7 @@ public class MetricsConsumerBolt implements IBolt { _taskExecuteThread.interrupt(); } - class MetricsTask { + static class MetricsTask { private IMetricsConsumer.TaskInfo taskInfo; private Collection<IMetricsConsumer.DataPoint> dataPoints; @@ -119,4 +137,5 @@ public class MetricsConsumerBolt implements IBolt { } } } + } 
http://git-wip-us.apache.org/repos/asf/storm/blob/5f4e37e8/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java b/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java new file mode 100644 index 0000000..251d377 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.metric.filter; + +import com.google.common.base.Function; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.storm.metric.api.IMetricsConsumer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; + +public class FilterByMetricName implements MetricsFilter { + private final Cache<String, Boolean> filterCache; + private final List<Pattern> whitelistPattern; + private final List<Pattern> blacklistPattern; + private boolean noneSpecified = false; + + public FilterByMetricName(List<String> whitelistPattern, List<String> blacklistPattern) { + // guard NPE + if (whitelistPattern == null) { + this.whitelistPattern = Collections.emptyList(); + } else { + this.whitelistPattern = convertPatternStringsToPatternInstances(whitelistPattern); + } + + // guard NPE + if (blacklistPattern == null) { + this.blacklistPattern = Collections.emptyList(); + } else { + this.blacklistPattern = convertPatternStringsToPatternInstances(blacklistPattern); + } + + if (this.whitelistPattern.isEmpty() && this.blacklistPattern.isEmpty()) { + noneSpecified = true; + } else if (!this.whitelistPattern.isEmpty() && !this.blacklistPattern.isEmpty()) { + throw new IllegalArgumentException("You have to specify either includes or excludes, or none."); + } + + filterCache = CacheBuilder.newBuilder() + .maximumSize(1000) + .build(); + } + + @Override + public boolean apply(IMetricsConsumer.DataPoint dataPoint) { + if (noneSpecified) { + return true; + } + + String metricName = dataPoint.name; + + Boolean cachedFilteredIn = filterCache.getIfPresent(metricName); + if (cachedFilteredIn != null) { + return cachedFilteredIn; + } else { + boolean filteredIn = isFilteredIn(metricName); + filterCache.put(metricName, filteredIn); + return filteredIn; + } + } + + private 
ArrayList<Pattern> convertPatternStringsToPatternInstances(List<String> patterns) { + return Lists.newArrayList(Iterators.transform(patterns.iterator(), new Function<String, Pattern>() { + @Override + public Pattern apply(String s) { + return Pattern.compile(s); + } + })); + } + + private boolean isFilteredIn(String metricName) { + + if (!whitelistPattern.isEmpty()) { + return checkMatching(metricName, whitelistPattern, true); + } else if (!blacklistPattern.isEmpty()) { + return checkMatching(metricName, blacklistPattern, false); + } + + throw new IllegalStateException("Shouldn't reach here"); + } + + private boolean checkMatching(String metricName, List<Pattern> patterns, boolean valueWhenMatched) { + for (Pattern pattern : patterns) { + if (pattern.matcher(metricName).find()) { + return valueWhenMatched; + } + } + + return !valueWhenMatched; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/5f4e37e8/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java b/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java new file mode 100644 index 0000000..f12f706 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metric.filter; + +import com.google.common.base.Predicate; +import org.apache.storm.metric.api.IMetricsConsumer; + +import java.io.Serializable; + +public interface MetricsFilter extends Predicate<IMetricsConsumer.DataPoint>, Serializable { +} http://git-wip-us.apache.org/repos/asf/storm/blob/5f4e37e8/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java b/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java new file mode 100644 index 0000000..d2f11d6 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metric.filter; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +import static org.junit.Assert.*; + +public class FilterByMetricNameTest { + @Before + public void setUp() throws Exception { + + } + + @Test + public void testWhitelist() { + List<String> whitelistPattern = Lists.newArrayList("^metric\\.", "test\\.hello\\.[0-9]+"); + FilterByMetricName sut = new FilterByMetricName(whitelistPattern, null); + + Map<String, Boolean> testMetricNamesAndExpected = Maps.newHashMap(); + testMetricNamesAndExpected.put("storm.metric.hello", false); + testMetricNamesAndExpected.put("test.hello.world", false); + testMetricNamesAndExpected.put("test.hello.123", true); + testMetricNamesAndExpected.put("test.metric.world", false); + testMetricNamesAndExpected.put("metric.world", true); + + assertTests(sut, testMetricNamesAndExpected); + } + + @Test + public void testBlacklist() { + List<String> blacklistPattern = Lists.newArrayList("^__", "test\\."); + FilterByMetricName sut = new FilterByMetricName(null, blacklistPattern); + + Map<String, Boolean> testMetricNamesAndExpected = Maps.newHashMap(); + testMetricNamesAndExpected.put("__storm.metric.hello", false); + testMetricNamesAndExpected.put("storm.metric.__hello", true); + testMetricNamesAndExpected.put("test.hello.world", false); + testMetricNamesAndExpected.put("storm.test.123", false); + testMetricNamesAndExpected.put("metric.world", true); + + assertTests(sut, testMetricNamesAndExpected); + } + + @Test(expected = IllegalArgumentException.class) + public void testBothWhitelistAndBlacklistAreSpecified() { + List<String> whitelistPattern = Lists.newArrayList("^metric\\.", 
"test\\.hello\\.[0-9]+"); + List<String> blacklistPattern = Lists.newArrayList("^__", "test\\."); + new FilterByMetricName(whitelistPattern, blacklistPattern); + } + + @Test + public void testNoneIsSpecified() { + FilterByMetricName sut = new FilterByMetricName(null, null); + + Map<String, Boolean> testMetricNamesAndExpected = Maps.newHashMap(); + testMetricNamesAndExpected.put("__storm.metric.hello", true); + testMetricNamesAndExpected.put("storm.metric.__hello", true); + testMetricNamesAndExpected.put("test.hello.world", true); + testMetricNamesAndExpected.put("storm.test.123", true); + testMetricNamesAndExpected.put("metric.world", true); + + assertTests(sut, testMetricNamesAndExpected); + } + + private void assertTests(FilterByMetricName sut, Map<String, Boolean> testMetricNamesAndExpected) { + for (Map.Entry<String, Boolean> testEntry : testMetricNamesAndExpected.entrySet()) { + assertEquals("actual filter result is not same: " + testEntry.getKey(), + testEntry.getValue(), sut.apply(new IMetricsConsumer.DataPoint(testEntry.getKey(), 1))); + } + } +} \ No newline at end of file
