Repository: metron
Updated Branches:
  refs/heads/master ba46fa734 -> 7e0375390


METRON-1060: Add performance timing logging to enrichment topology (mmiklavc) 
closes apache/metron#665


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7e037539
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7e037539
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7e037539

Branch: refs/heads/master
Commit: 7e0375390baba6a3a4c115a71bb7a3d473ccbdf7
Parents: ba46fa7
Author: mmiklavc <[email protected]>
Authored: Mon Jul 31 09:27:27 2017 -0600
Committer: Michael Miklavcic <[email protected]>
Committed: Mon Jul 31 09:27:27 2017 -0600

----------------------------------------------------------------------
 metron-platform/metron-common/README.md         |  54 ++++++
 .../common/performance/PerformanceLogger.java   | 188 +++++++++++++++++++
 .../common/performance/ThresholdCalculator.java |  41 ++++
 .../metron/common/performance/Timing.java       |  65 +++++++
 .../performance/PerformanceLoggerTest.java      | 173 +++++++++++++++++
 .../metron/common/performance/TimingTest.java   |  96 ++++++++++
 .../enrichment/bolt/GenericEnrichmentBolt.java  |  36 ++--
 .../apache/metron/enrichment/bolt/JoinBolt.java |  39 ++--
 .../metron/enrichment/bolt/SplitBolt.java       |  33 ++--
 9 files changed, 681 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-common/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/README.md 
b/metron-platform/metron-common/README.md
index 76f9390..fbc3c10 100644
--- a/metron-platform/metron-common/README.md
+++ b/metron-platform/metron-common/README.md
@@ -7,6 +7,7 @@
 * [Validation Framework](#validation-framework)
 * [Management Utility](#management-utility)
 * [Topology Errors](topology-errors)
+* [Performance Logging](#performance-logging)
 
 # Stellar Language
 
@@ -152,3 +153,56 @@ Error topics for enrichment and threat intel errors are 
passed into the enrichme
 The error topic for indexing errors is passed into the indexing topology as a 
flux property named `index.error.topic`.  This property can be found in either 
`$METRON_HOME/config/elasticsearch.properties` or 
`$METRON_HOME/config/solr.properties` depending on the search engine selected.
 
 By default all error messages are sent to the `indexing` topic so that they 
are indexed and archived, just like other messages.  The indexing config for 
error messages can be found at 
`$METRON_HOME/config/zookeeper/indexing/error.json`.
+
+# Performance Logging
+
+The PerformanceLogger class provides functionality that enables developers to 
debug performance issues. Basic usage looks like the following:
+```
+// create a simple inner performance class to use for logger instantiation
+public static class Perf {}
+// instantiation
+PerformnanceLogger perfLog = new PerformanceLogger(() -> 
getConfigurations().getGlobalConfig(), Perf.class.getName());
+// marking a start time
+perfLog.mark("mark1");
+// ...do some high performance stuff...
+// log the elapsed time
+perfLog.log("mark1", "My high performance stuff is very performant");
+// log no additional message, just the basics
+perfLog.log("mark1");
+```
+
+The logger maintains a Map<String, Long> of named markers that correspond to 
start times. Calling mark() performs a put on the underlying timing store. 
Output includes the mark name, elapsed time in nanoseconds, as well as any 
custom messaging you provide. A sample log would look like the following:
+```
+[DEBUG] 
markName=execute,time(ns)=121411,message=key=7a8dbe44-4cb9-4db2-9d04-7632f543b56c,
 elapsed time to run execute
+```
+
+__Configuration__
+
+The first argument to the logger is a java.util.function.Supplier<Map<String, 
Object>>. The offers flexibility in being able to provide multiple 
configuration "suppliers" depending on your individual usage requirements. The 
example above,
+taken from org.apache.metron.enrichment.bolt.GenericEnrichmentBolt, leverages 
the global config to dymanically provide configuration from Zookeeper. Any 
updates to the global config via Zookeeper are reflected live at runtime. 
Currently,
+the PerformanceLogger supports the following options:
+
+|Property Name                              |Type               |Valid Values  
 |
+|-------------------------------------------|-------------------|---------------|
+|performance.logging.percent.records        |Integer            |0-100         
 |
+
+
+__Other Usage Details__
+
+You can also provide your own format String and provide arguments that will be 
used when formatting that String. This code avoids expensive String 
concatenation by only formatting when debugging is enabled. For more complex 
arguments, e.g. JSON serialization, we expose an isDebugEnabled() method.
+
+```
+// log with format String and single argument
+perfLog.log("join-message", "key={}, elapsed time to join messages", key);
+
+// check if debugging is enabled for the performance logger to avoid more 
expensive operations
+if (perfLog.isDebugEnabled()) {
+    perfLog.log("join-message", "key={}, elapsed time to join messages, 
message={}", key, rawMessage.toJSONString());
+}
+```
+
+__Side Effects__
+
+Calling the mark() method multiple times simply resets the start time to the 
current nano time. Calling log() with a non-existent mark name will log 0 ns 
elapsed time with a warning indicating that log has been invoked for a mark 
name that does not exist.
+The class is not thread-safe and makes no attempt at keeping multiple threads 
from modifying the same markers.
+

http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/PerformanceLogger.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/PerformanceLogger.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/PerformanceLogger.java
new file mode 100644
index 0000000..61015ef
--- /dev/null
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/PerformanceLogger.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.performance;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.helpers.FormattingTuple;
+import org.slf4j.helpers.MessageFormatter;
+
+public class PerformanceLogger {
+
+  private static final String LOG_PERCENT = 
"performance.logging.percent.records";
+  private static final Integer LOG_PERCENT_DEFAULT = 1;
+  private Supplier<Map<String, Object>> configSupplier;
+  private ThresholdCalculator thresholdCalc;
+  private Timing timing;
+  private Logger logger;
+
+  /**
+   * Minimum constructor for this class.
+   * <p>
+   *   Options:
+   * </p>
+   * <ul>
+   *   <li>performance.logging.percent.records = Integer value 1-100 
indicating percent probability
+   *        that a logging statement should trigger writing timing info.
+   *   </li>
+   * </ul>
+   *
+   * @param configSupplier provides configuration for the logger as a 
Map&lt;String, Object&gt;
+   * @param loggerName     name for the underlying logger. This name is used 
when enabling/disabling
+   * the logger.
+   */
+  public PerformanceLogger(Supplier<Map<String, Object>> configSupplier, 
String loggerName) {
+    this(configSupplier, LoggerFactory.getLogger(loggerName), new 
ThresholdCalculator(),
+        new Timing());
+  }
+
+  public PerformanceLogger(Supplier<Map<String, Object>> configSupplier, 
Logger logger,
+      ThresholdCalculator thresholdCalc, Timing timing) {
+    this.configSupplier = configSupplier;
+    this.thresholdCalc = thresholdCalc;
+    this.timing = timing;
+    this.logger = logger;
+    this.logger.info("{} set to {}", LOG_PERCENT, getPercentThreshold());
+  }
+
+  /**
+   * Marks a timer start. Works in conjunction with the log methods. Calling 
log after
+   * calling mark will log elapsed time for the provided markName.
+   *
+   * @param markName
+   */
+  public void mark(String markName) {
+    timing.mark(markName);
+  }
+
+  /**
+   * Log a message at DEBUG level for the given markName.
+   * Warns when logging for a markName that hasn't been set.
+   * <p/>
+   * <p>This form avoids superfluous string concatenation when the logger
+   * is disabled for the DEBUG level.</p>
+   *
+   * @param markName  name of the marked timer to log elapsed time for
+   */
+  public void log(String markName) {
+    if (okToLog()) {
+      log(markName, "");
+    }
+  }
+
+  /**
+   * Log a message at DEBUG level for the given markName according to the 
specified message.
+   * Warns when logging for a markName that hasn't been set.
+   * <p/>
+   * <p>This form avoids superfluous string concatenation when the logger
+   * is disabled for the DEBUG level.</p>
+   *
+   * @param markName  name of the marked timer to log elapsed time for
+   * @param message   message to log
+   */
+  public void log(String markName, String message) {
+    if (okToLog()) {
+      if (timing.exists(markName)) {
+        logger.debug("markName={},time(ns)={},message={}", markName, 
timing.getElapsed(markName),
+            message);
+      } else {
+        logger.debug("markName={},time(ns)={},message={}", "WARNING - MARK NOT 
SET",
+            timing.getElapsed(markName), message);
+      }
+    }
+  }
+
+  private boolean okToLog() {
+    return logger.isDebugEnabled() && 
thresholdCalc.isPast(getPercentThreshold());
+  }
+
+  private Integer getPercentThreshold() {
+    return ConversionUtils.convert(getProperty(LOG_PERCENT, 
LOG_PERCENT_DEFAULT), Integer.class);
+  }
+
+  private Object getProperty(String key, Object defaultValue) {
+    return configSupplier.get().getOrDefault(key, defaultValue);
+  }
+
+
+  /**
+   * Log a message at DEBUG level for the given markName according to the 
specified format
+   * and argument. Warns when logging for a markName that hasn't been set.
+   * <p/>
+   * <p>This form avoids superfluous string concatenation when the logger
+   * is disabled for the DEBUG level.</p>
+   *
+   * @param markName  name of the marked timer to log elapsed time for
+   * @param format    the format string
+   * @param arg       argument to the format String
+   */
+  public void log(String markName, String format, Object arg) {
+    if (okToLog()) {
+      FormattingTuple formattedMessage = MessageFormatter.format(format, arg);
+      log(markName, formattedMessage.getMessage());
+    }
+  }
+
+  /**
+   * Log a message at DEBUG level according to the specified format and 
argument.
+   * <p/>
+   * <p>This form avoids superfluous string concatenation when the logger
+   * is disabled for the DEBUG level.</p>
+   *
+   * @param markName  name of the marked timer to log elapsed time for
+   * @param format    the format string
+   * @param arg1      first argument to the format String
+   * @param arg2      second argument to the format String
+   */
+  public void log(String markName, String format, Object arg1, Object arg2) {
+    if (okToLog()) {
+      FormattingTuple formattedMessage = MessageFormatter.format(format, arg1, 
arg2);
+      log(markName, formattedMessage.getMessage());
+    }
+  }
+
+  /**
+   * Log a message at DEBUG level according to the specified format and 
arguments.
+   * <p/>
+   * <p>This form avoids superfluous string concatenation when the logger
+   * is disabled for the DEBUG level. However, this variant incurs the hidden
+   * (and relatively small) cost of creating an <code>Object[]</code> before 
invoking the method,
+   * even if this logger is disabled for DEBUG. The variants taking
+   * {@link #log(String, String, Object) one} and {@link #log(String, String, 
Object, Object) two}
+   * arguments exist solely in order to avoid this hidden cost.</p>
+   *
+   * @param markName  name of the marked timer to log elapsed time for
+   * @param format    the format string
+   * @param arguments a list of 3 or more arguments
+   */
+  public void log(String markName, String format, Object... arguments) {
+    if (okToLog()) {
+      FormattingTuple formattedMessage = MessageFormatter.arrayFormat(format, 
arguments);
+      log(markName, formattedMessage.getMessage());
+    }
+  }
+
+  public boolean isDebugEnabled() {
+    return logger.isDebugEnabled();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/ThresholdCalculator.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/ThresholdCalculator.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/ThresholdCalculator.java
new file mode 100644
index 0000000..de7cfdd
--- /dev/null
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/ThresholdCalculator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.performance;
+
+public class ThresholdCalculator {
+
+  /**
+   * Returns true roughly the provided percent of the time, false 
100%-'percent' of the time
+   *
+   * @param percent Desired probability to return true
+   * @return true if the percent probability is true for this call.
+   */
+  public boolean isPast(int percent) {
+    double rd = Math.random();
+    if (rd <= toDecimal(percent)) {
+      return true;
+    }
+    return false;
+  }
+
+  private double toDecimal(int percent) {
+    return percent / 100.0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/Timing.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/Timing.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/Timing.java
new file mode 100644
index 0000000..67d471b
--- /dev/null
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/performance/Timing.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.performance;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class Timing {
+
+  Map<String, Long> startTimes;
+
+  public Timing() {
+    this.startTimes = new HashMap<>();
+  }
+
+  /**
+   * Stores a starting time from current nanoTime with the provided name
+   *
+   * @param name starting time mark name
+   */
+  public void mark(String name) {
+    startTimes.put(name, System.nanoTime());
+  }
+
+  /**
+   * Returns elapsed nanoTime given a stored marked time for the given name. 
Returns 0 for
+   * non-existent mark names
+   *
+   * @param name mark name to get elapsed time for.
+   */
+  public long getElapsed(String name) {
+    if (startTimes.containsKey(name)) {
+      return System.nanoTime() - startTimes.get(name);
+    } else {
+      return 0;
+    }
+  }
+
+  /**
+   * Checks existence of timing marker.
+   *
+   * @param name mark name to lookup.
+   * @return true if mark has been called with this name, false otherwise.
+   */
+  public boolean exists(String name) {
+    return startTimes.containsKey(name);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-common/src/test/java/org/apache/metron/common/performance/PerformanceLoggerTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/performance/PerformanceLoggerTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/performance/PerformanceLoggerTest.java
new file mode 100644
index 0000000..88f54e7
--- /dev/null
+++ 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/performance/PerformanceLoggerTest.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.performance;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+
+public class PerformanceLoggerTest {
+
+  private int thresholdPercent = 10;
+  private Supplier<Map<String, Object>> configSupplier;
+  @Mock
+  private Timing timing;
+  @Mock
+  private ThresholdCalculator thresholdCalc;
+  @Mock
+  private Logger logger;
+  private PerformanceLogger perfLogger;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    configSupplier = () -> 
ImmutableMap.of("performance.logging.percent.records", thresholdPercent);
+    
when(thresholdCalc.isPast(thresholdPercent)).thenReturn(false).thenReturn(false)
+        .thenReturn(true);
+    when(logger.isDebugEnabled()).thenReturn(true);
+    when(timing.exists("t1")).thenReturn(true);
+    perfLogger = new PerformanceLogger(configSupplier, logger, thresholdCalc, 
timing);
+  }
+
+  @Test
+  public void logs_on_threshold() throws Exception {
+    
when(timing.getElapsed("t1")).thenReturn(111L).thenReturn(222L).thenReturn(333L);
+    perfLogger.mark("t1");
+    perfLogger.log("t1");
+    perfLogger.log("t1");
+    perfLogger.log("t1");
+    verify(timing).mark("t1");
+    verify(logger, times(1)).debug(anyString(), anyObject(), eq(111L), eq(""));
+  }
+
+  @Test
+  public void logs_on_threshold_with_message() throws Exception {
+    
when(timing.getElapsed("t1")).thenReturn(111L).thenReturn(222L).thenReturn(333L);
+    perfLogger.mark("t1");
+    perfLogger.log("t1", "my message");
+    perfLogger.log("t1", "my message");
+    perfLogger.log("t1", "my message");
+    verify(timing).mark("t1");
+    verify(logger, times(1)).debug(anyString(), anyObject(), eq(111L), eq("my 
message"));
+  }
+
+  @Test
+  public void warns_when_logging_nonexisting_marks() throws Exception {
+    when(thresholdCalc.isPast(thresholdPercent)).thenReturn(true);
+    when(timing.getElapsed("t1")).thenReturn(111L);
+    when(timing.getElapsed("t2")).thenReturn(222L);
+    when(timing.getElapsed("t3")).thenReturn(333L);
+    when(timing.exists("t1")).thenReturn(true);
+    when(timing.exists("t2")).thenReturn(false);
+    when(timing.exists("t3")).thenReturn(false);
+    perfLogger.mark("t1");
+    perfLogger.log("t1", "my message");
+    perfLogger.log("t2", "my message");
+    perfLogger.log("t3", "my message");
+    verify(timing).mark("t1");
+    verify(timing, never()).mark("t2");
+    verify(timing, never()).mark("t3");
+    verify(logger).debug(anyString(), anyObject(), eq(111L), eq("my message"));
+    verify(logger)
+        .debug(anyString(), eq("WARNING - MARK NOT SET"), eq(222L), eq("my 
message"));
+    verify(logger)
+        .debug(anyString(), eq("WARNING - MARK NOT SET"), eq(333L), eq("my 
message"));
+  }
+
+  @Test
+  public void logs_with_multiple_markers() throws Exception {
+    when(thresholdCalc.isPast(thresholdPercent)).thenReturn(true);
+    when(timing.getElapsed("t1")).thenReturn(111L);
+    when(timing.getElapsed("t2")).thenReturn(222L);
+    perfLogger.mark("t1");
+    perfLogger.mark("t2");
+    perfLogger.log("t2", "my message 2");
+    perfLogger.log("t1", "my message 1");
+    verify(timing).mark("t1");
+    verify(timing).mark("t2");
+    verify(logger).debug(anyString(), anyObject(), eq(111L), eq("my message 
1"));
+    verify(logger).debug(anyString(), anyObject(), eq(222L), eq("my message 
2"));
+  }
+
+  @Test
+  public void defaults_to_1_percent_threshold() throws Exception {
+    configSupplier = () -> new HashMap<>();
+    when(thresholdCalc.isPast(1)).thenReturn(false).thenReturn(false)
+        .thenReturn(true);
+    
when(timing.getElapsed("t1")).thenReturn(111L).thenReturn(222L).thenReturn(333L);
+    perfLogger.mark("t1");
+    perfLogger.log("t1", "my message");
+    perfLogger.log("t1", "my message");
+    perfLogger.log("t1", "my message");
+    verify(timing).mark("t1");
+    verify(logger, times(1)).debug(anyString(), anyObject(), eq(111L), eq("my 
message"));
+  }
+
+  @Test
+  public void does_not_log_when_debugging_disabled() throws Exception {
+    when(logger.isDebugEnabled()).thenReturn(false);
+    
when(timing.getElapsed("t1")).thenReturn(111L).thenReturn(222L).thenReturn(333L);
+    perfLogger.mark("t1");
+    perfLogger.log("t1", "my message");
+    perfLogger.log("t1", "my message");
+    perfLogger.log("t1", "my message");
+    verify(timing).mark("t1");
+    verify(logger, times(0)).debug(anyString(), anyObject(), eq(111L), eq("my 
message"));
+  }
+
+  @Test
+  public void logs_formatted_message_provided_format_args() throws Exception {
+    when(thresholdCalc.isPast(thresholdPercent)).thenReturn(true);
+    
when(timing.getElapsed("t1")).thenReturn(111L).thenReturn(222L).thenReturn(333L)
+        .thenReturn(444L);
+    perfLogger.mark("t1");
+    perfLogger.log("t1", "my {} message", "1");
+    perfLogger.log("t1", "my {} message {}", "1", "2");
+    perfLogger.log("t1", "my {} message {} {}", "1", "2", "3");
+    perfLogger.log("t1", "my {} message {} {} {}", "1", "2", "3", "4");
+    verify(timing).mark("t1");
+    verify(logger, times(1)).debug(anyString(), anyObject(), eq(111L), eq("my 
1 message"));
+    verify(logger, times(1)).debug(anyString(), anyObject(), eq(222L), eq("my 
1 message 2"));
+    verify(logger, times(1)).debug(anyString(), anyObject(), eq(333L), eq("my 
1 message 2 3"));
+    verify(logger, times(1)).debug(anyString(), anyObject(), eq(444L), eq("my 
1 message 2 3 4"));
+  }
+
+  @Test
+  public void isDebugEnabled_returns_true_when_debugging_is_set() {
+    when(logger.isDebugEnabled()).thenReturn(true);
+    assertThat(perfLogger.isDebugEnabled(), equalTo(true));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-common/src/test/java/org/apache/metron/common/performance/TimingTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/performance/TimingTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/performance/TimingTest.java
new file mode 100644
index 0000000..e7ccb79
--- /dev/null
+++ 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/performance/TimingTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.performance;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class TimingTest {
+
+  private Timing timing;
+
+  @Before
+  public void setup() {
+    timing = new Timing();
+  }
+
+  @Test
+  public void provides_monotonically_increasing_times_for_single_marker()
+      throws InterruptedException {
+    timing.mark("mark1");
+    long tlast = 0;
+    for (int i = 0; i < 10; i++) {
+      tlast = timing.getElapsed("mark1");
+      Thread.sleep(10);
+      assertThat(timing.getElapsed("mark1") > tlast, equalTo(true));
+    }
+  }
+
+  @Test
+  public void provides_monotonically_increasing_times_for_multiple_markers()
+      throws InterruptedException {
+    timing.mark("mark1");
+    timing.mark("mark2");
+    long tlast1 = 0;
+    long tlast2 = 0;
+    for (int i = 0; i < 10; i++) {
+      tlast1 = timing.getElapsed("mark1");
+      tlast2 = timing.getElapsed("mark2");
+      Thread.sleep(10);
+      assertThat(timing.getElapsed("mark1") > tlast1, equalTo(true));
+      assertThat(timing.getElapsed("mark2") > tlast2, equalTo(true));
+    }
+  }
+
+  @Test
+  public void elapsed_time_on_nonexistent_marker_is_zero() throws 
InterruptedException {
+    timing.mark("mark1");
+    long tlast1 = 0;
+    for (int i = 0; i < 10; i++) {
+      tlast1 = timing.getElapsed("mark1");
+      Thread.sleep(10);
+      assertThat(timing.getElapsed("mark1") > tlast1, equalTo(true));
+      assertThat(timing.getElapsed("mark2"), equalTo(0L));
+    }
+  }
+
+  @Test
+  public void marking_again_resets_timing() throws InterruptedException {
+    timing.mark("mark1");
+    long tlast1 = 0;
+    for (int i = 0; i < 5; i++) {
+      Thread.sleep(10);
+      tlast1 = timing.getElapsed("mark1");
+      timing.mark("mark1");
+      assertThat(timing.getElapsed("mark1") < tlast1, equalTo(true));
+    }
+  }
+
+  @Test
+  public void exists_checks_mark_existence() {
+    timing.mark("mark1");
+    assertThat(timing.exists("mark1"), equalTo(true));
+    assertThat(timing.exists("mark2"), equalTo(false));
+    assertThat(timing.exists("mark3"), equalTo(false));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
index 907b309..db15b46 100644
--- 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
+++ 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBolt.java
@@ -18,34 +18,34 @@
 
 package org.apache.metron.enrichment.bolt;
 
-import org.apache.metron.common.error.MetronError;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
 import org.apache.metron.common.configuration.ConfigurationType;
 import 
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
-import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.performance.PerformanceLogger;
 import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.enrichment.configuration.Enrichment;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Uses an adapter to enrich telemetry messages with additional metadata
  * entries. For a list of available enrichment adapters see
@@ -65,7 +65,8 @@ import java.util.concurrent.TimeUnit;
 
 @SuppressWarnings({"rawtypes", "serial"})
 public class GenericEnrichmentBolt extends ConfiguredEnrichmentBolt {
-
+  public static class Perf {} // used for performance logging
+  private PerformanceLogger perfLog; // not static bc multiple bolts may exist 
in same worker
   private static final Logger LOG = LoggerFactory
           .getLogger(GenericEnrichmentBolt.class);
   public static final String STELLAR_CONTEXT_CONF = "stellarContext";
@@ -158,6 +159,7 @@ public class GenericEnrichmentBolt extends 
ConfiguredEnrichmentBolt {
       LOG.error("[Metron] GenericEnrichmentBolt could not initialize adapter");
       throw new IllegalStateException("Could not initialize adapter...");
     }
+    perfLog = new PerformanceLogger(() -> 
getConfigurations().getGlobalConfig(), 
GenericEnrichmentBolt.Perf.class.getName());
     initializeStellar();
   }
 
@@ -179,6 +181,7 @@ public class GenericEnrichmentBolt extends 
ConfiguredEnrichmentBolt {
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
+    perfLog.mark("execute");
     String key = tuple.getStringByField("key");
     JSONObject rawMessage = (JSONObject) tuple.getValueByField("message");
     String subGroup = "";
@@ -222,7 +225,11 @@ public class GenericEnrichmentBolt extends 
ConfiguredEnrichmentBolt {
               adapter.logAccess(cacheKey);
               prefix = adapter.getOutputPrefix(cacheKey);
               subGroup = adapter.getStreamSubGroup(enrichmentType, field);
+
+              perfLog.mark("enrich");
               enrichedField = cache.getUnchecked(cacheKey);
+              perfLog.log("enrich", "key={}, time to run enrichment type={}", 
key, enrichmentType);
+
               if (enrichedField == null)
                 throw new Exception("[Metron] Could not enrich string: "
                         + value);
@@ -258,6 +265,7 @@ public class GenericEnrichmentBolt extends 
ConfiguredEnrichmentBolt {
     } catch (Exception e) {
       handleError(key, rawMessage, subGroup, enrichedMessage, e);
     }
+    perfLog.log("execute", "key={}, elapsed time to run execute", key);
   }
 
   // Made protected to allow for error testing in integration test. Directly 
flaws inputs while everything is functioning hits other

http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
index f3fe52c..f6f3971 100644
--- 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
+++ 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/JoinBolt.java
@@ -25,11 +25,16 @@ import com.google.common.cache.RemovalCause;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.Sets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
 import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.message.MessageGetters;
+import org.apache.metron.common.performance.PerformanceLogger;
 import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -40,15 +45,10 @@ import org.apache.storm.tuple.Values;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 public abstract class JoinBolt<V> extends ConfiguredEnrichmentBolt {
-
-  private static final Logger LOG = LoggerFactory
-          .getLogger(JoinBolt.class);
+  public static class Perf {} // used for performance logging
+  private PerformanceLogger perfLog; // not static bc multiple bolts may exist 
in same worker
+  private static final Logger LOG = LoggerFactory.getLogger(JoinBolt.class);
   protected OutputCollector collector;
 
   protected transient CacheLoader<String, Map<String, Tuple>> loader;
@@ -76,6 +76,7 @@ public abstract class JoinBolt<V> extends 
ConfiguredEnrichmentBolt {
   @Override
   public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector outputCollector) {
     super.prepare(map, topologyContext, outputCollector);
+    perfLog = new PerformanceLogger(() -> 
getConfigurations().getGlobalConfig(), Perf.class.getName());
     keyGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("key");
     subgroupGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("subgroup");
     messageGetStrategy = MessageGetters.OBJECT_FROM_FIELD.get("message");
@@ -120,6 +121,7 @@ public abstract class JoinBolt<V> extends 
ConfiguredEnrichmentBolt {
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
+    perfLog.mark("execute");
     String streamId = tuple.getSourceStreamId();
     String key = (String) keyGetStrategy.get(tuple);
     String subgroup = (String) subgroupGetStrategy.get(tuple);
@@ -128,8 +130,7 @@ public abstract class JoinBolt<V> extends 
ConfiguredEnrichmentBolt {
     try {
       Map<String, Tuple> streamMessageMap = cache.get(key);
       if (streamMessageMap.containsKey(streamId)) {
-        LOG.warn(String.format("Received key %s twice for " +
-                "stream %s", key, streamId));
+        LOG.warn(String.format("Received key %s twice for stream %s", key, 
streamId));
       }
       streamMessageMap.put(streamId, tuple);
       Set<String> streamIds = getStreamIds(message);
@@ -138,12 +139,17 @@ public abstract class JoinBolt<V> extends 
ConfiguredEnrichmentBolt {
         && Sets.symmetricDifference(streamMessageKeys, streamIds)
                .isEmpty()
          ) {
-        collector.emit( "message"
-                      , tuple
-                      , new Values( key
-                                  , joinMessages(streamMessageMap, 
this.messageGetStrategy)
-                                  )
-                      );
+
+        perfLog.mark("join-message");
+        V joinedMessages = joinMessages(streamMessageMap, 
this.messageGetStrategy);
+        perfLog.log("join-message", "key={}, elapsed time to join messages", 
key);
+
+        perfLog.mark("emit-message");
+        collector.emit("message",
+                       tuple,
+                       new Values(key, joinedMessages));
+        perfLog.log("emit-message", "key={}, elapsed time to emit messages", 
key);
+
         cache.invalidate(key);
         Tuple messageTuple = streamMessageMap.get("message:");
         collector.ack(messageTuple);
@@ -166,6 +172,7 @@ public abstract class JoinBolt<V> extends 
ConfiguredEnrichmentBolt {
       ErrorUtils.handleError(collector, error);
       collector.ack(tuple);
     }
+    perfLog.log("execute", "key={}, elapsed time to run execute", key);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/7e037539/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
index 953e6a6..de69ad4 100644
--- 
a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
+++ 
b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/bolt/SplitBolt.java
@@ -17,21 +17,21 @@
  */
 package org.apache.metron.enrichment.bolt;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.common.performance.PerformanceLogger;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.metron.common.bolt.ConfiguredBolt;
-import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
-public abstract class SplitBolt<T extends Cloneable> extends
-        ConfiguredEnrichmentBolt {
+public abstract class SplitBolt<T extends Cloneable> extends 
ConfiguredEnrichmentBolt {
+  public static class Perf {} // used for performance logging
+  private PerformanceLogger perfLog; // not static bc multiple bolts may exist 
in same worker
 
   protected OutputCollector collector;
 
@@ -43,6 +43,7 @@ public abstract class SplitBolt<T extends Cloneable> extends
   public final void prepare(Map map, TopologyContext topologyContext,
                        OutputCollector outputCollector) {
     super.prepare(map, topologyContext, outputCollector);
+    perfLog = new PerformanceLogger(() -> 
getConfigurations().getGlobalConfig(), Perf.class.getName());
     collector = outputCollector;
     prepare(map, topologyContext);
   }
@@ -63,31 +64,35 @@ public abstract class SplitBolt<T extends Cloneable> extends
   }
 
   public void emit(Tuple tuple, T message) {
+    perfLog.mark("emit");
     if (message == null) return;
     String key = getKey(tuple, message);
+
+    perfLog.mark("split-message");
     Map<String, List<T>> streamMessageMap = splitMessage(message);
+    perfLog.log("split-message", "key={}, elapsed time to split message", key);
+
     for (String streamId : streamMessageMap.keySet()) {
       List<T> streamMessages = streamMessageMap.get(streamId);
-      if(streamMessages != null) {
-        for(T streamMessage : streamMessages) {
+      if (streamMessages != null) {
+        for (T streamMessage : streamMessages) {
           if (streamMessage == null) {
             streamMessage = getDefaultMessage(streamId);
           }
           collector.emit(streamId, new Values(key, streamMessage));
         }
-      }
-      else {
+      } else {
         throw new IllegalArgumentException("Enrichment must send some list of 
messages, not null.");
       }
     }
     collector.emit("message", tuple, new Values(key, message, ""));
     collector.ack(tuple);
     emitOther(tuple, message);
+    perfLog.log("emit", "key={}, elapsed time to run emit", key);
   }
 
   protected T getDefaultMessage(String streamId) {
-    throw new IllegalArgumentException("Could not find a message for" +
-            " stream: " + streamId);
+    throw new IllegalArgumentException("Could not find a message for stream: " 
+ streamId);
   }
 
   public abstract void prepare(Map map, TopologyContext topologyContext);

Reply via email to