Repository: storm
Updated Branches:
  refs/heads/master ded5a0df1 -> d42276f6f


STORM-1700 Introduce 'whitelist' / 'blacklist' option to MetricsConsumer

* Users can set a whitelist or a blacklist to filter metrics by name
  * if neither is specified (the default), no metrics are filtered out
* how to match: each regular expression is matched against any substring of the name
  * use both ^ and $ when you want to match strictly (full string match)
* cache whether each metric name is filtered in or out
  * the kinds of metrics do not change during the lifecycle of the topology
  * so applying the regexes to every metric each time would waste CPU
* added unit test


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/565d53dc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/565d53dc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/565d53dc

Branch: refs/heads/master
Commit: 565d53dc655a4ffa5626b81658379cdf96a13fff
Parents: 4dd6de9
Author: Jungtaek Lim <[email protected]>
Authored: Sat Apr 9 14:11:20 2016 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Mon Jun 13 13:06:26 2016 +0900

----------------------------------------------------------------------
 conf/storm.yaml.example                         |   9 ++
 .../org/apache/storm/daemon/StormCommon.java    |   7 +-
 .../storm/metric/MetricsConsumerBolt.java       |  38 +++++--
 .../storm/metric/filter/FilterByMetricName.java | 110 +++++++++++++++++++
 .../storm/metric/filter/MetricsFilter.java      |  26 +++++
 .../metric/filter/FilterByMetricNameTest.java   |  95 ++++++++++++++++
 6 files changed, 275 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/conf/storm.yaml.example
----------------------------------------------------------------------
diff --git a/conf/storm.yaml.example b/conf/storm.yaml.example
index 0e8b354..c6d9544 100644
--- a/conf/storm.yaml.example
+++ b/conf/storm.yaml.example
@@ -39,12 +39,21 @@
 #     - "server2"
 
 ## Metrics Consumers
+## max.retain.metric.tuples
+## - task queue will be unbounded when max.retain.metric.tuples is equal or 
less than 0.
+## whitelist / blacklist
+## - when none of configuration for metric filter are specified, it'll be 
treated as 'pass all'.
+## - you need to specify either whitelist or blacklist, or none of them. You 
can't specify both of them.
+## - you can specify multiple whitelist / blacklist with regular expression
 # topology.metrics.consumer.register:
 #   - class: "org.apache.storm.metric.LoggingMetricsConsumer"
 #     max.retain.metric.tuples: 100
 #     parallelism.hint: 1
 #   - class: "org.mycompany.MyMetricsConsumer"
 #     max.retain.metric.tuples: 100
+#     whitelist:
+#       - "execute.*"
+#       - "^__complete-latency$"
 #     parallelism.hint: 1
 #     argument:
 #       - endpoint: "metrics-collector.mycompany.org"

http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java 
b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
index 0dbb9f2..5097167 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -38,6 +38,7 @@ import org.apache.storm.generated.StreamInfo;
 import org.apache.storm.metric.EventLoggerBolt;
 import org.apache.storm.metric.MetricsConsumerBolt;
 import org.apache.storm.metric.SystemBolt;
+import org.apache.storm.metric.filter.FilterByMetricName;
 import org.apache.storm.security.auth.IAuthorizer;
 import org.apache.storm.task.IBolt;
 import org.apache.storm.testing.NonRichBoltTracker;
@@ -387,8 +388,12 @@ public class StormCommon {
                 Integer phintNum = Utils.getInt(info.get("parallelism.hint"), 
1);
                 Map<String, Object> metricsConsumerConf = new HashMap<String, 
Object>();
                 metricsConsumerConf.put(Config.TOPOLOGY_TASKS, phintNum);
+                List<String> whitelist = (List<String>) info.get("whitelist");
+                List<String> blacklist = (List<String>) info.get("blacklist");
+                FilterByMetricName filterPredicate = new 
FilterByMetricName(whitelist, blacklist);
                 Bolt metricsConsumerBolt = 
Thrift.prepareSerializedBoltDetails(inputs,
-                        new MetricsConsumerBolt(className, argument, 
maxRetainMetricTuples), null, phintNum, metricsConsumerConf);
+                        new MetricsConsumerBolt(className, argument, 
maxRetainMetricTuples, filterPredicate),
+                        null, phintNum, metricsConsumerConf);
 
                 String id = className;
                 if (classOccurrencesMap.containsKey(className)) {

http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java 
b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
index 95f9137..16a253c 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java
@@ -17,6 +17,9 @@
  */
 package org.apache.storm.metric;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import org.apache.storm.Config;
 import org.apache.storm.metric.api.IMetricsConsumer;
 import org.apache.storm.task.IBolt;
@@ -27,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -39,15 +43,25 @@ public class MetricsConsumerBolt implements IBolt {
     OutputCollector _collector;
     Object _registrationArgument;
     private final int _maxRetainMetricTuples;
+    private Predicate<IMetricsConsumer.DataPoint> _filterPredicate;
 
-    private final BlockingQueue<MetricsTask> _taskQueue = new 
LinkedBlockingDeque<>();
+    private final BlockingQueue<MetricsTask> _taskQueue;
     private Thread _taskExecuteThread;
     private volatile boolean _running = true;
 
-    public MetricsConsumerBolt(String consumerClassName, Object 
registrationArgument, int maxRetainMetricTuples) {
+    public MetricsConsumerBolt(String consumerClassName, Object 
registrationArgument, int maxRetainMetricTuples,
+                               Predicate<IMetricsConsumer.DataPoint> 
filterPredicate) {
+
         _consumerClassName = consumerClassName;
         _registrationArgument = registrationArgument;
         _maxRetainMetricTuples = maxRetainMetricTuples;
+        _filterPredicate = filterPredicate;
+
+        if (_maxRetainMetricTuples > 0) {
+            _taskQueue = new LinkedBlockingDeque<>(_maxRetainMetricTuples);
+        } else {
+            _taskQueue = new LinkedBlockingDeque<>();
+        }
     }
 
     @Override
@@ -67,17 +81,22 @@ public class MetricsConsumerBolt implements IBolt {
     
     @Override
     public void execute(Tuple input) {
-        // remove older tasks if task queue exceeds the max size
-        if (_taskQueue.size() > _maxRetainMetricTuples) {
-            while (_taskQueue.size() - 1 > _maxRetainMetricTuples) {
-                _taskQueue.poll();
-            }
+        IMetricsConsumer.TaskInfo taskInfo = (IMetricsConsumer.TaskInfo) 
input.getValue(0);
+        Collection<IMetricsConsumer.DataPoint> dataPoints = (Collection) 
input.getValue(1);
+        List<IMetricsConsumer.DataPoint> filteredDataPoints = 
getFilteredDataPoints(dataPoints);
+        MetricsTask metricsTask = new MetricsTask(taskInfo, 
filteredDataPoints);
+
+        while (! _taskQueue.offer(metricsTask)) {
+            _taskQueue.poll();
         }
 
-        _taskQueue.add(new 
MetricsTask((IMetricsConsumer.TaskInfo)input.getValue(0), 
(Collection)input.getValue(1)));
         _collector.ack(input);
     }
 
+    private List<IMetricsConsumer.DataPoint> 
getFilteredDataPoints(Collection<IMetricsConsumer.DataPoint> dataPoints) {
+        return Lists.newArrayList(Iterables.filter(dataPoints, 
_filterPredicate));
+    }
+
     @Override
     public void cleanup() {
         _running = false;
@@ -85,7 +104,7 @@ public class MetricsConsumerBolt implements IBolt {
         _taskExecuteThread.interrupt();
     }
 
-    class MetricsTask {
+    static class MetricsTask {
         private IMetricsConsumer.TaskInfo taskInfo;
         private Collection<IMetricsConsumer.DataPoint> dataPoints;
 
@@ -119,4 +138,5 @@ public class MetricsConsumerBolt implements IBolt {
             }
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java 
b/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java
new file mode 100644
index 0000000..9c95428
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.filter;
+
+import com.google.common.base.Function;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.storm.metric.api.IMetricsConsumer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class FilterByMetricName implements MetricsFilter {
+    private final Cache<String, Boolean> filterCache;
+    private final List<Pattern> whitelistPattern;
+    private final List<Pattern> blacklistPattern;
+    private boolean noneSpecified = false;
+
+    public FilterByMetricName(List<String> whitelistPattern, List<String> 
blacklistPattern) {
+        // guard NPE
+        if (whitelistPattern == null) {
+            this.whitelistPattern = Collections.emptyList();
+        } else {
+            this.whitelistPattern = 
convertPatternStringsToPatternInstances(whitelistPattern);
+        }
+
+        // guard NPE
+        if (blacklistPattern == null) {
+            this.blacklistPattern = Collections.emptyList();
+        } else {
+            this.blacklistPattern = 
convertPatternStringsToPatternInstances(blacklistPattern);
+        }
+
+        if (this.whitelistPattern.isEmpty() && 
this.blacklistPattern.isEmpty()) {
+            noneSpecified = true;
+        } else if (!this.whitelistPattern.isEmpty() && 
!this.blacklistPattern.isEmpty()) {
+            throw new IllegalArgumentException("You have to specify either 
includes or excludes, or none.");
+        }
+
+        filterCache = CacheBuilder.newBuilder()
+                .maximumSize(1000)
+                .build();
+    }
+
+    @Override
+    public boolean apply(IMetricsConsumer.DataPoint dataPoint) {
+        if (noneSpecified) {
+            return true;
+        }
+
+        String metricName = dataPoint.name;
+
+        Boolean cachedFilteredIn = filterCache.getIfPresent(metricName);
+        if (cachedFilteredIn != null) {
+            return cachedFilteredIn;
+        } else {
+            boolean filteredIn = isFilteredIn(metricName);
+            filterCache.put(metricName, filteredIn);
+            return filteredIn;
+        }
+    }
+
+    private ArrayList<Pattern> 
convertPatternStringsToPatternInstances(List<String> patterns) {
+        return Lists.newArrayList(Iterators.transform(patterns.iterator(), new 
Function<String, Pattern>() {
+            @Override
+            public Pattern apply(String s) {
+                return Pattern.compile(s);
+            }
+        }));
+    }
+
+    private boolean isFilteredIn(String metricName) {
+        if (!whitelistPattern.isEmpty()) {
+            return checkMatching(metricName, whitelistPattern, true);
+        } else if (!blacklistPattern.isEmpty()) {
+            return checkMatching(metricName, blacklistPattern, false);
+        }
+
+        throw new IllegalStateException("Shouldn't reach here");
+    }
+
+    private boolean checkMatching(String metricName, List<Pattern> patterns, 
boolean valueWhenMatched) {
+        for (Pattern pattern : patterns) {
+            if (pattern.matcher(metricName).find()) {
+                return valueWhenMatched;
+            }
+        }
+
+        return !valueWhenMatched;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java 
b/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java
new file mode 100644
index 0000000..f12f706
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metric/filter/MetricsFilter.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.filter;
+
+import com.google.common.base.Predicate;
+import org.apache.storm.metric.api.IMetricsConsumer;
+
+import java.io.Serializable;
+
+public interface MetricsFilter extends Predicate<IMetricsConsumer.DataPoint>, 
Serializable {
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/565d53dc/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java
 
b/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java
new file mode 100644
index 0000000..d2f11d6
--- /dev/null
+++ 
b/storm-core/test/jvm/org/apache/storm/metric/filter/FilterByMetricNameTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.filter;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.*;
+
+public class FilterByMetricNameTest {
+    @Before
+    public void setUp() throws Exception {
+
+    }
+
+    @Test
+    public void testWhitelist() {
+        List<String> whitelistPattern = Lists.newArrayList("^metric\\.", 
"test\\.hello\\.[0-9]+");
+        FilterByMetricName sut = new FilterByMetricName(whitelistPattern, 
null);
+
+        Map<String, Boolean> testMetricNamesAndExpected = Maps.newHashMap();
+        testMetricNamesAndExpected.put("storm.metric.hello", false);
+        testMetricNamesAndExpected.put("test.hello.world", false);
+        testMetricNamesAndExpected.put("test.hello.123", true);
+        testMetricNamesAndExpected.put("test.metric.world", false);
+        testMetricNamesAndExpected.put("metric.world", true);
+
+        assertTests(sut, testMetricNamesAndExpected);
+    }
+
+    @Test
+    public void testBlacklist() {
+        List<String> blacklistPattern = Lists.newArrayList("^__", "test\\.");
+        FilterByMetricName sut = new FilterByMetricName(null, 
blacklistPattern);
+
+        Map<String, Boolean> testMetricNamesAndExpected = Maps.newHashMap();
+        testMetricNamesAndExpected.put("__storm.metric.hello", false);
+        testMetricNamesAndExpected.put("storm.metric.__hello", true);
+        testMetricNamesAndExpected.put("test.hello.world", false);
+        testMetricNamesAndExpected.put("storm.test.123", false);
+        testMetricNamesAndExpected.put("metric.world", true);
+
+        assertTests(sut, testMetricNamesAndExpected);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBothWhitelistAndBlacklistAreSpecified() {
+        List<String> whitelistPattern = Lists.newArrayList("^metric\\.", 
"test\\.hello\\.[0-9]+");
+        List<String> blacklistPattern = Lists.newArrayList("^__", "test\\.");
+        new FilterByMetricName(whitelistPattern, blacklistPattern);
+    }
+
+    @Test
+    public void testNoneIsSpecified() {
+        FilterByMetricName sut = new FilterByMetricName(null, null);
+
+        Map<String, Boolean> testMetricNamesAndExpected = Maps.newHashMap();
+        testMetricNamesAndExpected.put("__storm.metric.hello", true);
+        testMetricNamesAndExpected.put("storm.metric.__hello", true);
+        testMetricNamesAndExpected.put("test.hello.world", true);
+        testMetricNamesAndExpected.put("storm.test.123", true);
+        testMetricNamesAndExpected.put("metric.world", true);
+
+        assertTests(sut, testMetricNamesAndExpected);
+    }
+
+    private void assertTests(FilterByMetricName sut, Map<String, Boolean> 
testMetricNamesAndExpected) {
+        for (Map.Entry<String, Boolean> testEntry : 
testMetricNamesAndExpected.entrySet()) {
+            assertEquals("actual filter result is not same: " + 
testEntry.getKey(),
+                    testEntry.getValue(), sut.apply(new 
IMetricsConsumer.DataPoint(testEntry.getKey(), 1)));
+        }
+    }
+}
\ No newline at end of file

Reply via email to