Repository: storm Updated Branches: refs/heads/master ded5a0df1 -> d42276f6f
STORM-1700 Introduce 'whitelist' / 'blacklist' option to MetricsConsumer * Users can set whitelist or blacklist to filter out metrics by name * if none of them specified (by default), no metrics are filtered out * how to match: substring match with regular expression * use both ^ and $ when you want to match strictly (full string match) * cache whether the metric name is filtered in or out * kinds of metrics are not changing during lifecycle of the topology * so applying regex for every metrics could be waste of CPU * added unit test Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/565d53dc Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/565d53dc Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/565d53dc Branch: refs/heads/master Commit: 565d53dc655a4ffa5626b81658379cdf96a13fff Parents: 4dd6de9 Author: Jungtaek Lim <[email protected]> Authored: Sat Apr 9 14:11:20 2016 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Mon Jun 13 13:06:26 2016 +0900 ---------------------------------------------------------------------- conf/storm.yaml.example | 9 ++ .../org/apache/storm/daemon/StormCommon.java | 7 +- .../storm/metric/MetricsConsumerBolt.java | 38 +++++-- .../storm/metric/filter/FilterByMetricName.java | 110 +++++++++++++++++++ .../storm/metric/filter/MetricsFilter.java | 26 +++++ .../metric/filter/FilterByMetricNameTest.java | 95 ++++++++++++++++ 6 files changed, 275 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/conf/storm.yaml.example ---------------------------------------------------------------------- diff --git a/conf/storm.yaml.example b/conf/storm.yaml.example index 0e8b354..c6d9544 100644 --- a/conf/storm.yaml.example +++ b/conf/storm.yaml.example @@ -39,12 +39,21 @@ # - "server2" ## Metrics Consumers +## max.retain.metric.tuples +## - task queue will be unbounded when max.retain.metric.tuples is equal or less than 0. +## whitelist / blacklist +## - when none of configuration for metric filter are specified, it'll be treated as 'pass all'. +## - you need to specify either whitelist or blacklist, or none of them. You can't specify both of them. +## - you can specify multiple whitelist / blacklist with regular expression # topology.metrics.consumer.register: # - class: "org.apache.storm.metric.LoggingMetricsConsumer" # max.retain.metric.tuples: 100 # parallelism.hint: 1 # - class: "org.mycompany.MyMetricsConsumer" # max.retain.metric.tuples: 100 +# whitelist: +# - "execute.*" +# - "^__complete-latency$" # parallelism.hint: 1 # argument: # - endpoint: "metrics-collector.mycompany.org" http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java index 0dbb9f2..5097167 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java @@ -38,6 +38,7 @@ import org.apache.storm.generated.StreamInfo; import org.apache.storm.metric.EventLoggerBolt; import org.apache.storm.metric.MetricsConsumerBolt; import org.apache.storm.metric.SystemBolt; +import org.apache.storm.metric.filter.FilterByMetricName; import org.apache.storm.security.auth.IAuthorizer; import org.apache.storm.task.IBolt; import org.apache.storm.testing.NonRichBoltTracker; @@ -387,8 +388,12 @@ public class StormCommon { Integer phintNum = Utils.getInt(info.get("parallelism.hint"), 1); Map<String, Object> metricsConsumerConf = new HashMap<String, Object>(); metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum); + List<String> whitelist = (List<String>) info.get("whitelist"); + List<String> blacklist = (List<String>) info.get("blacklist"); + FilterByMetricName filterPredicate = new FilterByMetricName(whitelist, blacklist); Bolt metricsConsumerBolt = Thrift.prepareSerializedBoltDetails(inputs, - new MetricsConsumerBolt(className, argument, maxRetainMetricTuples), null, phintNum, metricsConsumerConf); + new MetricsConsumerBolt(className, argument, maxRetainMetricTuples, filterPredicate), + null, phintNum, metricsConsumerConf); String id = className; if (classOccurrencesMap.containsKey(className)) { http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java index 95f9137..16a253c 100644 --- a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java +++ b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java @@ -17,6 +17,9 @@ */ package org.apache.storm.metric; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import org.apache.storm.Config; import org.apache.storm.metric.api.IMetricsConsumer; import org.apache.storm.task.IBolt; @@ -27,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; @@ -39,15 +43,25 @@ public class MetricsConsumerBolt implements IBolt { OutputCollector _collector; Object _registrationArgument; private final int _maxRetainMetricTuples; + private Predicate<IMetricsConsumer.DataPoint> _filterPredicate; - private final BlockingQueue<MetricsTask> _taskQueue = new LinkedBlockingDeque<>(); + private final BlockingQueue<MetricsTask> _taskQueue; private Thread _taskExecuteThread; private volatile boolean _running = true; - public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples) { + public MetricsConsumerBolt(String consumerClassName, Object registrationArgument, int maxRetainMetricTuples, + Predicate<IMetricsConsumer.DataPoint> filterPredicate) { + _consumerClassName = consumerClassName; _registrationArgument = registrationArgument; _maxRetainMetricTuples = maxRetainMetricTuples; + _filterPredicate = filterPredicate; + + if (_maxRetainMetricTuples > 0) { + _taskQueue = new LinkedBlockingDeque<>(_maxRetainMetricTuples); + } else { + _taskQueue = new LinkedBlockingDeque<>(); + } } @Override @@ -67,17 +81,22 @@ public class MetricsConsumerBolt implements IBolt { @Override public void execute(Tuple input) { - // remove older tasks if task queue exceeds the max size - if (_taskQueue.size() > _maxRetainMetricTuples) { - while (_taskQueue.size() - 1 > _maxRetainMetricTuples) { - _taskQueue.poll(); - } + IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo) input.getValue(0); + Collection<IMetricsConsumer.DataPoint> dataPoints = (Collection) input.getValue(1); + List<IMetricsConsumer.DataPoint> filteredDataPoints = getFilteredDataPoints(dataPoints); + MetricsTask metricsTask = new MetricsTask(taskInfo, filteredDataPoints); + + while (! _taskQueue.offer(metricsTask)) { + _taskQueue.poll(); } - _taskQueue.add(new MetricsTask((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1))); _collector.ack(input); } + private List<IMetricsConsumer.DataPoint> getFilteredDataPoints(Collection<IMetricsConsumer.DataPoint> dataPoints) { + return Lists.newArrayList(Iterables.filter(dataPoints, _filterPredicate)); + } + @Override public void cleanup() { _running = false; @@ -85,7 +104,7 @@ public class MetricsConsumerBolt implements IBolt { _taskExecuteThread.interrupt(); } - class MetricsTask { + static class MetricsTask { private IMetricsConsumer.TaskInfo taskInfo; private Collection<IMetricsConsumer.DataPoint> dataPoints; @@ -119,4 +138,5 @@ public class MetricsConsumerBolt implements IBolt { } } } + } http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java b/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java new file mode 100644 index 0000000..9c95428 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metric.filter; + +import com.google.common.base.Function; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.storm.metric.api.IMetricsConsumer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; + +public class FilterByMetricName implements MetricsFilter { + private final Cache<String, Boolean> filterCache; + private final List<Pattern> whitelistPattern; + private final List<Pattern> blacklistPattern; + private boolean noneSpecified = false; + + public FilterByMetricName(List<String> whitelistPattern, List<String> blacklistPattern) { + // guard NPE + if (whitelistPattern == null) { + this.whitelistPattern = Collections.emptyList(); + } else { + this.whitelistPattern = convertPatternStringsToPatternInstances(whitelistPattern); + } + + // guard NPE + if (blacklistPattern == null) { + this.blacklistPattern = Collections.emptyList(); + } else { + this.blacklistPattern = convertPatternStringsToPatternInstances(blacklistPattern); + } + + if (this.whitelistPattern.isEmpty() && this.blacklistPattern.isEmpty()) { + noneSpecified = true; + } else if (!this.whitelistPattern.isEmpty() && !this.blacklistPattern.isEmpty()) { + throw new IllegalArgumentException("You have to specify either includes or excludes, or none."); + } + + filterCache = CacheBuilder.newBuilder() + .maximumSize(1000) + .build(); + } + + @Override + public boolean apply(IMetricsConsumer.DataPoint dataPoint) { + if (noneSpecified) { + return true; + } + + String metricName = dataPoint.name; + + Boolean cachedFilteredIn = filterCache.getIfPresent(metricName); + if (cachedFilteredIn != null) { + return cachedFilteredIn; + } else { + boolean filteredIn = isFilteredIn(metricName); + filterCache.put(metricName, filteredIn); + return filteredIn; + } + } + + private ArrayList<Pattern> convertPatternStringsToPatternInstances(List<String> patterns) { + return Lists.newArrayList(Iterators.transform(patterns.iterator(), new Function<String, Pattern>() { + @Override + public Pattern apply(String s) { + return Pattern.compile(s); + } + })); + } + + private boolean isFilteredIn(String metricName) { + if (!whitelistPattern.isEmpty()) { + return checkMatching(metricName, whitelistPattern, true); + } else if (!blacklistPattern.isEmpty()) { + return checkMatching(metricName, blacklistPattern, false); + } + + throw new IllegalStateException("Shouldn't reach here"); + } + + private boolean checkMatching(String metricName, List<Pattern> patterns, boolean valueWhenMatched) { + for (Pattern pattern : patterns) { + if (pattern.matcher(metricName).find()) { + return valueWhenMatched; + } + } + + return !valueWhenMatched; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java b/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java new file mode 100644 index 0000000..f12f706 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metric.filter; + +import com.google.common.base.Predicate; +import org.apache.storm.metric.api.IMetricsConsumer; + +import java.io.Serializable; + +public interface MetricsFilter extends Predicate<IMetricsConsumer.DataPoint>, Serializable { +} http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java b/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java new file mode 100644 index 0000000..d2f11d6 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metric.filter; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +import static org.junit.Assert.*; + +public class FilterByMetricNameTest { + @Before + public void setUp() throws Exception { + + } + + @Test + public void testWhitelist() { + List<String> whitelistPattern = Lists.newArrayList("^metric\\.", "test\\.hello\\.[0-9]+"); + FilterByMetricName sut = new FilterByMetricName(whitelistPattern, null); + + Map<String, Boolean> testMetricNamesAndExpected = Maps.newHashMap(); + testMetricNamesAndExpected.put("storm.metric.hello", false); + testMetricNamesAndExpected.put("test.hello.world", false); + testMetricNamesAndExpected.put("test.hello.123", true); + testMetricNamesAndExpected.put("test.metric.world", false); + testMetricNamesAndExpected.put("metric.world", true); + + assertTests(sut, testMetricNamesAndExpected); + } + + @Test + public void testBlacklist() { + List<String> blacklistPattern = Lists.newArrayList("^__", "test\\."); + FilterByMetricName sut = new FilterByMetricName(null, blacklistPattern); + + Map<String, Boolean> testMetricNamesAndExpected = Maps.newHashMap(); + testMetricNamesAndExpected.put("__storm.metric.hello", false); + testMetricNamesAndExpected.put("storm.metric.__hello", true); + testMetricNamesAndExpected.put("test.hello.world", false); + testMetricNamesAndExpected.put("storm.test.123", false); + testMetricNamesAndExpected.put("metric.world", true); + + assertTests(sut, testMetricNamesAndExpected); + } + + @Test(expected = IllegalArgumentException.class) + public void testBothWhitelistAndBlacklistAreSpecified() { + List<String> whitelistPattern = Lists.newArrayList("^metric\\.", "test\\.hello\\.[0-9]+"); + List<String> blacklistPattern = Lists.newArrayList("^__", "test\\."); + new FilterByMetricName(whitelistPattern, blacklistPattern); + } + + @Test + public void testNoneIsSpecified() { + FilterByMetricName sut = new FilterByMetricName(null, null); + + Map<String, Boolean> testMetricNamesAndExpected = Maps.newHashMap(); + testMetricNamesAndExpected.put("__storm.metric.hello", true); + testMetricNamesAndExpected.put("storm.metric.__hello", true); + testMetricNamesAndExpected.put("test.hello.world", true); + testMetricNamesAndExpected.put("storm.test.123", true); + testMetricNamesAndExpected.put("metric.world", true); + + assertTests(sut, testMetricNamesAndExpected); + } + + private void assertTests(FilterByMetricName sut, Map<String, Boolean> testMetricNamesAndExpected) { + for (Map.Entry<String, Boolean> testEntry : testMetricNamesAndExpected.entrySet()) { + assertEquals("actual filter result is not same: " + testEntry.getKey(), + testEntry.getValue(), sut.apply(new IMetricsConsumer.DataPoint(testEntry.getKey(), 1))); + } + } +} \ No newline at end of file
