Repository: metron Updated Branches: refs/heads/master ba46fa734 -> 7e0375390
METRON-1060: Add performance timing logging to enrichment topology (mmiklavc) closes apache/metron#665 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7e037539 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7e037539 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7e037539 Branch: refs/heads/master Commit: 7e0375390baba6a3a4c115a71bb7a3d473ccbdf7 Parents: ba46fa7 Author: mmiklavc <[email protected]> Authored: Mon Jul 31 09:27:27 2017 -0600 Committer: Michael Miklavcic <[email protected]> Committed: Mon Jul 31 09:27:27 2017 -0600 ---------------------------------------------------------------------- metron-platform/metron-common/README.md | 54 ++++++ .../common/performance/PerformanceLogger.java | 188 +++++++++++++++++++ .../common/performance/ThresholdCalculator.java | 41 ++++ .../metron/common/performance/Timing.java | 65 +++++++ .../performance/PerformanceLoggerTest.java | 173 +++++++++++++++++ .../metron/common/performance/TimingTest.java | 96 ++++++++++ .../enrichment/bolt/GenericEnrichmentBolt.java | 36 ++-- .../apache/metron/enrichment/bolt/JoinBolt.java | 39 ++-- .../metron/enrichment/bolt/SplitBolt.java | 33 ++-- 9 files changed, 681 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-common/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/README.md b/metron-platform/metron-common/README.md index 76f9390..fbc3c10 100644 --- a/metron-platform/metron-common/README.md +++ b/metron-platform/metron-common/README.md @@ -7,6 +7,7 @@ * [Validation Framework](#validation-framework) * [Management Utility](#management-utility) * [Topology Errors](topology-errors) +* [Performance Logging](#performance-logging) # Stellar Language @@ -152,3 +153,56 @@ Error topics for 
enrichment and threat intel errors are passed into the enrichme The error topic for indexing errors is passed into the indexing topology as a flux property named `index.error.topic`. This property can be found in either `$METRON_HOME/config/elasticsearch.properties` or `$METRON_HOME/config/solr.properties` depending on the search engine selected. By default all error messages are sent to the `indexing` topic so that they are indexed and archived, just like other messages. The indexing config for error messages can be found at `$METRON_HOME/config/zookeeper/indexing/error.json`. + +# Performance Logging + +The PerformanceLogger class provides functionality that enables developers to debug performance issues. Basic usage looks like the following: +``` +// create a simple inner performance class to use for logger instantiation +public static class Perf {} +// instantiation +PerformanceLogger perfLog = new PerformanceLogger(() -> getConfigurations().getGlobalConfig(), Perf.class.getName()); +// marking a start time +perfLog.mark("mark1"); +// ...do some high performance stuff... +// log the elapsed time +perfLog.log("mark1", "My high performance stuff is very performant"); +// log no additional message, just the basics +perfLog.log("mark1"); +``` + +The logger maintains a Map<String, Long> of named markers that correspond to start times. Calling mark() performs a put on the underlying timing store. Output includes the mark name, elapsed time in nanoseconds, as well as any custom messaging you provide. A sample log would look like the following: +``` +[DEBUG] markName=execute,time(ns)=121411,message=key=7a8dbe44-4cb9-4db2-9d04-7632f543b56c, elapsed time to run execute +``` + +__Configuration__ + +The first argument to the logger is a java.util.function.Supplier<Map<String, Object>>. This offers flexibility in being able to provide multiple configuration "suppliers" depending on your individual usage requirements.
The example above, +taken from org.apache.metron.enrichment.bolt.GenericEnrichmentBolt, leverages the global config to dynamically provide configuration from Zookeeper. Any updates to the global config via Zookeeper are reflected live at runtime. Currently, +the PerformanceLogger supports the following options: + +|Property Name |Type |Valid Values | +|-------------------------------------------|-------------------|---------------| +|performance.logging.percent.records |Integer |0-100 | + + +__Other Usage Details__ + +You can also provide your own format String and provide arguments that will be used when formatting that String. This code avoids expensive String concatenation by only formatting when debugging is enabled. For more complex arguments, e.g. JSON serialization, we expose an isDebugEnabled() method. + +``` +// log with format String and single argument +perfLog.log("join-message", "key={}, elapsed time to join messages", key); + +// check if debugging is enabled for the performance logger to avoid more expensive operations +if (perfLog.isDebugEnabled()) { + perfLog.log("join-message", "key={}, elapsed time to join messages, message={}", key, rawMessage.toJSONString()); +} +``` + +__Side Effects__ + +Calling the mark() method multiple times simply resets the start time to the current nano time. Calling log() with a non-existent mark name will log 0 ns elapsed time with a warning indicating that log has been invoked for a mark name that does not exist. +The class is not thread-safe and makes no attempt at keeping multiple threads from modifying the same markers.
+ http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/PerformanceLogger.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/PerformanceLogger.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/PerformanceLogger.java new file mode 100644 index 0000000..61015ef --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/PerformanceLogger.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.common.performance; + +import java.util.Map; +import java.util.function.Supplier; +import org.apache.metron.stellar.common.utils.ConversionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.helpers.FormattingTuple; +import org.slf4j.helpers.MessageFormatter; + +public class PerformanceLogger { + + private static final String LOG_PERCENT = "performance.logging.percent.records"; + private static final Integer LOG_PERCENT_DEFAULT = 1; + private Supplier<Map<String, Object>> configSupplier; + private ThresholdCalculator thresholdCalc; + private Timing timing; + private Logger logger; + + /** + * Minimum constructor for this class. + * <p> + * Options: + * </p> + * <ul> + * <li>performance.logging.percent.records = Integer value 1-100 indicating percent probability + * that a logging statement should trigger writing timing info. + * </li> + * </ul> + * + * @param configSupplier provides configuration for the logger as a Map<String, Object> + * @param loggerName name for the underlying logger. This name is used when enabling/disabling + * the logger. + */ + public PerformanceLogger(Supplier<Map<String, Object>> configSupplier, String loggerName) { + this(configSupplier, LoggerFactory.getLogger(loggerName), new ThresholdCalculator(), + new Timing()); + } + + public PerformanceLogger(Supplier<Map<String, Object>> configSupplier, Logger logger, + ThresholdCalculator thresholdCalc, Timing timing) { + this.configSupplier = configSupplier; + this.thresholdCalc = thresholdCalc; + this.timing = timing; + this.logger = logger; + this.logger.info("{} set to {}", LOG_PERCENT, getPercentThreshold()); + } + + /** + * Marks a timer start. Works in conjunction with the log methods. Calling log after + * calling mark will log elapsed time for the provided markName. 
+ * + * @param markName + */ + public void mark(String markName) { + timing.mark(markName); + } + + /** + * Log a message at DEBUG level for the given markName. + * Warns when logging for a markName that hasn't been set. + * <p/> + * <p>This form avoids superfluous string concatenation when the logger + * is disabled for the DEBUG level.</p> + * + * @param markName name of the marked timer to log elapsed time for + */ + public void log(String markName) { + if (okToLog()) { + log(markName, ""); + } + } + + /** + * Log a message at DEBUG level for the given markName according to the specified message. + * Warns when logging for a markName that hasn't been set. + * <p/> + * <p>This form avoids superfluous string concatenation when the logger + * is disabled for the DEBUG level.</p> + * + * @param markName name of the marked timer to log elapsed time for + * @param message message to log + */ + public void log(String markName, String message) { + if (okToLog()) { + if (timing.exists(markName)) { + logger.debug("markName={},time(ns)={},message={}", markName, timing.getElapsed(markName), + message); + } else { + logger.debug("markName={},time(ns)={},message={}", "WARNING - MARK NOT SET", + timing.getElapsed(markName), message); + } + } + } + + private boolean okToLog() { + return logger.isDebugEnabled() && thresholdCalc.isPast(getPercentThreshold()); + } + + private Integer getPercentThreshold() { + return ConversionUtils.convert(getProperty(LOG_PERCENT, LOG_PERCENT_DEFAULT), Integer.class); + } + + private Object getProperty(String key, Object defaultValue) { + return configSupplier.get().getOrDefault(key, defaultValue); + } + + + /** + * Log a message at DEBUG level for the given markName according to the specified format + * and argument. Warns when logging for a markName that hasn't been set. 
+ * <p/> + * <p>This form avoids superfluous string concatenation when the logger + * is disabled for the DEBUG level.</p> + * + * @param markName name of the marked timer to log elapsed time for + * @param format the format string + * @param arg argument to the format String + */ + public void log(String markName, String format, Object arg) { + if (okToLog()) { + FormattingTuple formattedMessage = MessageFormatter.format(format, arg); + log(markName, formattedMessage.getMessage()); + } + } + + /** + * Log a message at DEBUG level according to the specified format and argument. + * <p/> + * <p>This form avoids superfluous string concatenation when the logger + * is disabled for the DEBUG level.</p> + * + * @param markName name of the marked timer to log elapsed time for + * @param format the format string + * @param arg1 first argument to the format String + * @param arg2 second argument to the format String + */ + public void log(String markName, String format, Object arg1, Object arg2) { + if (okToLog()) { + FormattingTuple formattedMessage = MessageFormatter.format(format, arg1, arg2); + log(markName, formattedMessage.getMessage()); + } + } + + /** + * Log a message at DEBUG level according to the specified format and arguments. + * <p/> + * <p>This form avoids superfluous string concatenation when the logger + * is disabled for the DEBUG level. However, this variant incurs the hidden + * (and relatively small) cost of creating an <code>Object[]</code> before invoking the method, + * even if this logger is disabled for DEBUG. The variants taking + * {@link #log(String, String, Object) one} and {@link #log(String, String, Object, Object) two} + * arguments exist solely in order to avoid this hidden cost.</p> + * + * @param markName name of the marked timer to log elapsed time for + * @param format the format string + * @param arguments a list of 3 or more arguments + */ + public void log(String markName, String format, Object... 
arguments) { + if (okToLog()) { + FormattingTuple formattedMessage = MessageFormatter.arrayFormat(format, arguments); + log(markName, formattedMessage.getMessage()); + } + } + + public boolean isDebugEnabled() { + return logger.isDebugEnabled(); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/ThresholdCalculator.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/ThresholdCalculator.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/ThresholdCalculator.java new file mode 100644 index 0000000..de7cfdd --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/ThresholdCalculator.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.common.performance; + +public class ThresholdCalculator { + + /** + * Returns true roughly the provided percent of the time, false 100%-'percent' of the time + * + * @param percent Desired probability to return true + * @return true if the percent probability is true for this call. + */ + public boolean isPast(int percent) { + double rd = Math.random(); + if (rd <= toDecimal(percent)) { + return true; + } + return false; + } + + private double toDecimal(int percent) { + return percent / 100.0; + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/Timing.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/Timing.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/Timing.java new file mode 100644 index 0000000..67d471b --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/Timing.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.common.performance; + +import java.util.HashMap; +import java.util.Map; + +public class Timing { + + Map<String, Long> startTimes; + + public Timing() { + this.startTimes = new HashMap<>(); + } + + /** + * Stores a starting time from current nanoTime with the provided name + * + * @param name starting time mark name + */ + public void mark(String name) { + startTimes.put(name, System.nanoTime()); + } + + /** + * Returns elapsed nanoTime given a stored marked time for the given name. Returns 0 for + * non-existent mark names + * + * @param name mark name to get elapsed time for. + */ + public long getElapsed(String name) { + if (startTimes.containsKey(name)) { + return System.nanoTime() - startTimes.get(name); + } else { + return 0; + } + } + + /** + * Checks existence of timing marker. + * + * @param name mark name to lookup. + * @return true if mark has been called with this name, false otherwise. + */ + public boolean exists(String name) { + return startTimes.containsKey(name); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-common/src/test/java/org/apache/metron/common/performance/PerformanceLoggerTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/performance/PerformanceLoggerTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/performance/PerformanceLoggerTest.java new file mode 100644 index 0000000..88f54e7 --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/performance/PerformanceLoggerTest.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.performance; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.slf4j.Logger; + +public class PerformanceLoggerTest { + + private int thresholdPercent = 10; + private Supplier<Map<String, Object>> configSupplier; + @Mock + private Timing timing; + @Mock + private ThresholdCalculator thresholdCalc; + @Mock + private Logger logger; + private PerformanceLogger perfLogger; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + configSupplier = () -> ImmutableMap.of("performance.logging.percent.records", thresholdPercent); + when(thresholdCalc.isPast(thresholdPercent)).thenReturn(false).thenReturn(false) + .thenReturn(true); + when(logger.isDebugEnabled()).thenReturn(true); + when(timing.exists("t1")).thenReturn(true); + perfLogger = 
new PerformanceLogger(configSupplier, logger, thresholdCalc, timing); + } + + @Test + public void logs_on_threshold() throws Exception { + when(timing.getElapsed("t1")).thenReturn(111L).thenReturn(222L).thenReturn(333L); + perfLogger.mark("t1"); + perfLogger.log("t1"); + perfLogger.log("t1"); + perfLogger.log("t1"); + verify(timing).mark("t1"); + verify(logger, times(1)).debug(anyString(), anyObject(), eq(111L), eq("")); + } + + @Test + public void logs_on_threshold_with_message() throws Exception { + when(timing.getElapsed("t1")).thenReturn(111L).thenReturn(222L).thenReturn(333L); + perfLogger.mark("t1"); + perfLogger.log("t1", "my message"); + perfLogger.log("t1", "my message"); + perfLogger.log("t1", "my message"); + verify(timing).mark("t1"); + verify(logger, times(1)).debug(anyString(), anyObject(), eq(111L), eq("my message")); + } + + @Test + public void warns_when_logging_nonexisting_marks() throws Exception { + when(thresholdCalc.isPast(thresholdPercent)).thenReturn(true); + when(timing.getElapsed("t1")).thenReturn(111L); + when(timing.getElapsed("t2")).thenReturn(222L); + when(timing.getElapsed("t3")).thenReturn(333L); + when(timing.exists("t1")).thenReturn(true); + when(timing.exists("t2")).thenReturn(false); + when(timing.exists("t3")).thenReturn(false); + perfLogger.mark("t1"); + perfLogger.log("t1", "my message"); + perfLogger.log("t2", "my message"); + perfLogger.log("t3", "my message"); + verify(timing).mark("t1"); + verify(timing, never()).mark("t2"); + verify(timing, never()).mark("t3"); + verify(logger).debug(anyString(), anyObject(), eq(111L), eq("my message")); + verify(logger) + .debug(anyString(), eq("WARNING - MARK NOT SET"), eq(222L), eq("my message")); + verify(logger) + .debug(anyString(), eq("WARNING - MARK NOT SET"), eq(333L), eq("my message")); + } + + @Test + public void logs_with_multiple_markers() throws Exception { + when(thresholdCalc.isPast(thresholdPercent)).thenReturn(true); + when(timing.getElapsed("t1")).thenReturn(111L); + 
when(timing.getElapsed("t2")).thenReturn(222L); + perfLogger.mark("t1"); + perfLogger.mark("t2"); + perfLogger.log("t2", "my message 2"); + perfLogger.log("t1", "my message 1"); + verify(timing).mark("t1"); + verify(timing).mark("t2"); + verify(logger).debug(anyString(), anyObject(), eq(111L), eq("my message 1")); + verify(logger).debug(anyString(), anyObject(), eq(222L), eq("my message 2")); + } + + @Test + public void defaults_to_1_percent_threshold() throws Exception { + configSupplier = () -> new HashMap<>(); + when(thresholdCalc.isPast(1)).thenReturn(false).thenReturn(false) + .thenReturn(true); + when(timing.getElapsed("t1")).thenReturn(111L).thenReturn(222L).thenReturn(333L); + perfLogger.mark("t1"); + perfLogger.log("t1", "my message"); + perfLogger.log("t1", "my message"); + perfLogger.log("t1", "my message"); + verify(timing).mark("t1"); + verify(logger, times(1)).debug(anyString(), anyObject(), eq(111L), eq("my message")); + } + + @Test + public void does_not_log_when_debugging_disabled() throws Exception { + when(logger.isDebugEnabled()).thenReturn(false); + when(timing.getElapsed("t1")).thenReturn(111L).thenReturn(222L).thenReturn(333L); + perfLogger.mark("t1"); + perfLogger.log("t1", "my message"); + perfLogger.log("t1", "my message"); + perfLogger.log("t1", "my message"); + verify(timing).mark("t1"); + verify(logger, times(0)).debug(anyString(), anyObject(), eq(111L), eq("my message")); + } + + @Test + public void logs_formatted_message_provided_format_args() throws Exception { + when(thresholdCalc.isPast(thresholdPercent)).thenReturn(true); + when(timing.getElapsed("t1")).thenReturn(111L).thenReturn(222L).thenReturn(333L) + .thenReturn(444L); + perfLogger.mark("t1"); + perfLogger.log("t1", "my {} message", "1"); + perfLogger.log("t1", "my {} message {}", "1", "2"); + perfLogger.log("t1", "my {} message {} {}", "1", "2", "3"); + perfLogger.log("t1", "my {} message {} {} {}", "1", "2", "3", "4"); + verify(timing).mark("t1"); + verify(logger, 
times(1)).debug(anyString(), anyObject(), eq(111L), eq("my 1 message")); + verify(logger, times(1)).debug(anyString(), anyObject(), eq(222L), eq("my 1 message 2")); + verify(logger, times(1)).debug(anyString(), anyObject(), eq(333L), eq("my 1 message 2 3")); + verify(logger, times(1)).debug(anyString(), anyObject(), eq(444L), eq("my 1 message 2 3 4")); + } + + @Test + public void isDebugEnabled_returns_true_when_debugging_is_set() { + when(logger.isDebugEnabled()).thenReturn(true); + assertThat(perfLogger.isDebugEnabled(), equalTo(true)); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-common/src/test/java/org/apache/metron/common/performance/TimingTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/performance/TimingTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/performance/TimingTest.java new file mode 100644 index 0000000..e7ccb79 --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/performance/TimingTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.common.performance; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.junit.Before; +import org.junit.Test; + +public class TimingTest { + + private Timing timing; + + @Before + public void setup() { + timing = new Timing(); + } + + @Test + public void provides_monotonically_increasing_times_for_single_marker() + throws InterruptedException { + timing.mark("mark1"); + long tlast = 0; + for (int i = 0; i < 10; i++) { + tlast = timing.getElapsed("mark1"); + Thread.sleep(10); + assertThat(timing.getElapsed("mark1") > tlast, equalTo(true)); + } + } + + @Test + public void provides_monotonically_increasing_times_for_multiple_markers() + throws InterruptedException { + timing.mark("mark1"); + timing.mark("mark2"); + long tlast1 = 0; + long tlast2 = 0; + for (int i = 0; i < 10; i++) { + tlast1 = timing.getElapsed("mark1"); + tlast2 = timing.getElapsed("mark2"); + Thread.sleep(10); + assertThat(timing.getElapsed("mark1") > tlast1, equalTo(true)); + assertThat(timing.getElapsed("mark2") > tlast2, equalTo(true)); + } + } + + @Test + public void elapsed_time_on_nonexistent_marker_is_zero() throws InterruptedException { + timing.mark("mark1"); + long tlast1 = 0; + for (int i = 0; i < 10; i++) { + tlast1 = timing.getElapsed("mark1"); + Thread.sleep(10); + assertThat(timing.getElapsed("mark1") > tlast1, equalTo(true)); + assertThat(timing.getElapsed("mark2"), equalTo(0L)); + } + } + + @Test + public void marking_again_resets_timing() throws InterruptedException { + timing.mark("mark1"); + long tlast1 = 0; + for (int i = 0; i < 5; i++) { + Thread.sleep(10); + tlast1 = timing.getElapsed("mark1"); + timing.mark("mark1"); + assertThat(timing.getElapsed("mark1") < tlast1, equalTo(true)); + } + } + + @Test + public void exists_checks_mark_existence() { + timing.mark("mark1"); + assertThat(timing.exists("mark1"), equalTo(true)); + assertThat(timing.exists("mark2"), equalTo(false)); + 
assertThat(timing.exists("mark3"), equalTo(false)); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java index 907b309..db15b46 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java @@ -18,34 +18,34 @@ package org.apache.metron.enrichment.bolt; -import org.apache.metron.common.error.MetronError; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.metron.common.Constants; import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt; import org.apache.metron.common.configuration.ConfigurationType; import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; -import org.apache.metron.stellar.dsl.Context; -import org.apache.metron.stellar.dsl.StellarFunctions; +import org.apache.metron.common.error.MetronError; +import org.apache.metron.common.performance.PerformanceLogger; import org.apache.metron.common.utils.ErrorUtils; import 
org.apache.metron.enrichment.configuration.Enrichment; import org.apache.metron.enrichment.interfaces.EnrichmentAdapter; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.StellarFunctions; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.TimeUnit; - /** * Uses an adapter to enrich telemetry messages with additional metadata * entries. For a list of available enrichment adapters see @@ -65,7 +65,8 @@ import java.util.concurrent.TimeUnit; @SuppressWarnings({"rawtypes", "serial"}) public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt { - + public static class Perf {} // used for performance logging + private PerformanceLogger perfLog; // not static bc multiple bolts may exist in same worker private static final Logger LOG = LoggerFactory .getLogger(GenericEnrichmentBolt.class); public static final String STELLAR_CONTEXT_CONF = "stellarContext"; @@ -158,6 +159,7 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt { LOG.error("[Metron] GenericEnrichmentBolt could not initialize adapter"); throw new IllegalStateException("Could not initialize adapter..."); } + perfLog = new PerformanceLogger(() -> getConfigurations().getGlobalConfig(), GenericEnrichmentBolt.Perf.class.getName()); initializeStellar(); } @@ -179,6 +181,7 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt { @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { + perfLog.mark("execute"); String key = tuple.getStringByField("key"); JSONObject rawMessage = (JSONObject) tuple.getValueByField("message"); String subGroup = 
""; @@ -222,7 +225,11 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt { adapter.logAccess(cacheKey); prefix = adapter.getOutputPrefix(cacheKey); subGroup = adapter.getStreamSubGroup(enrichmentType, field); + + perfLog.mark("enrich"); enrichedField = cache.getUnchecked(cacheKey); + perfLog.log("enrich", "key={}, time to run enrichment type={}", key, enrichmentType); + if (enrichedField == null) throw new Exception("[Metron] Could not enrich string: " + value); @@ -258,6 +265,7 @@ public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt { } catch (Exception e) { handleError(key, rawMessage, subGroup, enrichedMessage, e); } + perfLog.log("execute", "key={}, elapsed time to run execute", key); } // Made protected to allow for error testing in integration test. Directly flaws inputs while everything is functioning hits other http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java index f3fe52c..f6f3971 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java @@ -25,11 +25,16 @@ import com.google.common.cache.RemovalCause; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import com.google.common.collect.Sets; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.metron.common.Constants; import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt; import 
org.apache.metron.common.error.MetronError; import org.apache.metron.common.message.MessageGetStrategy; import org.apache.metron.common.message.MessageGetters; +import org.apache.metron.common.performance.PerformanceLogger; import org.apache.metron.common.utils.ErrorUtils; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -40,15 +45,10 @@ import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt { - - private static final Logger LOG = LoggerFactory - .getLogger(JoinBolt.class); + public static class Perf {} // used for performance logging + private PerformanceLogger perfLog; // not static bc multiple bolts may exist in same worker + private static final Logger LOG = LoggerFactory.getLogger(JoinBolt.class); protected OutputCollector collector; protected transient CacheLoader<String, Map<String, Tuple>> loader; @@ -76,6 +76,7 @@ public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt { @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { super.prepare(map, topologyContext, outputCollector); + perfLog = new PerformanceLogger(() -> getConfigurations().getGlobalConfig(), Perf.class.getName()); keyGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("key"); subgroupGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("subgroup"); messageGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("message"); @@ -120,6 +121,7 @@ public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt { @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { + perfLog.mark("execute"); String streamId = tuple.getSourceStreamId(); String key = (String) keyGetStrategy.get(tuple); String subgroup = (String) subgroupGetStrategy.get(tuple); @@ -128,8 +130,7 
@@ public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt { try { Map<String, Tuple> streamMessageMap = cache.get(key); if (streamMessageMap.containsKey(streamId)) { - LOG.warn(String.format("Received key %s twice for " + - "stream %s", key, streamId)); + LOG.warn(String.format("Received key %s twice for stream %s", key, streamId)); } streamMessageMap.put(streamId, tuple); Set<String> streamIds = getStreamIds(message); @@ -138,12 +139,17 @@ public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt { && Sets.symmetricDifference(streamMessageKeys, streamIds) .isEmpty() ) { - collector.emit( "message" - , tuple - , new Values( key - , joinMessages(streamMessageMap, this.messageGetStrategy) - ) - ); + + perfLog.mark("join-message"); + V joinedMessages = joinMessages(streamMessageMap, this.messageGetStrategy); + perfLog.log("join-message", "key={}, elapsed time to join messages", key); + + perfLog.mark("emit-message"); + collector.emit("message", + tuple, + new Values(key, joinedMessages)); + perfLog.log("emit-message", "key={}, elapsed time to emit messages", key); + cache.invalidate(key); Tuple messageTuple = streamMessageMap.get("message:"); collector.ack(messageTuple); @@ -166,6 +172,7 @@ public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt { ErrorUtils.handleError(collector, error); collector.ack(tuple); } + perfLog.log("execute", "key={}, elapsed time to run execute", key); } @Override http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java index 953e6a6..de69ad4 100644 --- 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java @@ -17,21 +17,21 @@ */ package org.apache.metron.enrichment.bolt; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt; +import org.apache.metron.common.performance.PerformanceLogger; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; -import org.apache.metron.common.bolt.ConfiguredBolt; -import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt; - -import java.util.List; -import java.util.Map; -import java.util.Set; -public abstract class SplitBolt<T extends Cloneable> extends - ConfiguredEnrichmentBolt { +public abstract class SplitBolt<T extends Cloneable> extends ConfiguredEnrichmentBolt { + public static class Perf {} // used for performance logging + private PerformanceLogger perfLog; // not static bc multiple bolts may exist in same worker protected OutputCollector collector; @@ -43,6 +43,7 @@ public abstract class SplitBolt<T extends Cloneable> extends public final void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { super.prepare(map, topologyContext, outputCollector); + perfLog = new PerformanceLogger(() -> getConfigurations().getGlobalConfig(), Perf.class.getName()); collector = outputCollector; prepare(map, topologyContext); } @@ -63,31 +64,35 @@ public abstract class SplitBolt<T extends Cloneable> extends } public void emit(Tuple tuple, T message) { + perfLog.mark("emit"); if (message == null) return; String key = getKey(tuple, message); + + perfLog.mark("split-message"); Map<String, List<T>> streamMessageMap = 
splitMessage(message); + perfLog.log("split-message", "key={}, elapsed time to split message", key); + for (String streamId : streamMessageMap.keySet()) { List<T> streamMessages = streamMessageMap.get(streamId); - if(streamMessages != null) { - for(T streamMessage : streamMessages) { + if (streamMessages != null) { + for (T streamMessage : streamMessages) { if (streamMessage == null) { streamMessage = getDefaultMessage(streamId); } collector.emit(streamId, new Values(key, streamMessage)); } - } - else { + } else { throw new IllegalArgumentException("Enrichment must send some list of messages, not null."); } } collector.emit("message", tuple, new Values(key, message, "")); collector.ack(tuple); emitOther(tuple, message); + perfLog.log("emit", "key={}, elapsed time to run emit", key); } protected T getDefaultMessage(String streamId) { - throw new IllegalArgumentException("Could not find a message for" + - " stream: " + streamId); + throw new IllegalArgumentException("Could not find a message for stream: " + streamId); } public abstract void prepare(Map map, TopologyContext topologyContext);
