METRON-1748 Improve Storm Profiler Integration Test (nickwallen) closes 
apache/metron#1174


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/f5f765ca
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/f5f765ca
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/f5f765ca

Branch: refs/heads/master
Commit: f5f765cafcd985a323b5774ff86fe6d43f6186d2
Parents: 6231074
Author: nickwallen <[email protected]>
Authored: Fri Sep 7 08:52:24 2018 -0400
Committer: nickallen <[email protected]>
Committed: Fri Sep 7 08:52:24 2018 -0400

----------------------------------------------------------------------
 .../profiler/DefaultMessageDistributor.java     |  18 +-
 .../src/test/resources/log4j.properties         |   3 +
 .../profiler/bolt/ProfileBuilderBolt.java       |  55 ++-
 .../zookeeper/event-time-test/profiler.json     |  19 +-
 .../integration/ProfilerIntegrationTest.java    | 365 +++++++++++--------
 .../src/test/resources/log4j.properties         |  10 +-
 .../src/test/resources/telemetry.json           | 100 +++++
 7 files changed, 383 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/f5f765ca/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
index 82f7174..dee6bd8 100644
--- 
a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
+++ 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
@@ -175,10 +175,10 @@ public class DefaultMessageDistributor implements 
MessageDistributor {
    */
   @Override
   public List<ProfileMeasurement> flush() {
+    LOG.debug("About to flush active profiles");
 
     // cache maintenance needed here to ensure active profiles will expire
-    activeCache.cleanUp();
-    expiredCache.cleanUp();
+    cacheMaintenance();
 
     List<ProfileMeasurement> measurements = flushCache(activeCache);
     return measurements;
@@ -200,10 +200,10 @@ public class DefaultMessageDistributor implements 
MessageDistributor {
    */
   @Override
   public List<ProfileMeasurement> flushExpired() {
+    LOG.debug("About to flush expired profiles");
 
     // cache maintenance needed here to ensure active profiles will expire
-    activeCache.cleanUp();
-    expiredCache.cleanUp();
+    cacheMaintenance();
 
     // flush all expired profiles
     List<ProfileMeasurement> measurements = flushCache(expiredCache);
@@ -215,6 +215,16 @@ public class DefaultMessageDistributor implements 
MessageDistributor {
   }
 
   /**
+   * Performs cache maintenance on both the active and expired caches.
+   */
+  private void cacheMaintenance() {
+    activeCache.cleanUp();
+    expiredCache.cleanUp();
+
+    LOG.debug("Cache maintenance complete: activeCacheSize={}, 
expiredCacheSize={}", activeCache.size(), expiredCache.size());
+  }
+
+  /**
    * Flush all of the profiles maintained in a cache.
    *
    * @param cache The cache to flush.

http://git-wip-us.apache.org/repos/asf/metron/blob/f5f765ca/metron-analytics/metron-profiler-common/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-common/src/test/resources/log4j.properties 
b/metron-analytics/metron-profiler-common/src/test/resources/log4j.properties
index 70be8ae..82b0022 100644
--- 
a/metron-analytics/metron-profiler-common/src/test/resources/log4j.properties
+++ 
b/metron-analytics/metron-profiler-common/src/test/resources/log4j.properties
@@ -26,3 +26,6 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.Target=System.out
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p 
%c{1}:%L - %m%n
+
+# uncomment below to help debug tests
+#log4j.logger.org.apache.metron.profiler=DEBUG

http://git-wip-us.apache.org/repos/asf/metron/blob/f5f765ca/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index 0d1f27e..6c22b45 100644
--- 
a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ 
b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -20,7 +20,6 @@
 
 package org.apache.metron.profiler.bolt;
 
-import org.apache.commons.collections4.CollectionUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -56,10 +55,12 @@ import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.LongSummaryStatistics;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static java.lang.String.format;
 import static 
org.apache.metron.profiler.bolt.ProfileSplitterBolt.ENTITY_TUPLE_FIELD;
@@ -73,6 +74,12 @@ import static 
org.apache.metron.profiler.bolt.ProfileSplitterBolt.TIMESTAMP_TUPL
  * <p>This bolt maintains the state required to build a Profile.  When the 
window
  * period expires, the data is summarized as a {@link ProfileMeasurement}, all 
state is
  * flushed, and the {@link ProfileMeasurement} is emitted.
+ *
+ * <p>There are two mechanisms that will cause a profile to flush. As new 
messages arrive,
+ * time is advanced. The splitter bolt attaches a timestamp to each message 
(which can be
+ * either event or system time.)  This advances time and leads to profile 
measurements
+ * being flushed. Alternatively, if no messages arrive to advance time, then 
the "time-to-live"
+ * mechanism will flush a profile after no messages have been received for 
some period of time.
  */
 public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable 
{
 
@@ -283,16 +290,47 @@ public class ProfileBuilderBolt extends BaseWindowedBolt 
implements Reloadable {
             .build();
   }
 
+  /**
+   * Logs information about the {@link TupleWindow}.
+   *
+   * @param window The tuple window.
+   */
+  private void log(TupleWindow window) {
+    // summarize the newly received tuples
+    LongSummaryStatistics received = window.get()
+            .stream()
+            .map(tuple -> getField(TIMESTAMP_TUPLE_FIELD, tuple, Long.class))
+            .collect(Collectors.summarizingLong(Long::longValue));
+
+    LOG.debug("Tuple(s) received; count={}, min={}, max={}, range={} ms",
+            received.getCount(),
+            received.getMin(),
+            received.getMax(),
+            received.getMax() - received.getMin());
+
+    if (window.getExpired().size() > 0) {
+      // summarize the expired tuples
+      LongSummaryStatistics expired = window.getExpired()
+              .stream()
+              .map(tuple -> getField(TIMESTAMP_TUPLE_FIELD, tuple, Long.class))
+              .collect(Collectors.summarizingLong(Long::longValue));
+
+      LOG.debug("Tuple(s) expired; count={}, min={}, max={}, range={} ms, 
lag={} ms",
+              expired.getCount(),
+              expired.getMin(),
+              expired.getMax(),
+              expired.getMax() - expired.getMin(),
+              received.getMin() - expired.getMin());
+    }
+  }
+
   @Override
   public void execute(TupleWindow window) {
-
-    LOG.debug("Tuple window contains {} tuple(s), {} expired, {} new",
-            CollectionUtils.size(window.get()),
-            CollectionUtils.size(window.getExpired()),
-            CollectionUtils.size(window.getNew()));
+    if(LOG.isDebugEnabled()) {
+      log(window);
+    }
 
     try {
-
       // handle each tuple in the window
       for(Tuple tuple : window.get()) {
         handleMessage(tuple);
@@ -304,7 +342,6 @@ public class ProfileBuilderBolt extends BaseWindowedBolt 
implements Reloadable {
       }
 
     } catch (Throwable e) {
-
       LOG.error("Unexpected error", e);
       collector.reportError(e);
     }
@@ -361,7 +398,7 @@ public class ProfileBuilderBolt extends BaseWindowedBolt 
implements Reloadable {
 
     // keep track of time
     activeFlushSignal.update(timestamp);
-    
+
     // distribute the message
     MessageRoute route = new MessageRoute(definition, entity);
     synchronized (messageDistributor) {

http://git-wip-us.apache.org/repos/asf/metron/blob/f5f765ca/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json
 
b/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json
index 9d727a3..534b7c6 100644
--- 
a/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json
+++ 
b/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json
@@ -1,12 +1,19 @@
 {
+  "timestampField": "timestamp",
   "profiles": [
     {
-      "profile": "event-time-test",
+      "profile": "count-by-ip",
       "foreach": "ip_src_addr",
-      "init":   { "counter": "0" },
-      "update": { "counter": "counter + 1" },
-      "result": "counter"
+      "init": { "count": 0 },
+      "update": { "count" : "count + 1" },
+      "result": "count"
+    },
+    {
+      "profile": "total-count",
+      "foreach": "'total'",
+      "init": { "count": 0 },
+      "update": { "count": "count + 1" },
+      "result": "count"
     }
-  ],
-  "timestampField": "timestamp"
+  ]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/f5f765ca/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
 
b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index 268ce26..322ba13 100644
--- 
a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ 
b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -20,17 +20,9 @@
 
 package org.apache.metron.profiler.integration;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
 import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.commons.io.FileUtils;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.utils.SerDeUtils;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.hbase.mock.MockHTable;
 import org.apache.metron.integration.BaseIntegrationTest;
@@ -39,42 +31,44 @@ import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.integration.components.FluxTopologyComponent;
 import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.hbase.ColumnBuilder;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
-import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.profiler.client.stellar.FixedLookback;
+import org.apache.metron.profiler.client.stellar.GetProfile;
+import org.apache.metron.profiler.client.stellar.WindowLookback;
+import org.apache.metron.statistics.OnlineStatisticsProvider;
+import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
+import org.apache.metron.stellar.common.StellarStatefulExecutor;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
 import org.apache.storm.Config;
-import org.apache.storm.serialization.KryoTupleDeserializer;
-import org.apache.storm.serialization.KryoTupleSerializer;
-import org.apache.storm.serialization.KryoValuesDeserializer;
-import org.apache.storm.serialization.KryoValuesSerializer;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.TupleImpl;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
+import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import static com.google.code.tempusfugit.temporal.Duration.seconds;
 import static com.google.code.tempusfugit.temporal.Timeout.timeout;
 import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout;
-import static org.junit.Assert.assertArrayEquals;
+import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
+import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
+import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
+import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
+import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
+import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -82,6 +76,7 @@ import static org.junit.Assert.assertTrue;
  */
 public class ProfilerIntegrationTest extends BaseIntegrationTest {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final String TEST_RESOURCES = 
"../../metron-analytics/metron-profiler/src/test";
   private static final String FLUX_PATH = 
"../metron-profiler/src/main/flux/profiler/remote.yaml";
 
@@ -94,13 +89,13 @@ public class ProfilerIntegrationTest extends 
BaseIntegrationTest {
   private static final String outputTopic = "profiles";
   private static final int saltDivisor = 10;
 
-  private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(1);
-  private static final long windowDurationMillis = 
TimeUnit.SECONDS.toMillis(5);
-  private static final long periodDurationMillis = 
TimeUnit.SECONDS.toMillis(10);
-  private static final long profileTimeToLiveMillis = 
TimeUnit.SECONDS.toMillis(15);
+  private static final long periodDurationMillis = 
TimeUnit.SECONDS.toMillis(20);
+  private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(10);
+  private static final long windowDurationMillis = 
TimeUnit.SECONDS.toMillis(10);
+  private static final long profileTimeToLiveMillis = 
TimeUnit.SECONDS.toMillis(20);
+
   private static final long maxRoutesPerBolt = 100000;
 
-  private static ColumnBuilder columnBuilder;
   private static ZKServerComponent zkComponent;
   private static FluxTopologyComponent fluxComponent;
   private static KafkaComponent kafkaComponent;
@@ -112,6 +107,8 @@ public class ProfilerIntegrationTest extends 
BaseIntegrationTest {
   private static String message2;
   private static String message3;
 
+  private StellarStatefulExecutor executor;
+
   /**
    * [
    *    org.apache.metron.profiler.ProfileMeasurement,
@@ -122,6 +119,7 @@ public class ProfilerIntegrationTest extends 
BaseIntegrationTest {
    *    org.apache.metron.common.configuration.profiler.ProfilerConfig,
    *    org.apache.metron.common.configuration.profiler.ProfileConfig,
    *    org.json.simple.JSONObject,
+   *    org.json.simple.JSONArray,
    *    java.util.LinkedHashMap,
    *    org.apache.metron.statistics.OnlineStatisticsProvider
    *  ]
@@ -129,78 +127,132 @@ public class ProfilerIntegrationTest extends 
BaseIntegrationTest {
   @Multiline
   private static String kryoSerializers;
 
-  /**
-   * The Profiler can generate profiles based on processing time.  With 
processing time,
-   * the Profiler builds profiles based on when the telemetry was processed.
-   *
-   * <p>Not defining a 'timestampField' within the Profiler configuration 
tells the Profiler
-   * to use processing time.
-   */
   @Test
   public void testProcessingTime() throws Exception {
+    uploadConfigToZookeeper(TEST_RESOURCES + 
"/config/zookeeper/processing-time-test");
 
-    // upload the config to zookeeper
-    uploadConfig(TEST_RESOURCES + "/config/zookeeper/processing-time-test");
-
-    // start the topology and write test messages to kafka
+    // start the topology and write 3 test messages to kafka
     fluxComponent.submitTopology();
-
-    // the messages that will be applied to the profile
     kafkaComponent.writeMessages(inputTopic, message1);
     kafkaComponent.writeMessages(inputTopic, message2);
     kafkaComponent.writeMessages(inputTopic, message3);
 
-    // storm needs at least one message to close its event window
+    // retrieve the profile measurement using PROFILE_GET
+    String profileGetExpression = "PROFILE_GET('processing-time-test', 
'10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
+    List<Integer> measurements = execute(profileGetExpression, List.class);
+
+    // need to keep checking for measurements until the profiler has flushed 
one out
     int attempt = 0;
-    while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) {
+    while(measurements.size() == 0 && attempt++ < 10) {
 
-      // sleep, at least beyond the current window
-      Thread.sleep(windowDurationMillis + windowLagMillis);
+      // wait for the profiler to flush
+      long sleep = windowDurationMillis;
+      LOG.debug("Waiting {} millis for profiler to flush", sleep);
+      Thread.sleep(sleep);
 
-      // send another message to help close the current event window
+      // write another message to advance time. this ensures we are testing 
the 'normal' flush mechanism.
+      // if we do not send additional messages to advance time, then it is the 
profile TTL mechanism which
+      // will ultimately flush the profile
       kafkaComponent.writeMessages(inputTopic, message2);
+
+      // try again to retrieve the profile measurement using PROFILE_GET
+      measurements = execute(profileGetExpression, List.class);
     }
 
-    // validate what was flushed
-    List<Integer> actuals = read(
-            profilerTable.getPutLog(),
-            columnFamily,
-            columnBuilder.getColumnQualifier("value"),
-            Integer.class);
-    assertEquals(1, actuals.size());
-    assertTrue(actuals.get(0) >= 3);
+    // expect to see only 1 measurement, but could be more (one for each 
period) depending on
+    // how long we waited for the flush to occur
+    assertTrue(measurements.size() > 0);
+
+    // the profile should have counted at least 3 messages; the 3 test 
messages that were sent.
+    // the count could be higher due to the test messages we sent to advance 
time.
+    assertTrue(measurements.get(0) >= 3);
   }
 
-  /**
-   * The Profiler can generate profiles using event time.  With event time 
processing,
-   * the Profiler uses timestamps contained in the source telemetry.
-   *
-   * <p>Defining a 'timestampField' within the Profiler configuration tells 
the Profiler
-   * from which field the timestamp should be extracted.
-   */
   @Test
-  public void testEventTime() throws Exception {
-
-    // upload the profiler config to zookeeper
-    uploadConfig(TEST_RESOURCES + "/config/zookeeper/event-time-test");
+  public void testProcessingTimeWithTimeToLiveFlush() throws Exception {
+    uploadConfigToZookeeper(TEST_RESOURCES + 
"/config/zookeeper/processing-time-test");
 
-    // start the topology and write test messages to kafka
+    // start the topology and write 3 test messages to kafka
     fluxComponent.submitTopology();
     kafkaComponent.writeMessages(inputTopic, message1);
     kafkaComponent.writeMessages(inputTopic, message2);
     kafkaComponent.writeMessages(inputTopic, message3);
 
-    // wait until the profile is flushed
-    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, 
timeout(seconds(90)));
+    // wait for the window lag plus one full period before writing another message.  this 
allows storm's window manager to close
+    // the event window, which then lets the profiler process the previous 
messages.
+    long sleep = windowLagMillis + periodDurationMillis;
+    LOG.debug("Waiting {} millis before sending message to close window", 
sleep);
+    Thread.sleep(sleep);
+    kafkaComponent.writeMessages(inputTopic, message3);
+
+    // retrieve the profile measurement using PROFILE_GET
+    String profileGetExpression = "PROFILE_GET('processing-time-test', 
'10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
+    List<Integer> measurements = execute(profileGetExpression, List.class);
 
-    List<Put> puts = profilerTable.getPutLog();
-    assertEquals(1, puts.size());
+    // need to keep checking for measurements until the profiler has flushed 
one out
+    int attempt = 0;
+    while(measurements.size() == 0 && attempt++ < 10) {
+
+      // wait for the profiler to flush
+      sleep = windowDurationMillis;
+      LOG.debug("Waiting {} millis for profiler to flush", sleep);
+      Thread.sleep(sleep);
+
+      // do not write additional messages to advance time. this ensures that 
we are testing the "time to live"
+      // flush mechanism. the TTL setting defines when the profile will be 
flushed
+
+      // try again to retrieve the profile measurement
+      measurements = execute(profileGetExpression, List.class);
+    }
+
+    // expect to see only 1 measurement, but could be more (one for each 
period) depending on
+    // how long we waited for the flush to occur
+    assertTrue(measurements.size() > 0);
 
-    // inspect the row key to ensure the profiler used event time correctly.  
the timestamp
-    // embedded in the row key should match those in the source telemetry
-    byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, 
startAt);
-    byte[] actualRowKey = puts.get(0).getRow();
-    assertArrayEquals(failMessage(expectedRowKey, actualRowKey), 
expectedRowKey, actualRowKey);
+    // the profile should have counted 3 messages; the 3 test messages that 
were sent
+    assertEquals(3, measurements.get(0).intValue());
+  }
+
+  @Test
+  public void testEventTime() throws Exception {
+    uploadConfigToZookeeper(TEST_RESOURCES + 
"/config/zookeeper/event-time-test");
+
+    // start the topology and write test messages to kafka
+    fluxComponent.submitTopology();
+    List<String> messages = FileUtils.readLines(new 
File("src/test/resources/telemetry.json"));
+    kafkaComponent.writeMessages(inputTopic, messages);
+
+    long timestamp = System.currentTimeMillis();
+    LOG.debug("Attempting to close window period by sending message with 
timestamp = {}", timestamp);
+    kafkaComponent.writeMessages(inputTopic, getMessage("192.168.66.1", 
timestamp));
+    kafkaComponent.writeMessages(inputTopic, getMessage("192.168.138.158", 
timestamp));
+
+    // create the 'window' that looks up to 5 hours before the max timestamp 
contained in the test data
+    assign("maxTimestamp", "1530978728982L");
+    assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
+
+    // wait until the profile flushes both periods.  the first period will 
flush immediately as subsequent messages
+    // advance time.  the next period contains all of the remaining messages, 
so there are no other messages to
+    // advance time.  because of this the next period only flushes after the 
time-to-live expires
+    waitOrTimeout(() -> profilerTable.getPutLog().size() >= 6, 
timeout(seconds(90)));
+    {
+      // there are 14 messages in the first period and 12 in the next where 
ip_src_addr = 192.168.66.1
+      List results = execute("PROFILE_GET('count-by-ip', '192.168.66.1', 
window)", List.class);
+      assertEquals(14, results.get(0));
+      assertEquals(12, results.get(1));
+    }
+    {
+      // there are 36 messages in the first period and 38 in the next where 
ip_src_addr = 192.168.138.158
+      List results = execute("PROFILE_GET('count-by-ip', '192.168.138.158', 
window)", List.class);
+      assertEquals(36, results.get(0));
+      assertEquals(38, results.get(1));
+    }
+    {
+      // in all there are 50 messages in the first period and 50 messages in 
the next
+      List results = execute("PROFILE_GET('total-count', 'total', window)", 
List.class);
+      assertEquals(50, results.get(0));
+      assertEquals(50, results.get(1));
+    }
   }
 
   /**
@@ -212,34 +264,25 @@ public class ProfilerIntegrationTest extends 
BaseIntegrationTest {
    */
   @Test
   public void testProfileWithStatsObject() throws Exception {
-
-    // upload the profiler config to zookeeper
-    uploadConfig(TEST_RESOURCES + "/config/zookeeper/profile-with-stats");
+    uploadConfigToZookeeper(TEST_RESOURCES + 
"/config/zookeeper/profile-with-stats");
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(inputTopic, message1);
-    kafkaComponent.writeMessages(inputTopic, message2);
-    kafkaComponent.writeMessages(inputTopic, message3);
+    List<String> messages = FileUtils.readLines(new 
File("src/test/resources/telemetry.json"));
+    kafkaComponent.writeMessages(inputTopic, messages);
 
     // wait until the profile is flushed
     waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, 
timeout(seconds(90)));
 
-    // ensure that a value was persisted in HBase
-    List<Put> puts = profilerTable.getPutLog();
-    assertEquals(1, puts.size());
-
-    // generate the expected row key. only the profile name, entity, and 
period are used to generate the row key
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withProfileName("profile-with-stats")
-            .withEntity("global")
-            .withPeriod(startAt, periodDurationMillis, TimeUnit.MILLISECONDS);
-    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, 
periodDurationMillis, TimeUnit.MILLISECONDS);
-    byte[] expectedRowKey = rowKeyBuilder.rowKey(measurement);
-
-    // ensure the correct row key was generated
-    byte[] actualRowKey = puts.get(0).getRow();
-    assertArrayEquals(failMessage(expectedRowKey, actualRowKey), 
expectedRowKey, actualRowKey);
+    // validate the measurements written by the profiler using 
`PROFILE_GET`
+    // the 'window' looks up to 5 hours before the max timestamp contained in 
the test data
+    assign("maxTimestamp", "1530978728982L");
+    assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
+
+    // retrieve the stats stored by the profiler
+    List results = execute("PROFILE_GET('profile-with-stats', 'global', 
window)", List.class);
+    assertTrue(results.size() > 0);
+    assertTrue(results.get(0) instanceof OnlineStatisticsProvider);
   }
 
   /**
@@ -256,73 +299,21 @@ public class ProfilerIntegrationTest extends 
BaseIntegrationTest {
               new String(actual, "UTF-8"));
   }
 
-  /**
-   * Generates the expected row key.
-   *
-   * @param profileName The name of the profile.
-   * @param entity The entity.
-   * @param whenMillis A timestamp in epoch milliseconds.
-   * @return A row key.
-   */
-  private byte[] generateExpectedRowKey(String profileName, String entity, 
long whenMillis) {
-
-    // only the profile name, entity, and period are used to generate the row 
key
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withProfileName(profileName)
-            .withEntity(entity)
-            .withPeriod(whenMillis, periodDurationMillis, 
TimeUnit.MILLISECONDS);
-
-    // build the row key
-    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, 
periodDurationMillis, TimeUnit.MILLISECONDS);
-    return rowKeyBuilder.rowKey(measurement);
-  }
-
-  /**
-   * Reads a value written by the Profiler.
-   *
-   * @param family The column family.
-   * @param qualifier The column qualifier.
-   * @param clazz The expected type of the value.
-   * @param <T> The expected type of the value.
-   * @return The value written by the Profiler.
-   */
-  private <T> List<T> read(List<Put> puts, String family, byte[] qualifier, 
Class<T> clazz) {
-    List<T> results = new ArrayList<>();
-
-    for(Put put: puts) {
-      List<Cell> cells = put.get(Bytes.toBytes(family), qualifier);
-      for(Cell cell : cells) {
-        T value = SerDeUtils.fromBytes(cell.getValue(), clazz);
-        results.add(value);
-      }
-    }
-
-    return results;
+  private static String getMessage(String ipSource, long timestamp) {
+    return new MessageBuilder()
+            .withField("ip_src_addr", ipSource)
+            .withField("timestamp", timestamp)
+            .build()
+            .toJSONString();
   }
 
   @BeforeClass
   public static void setupBeforeClass() throws UnableToStartException {
 
     // create some messages that contain a timestamp - a really old timestamp; 
close to 1970
-    message1 = new MessageBuilder()
-            .withField("ip_src_addr", entity)
-            .withField("timestamp", startAt)
-            .build()
-            .toJSONString();
-
-    message2 = new MessageBuilder()
-            .withField("ip_src_addr", entity)
-            .withField("timestamp", startAt + 100)
-            .build()
-            .toJSONString();
-
-    message3 = new MessageBuilder()
-            .withField("ip_src_addr", entity)
-            .withField("timestamp", startAt + (windowDurationMillis * 2))
-            .build()
-            .toJSONString();
-
-    columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+    message1 = getMessage(entity, startAt);
+    message2 = getMessage(entity, startAt + 100);
+    message3 = getMessage(entity, startAt + (windowDurationMillis * 2));
 
     // storm topology properties
     final Properties topologyProperties = new Properties() {{
@@ -330,6 +321,7 @@ public class ProfilerIntegrationTest extends 
BaseIntegrationTest {
       // storm settings
       setProperty("profiler.workers", "1");
       setProperty("profiler.executors", "0");
+
       setProperty(Config.TOPOLOGY_AUTO_CREDENTIALS, "[]");
       setProperty(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, "60");
       setProperty(Config.TOPOLOGY_MAX_SPOUT_PENDING, "100000");
@@ -343,7 +335,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
       // kafka settings
       setProperty("profiler.input.topic", inputTopic);
       setProperty("profiler.output.topic", outputTopic);
-      setProperty("kafka.start", "UNCOMMITTED_EARLIEST");
+      setProperty("kafka.start", "EARLIEST");
       setProperty("kafka.security.protocol", "PLAINTEXT");
 
       // hbase settings
@@ -412,6 +404,30 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   public void setup() {
     // create the mock table
     profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily);
+
+    // global properties
+    Map<String, Object> global = new HashMap<String, Object>() {{
+      put(PROFILER_HBASE_TABLE.getKey(), tableName);
+      put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
+      put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
+
+      // client needs to use the same period duration
+      put(PROFILER_PERIOD.getKey(), Long.toString(periodDurationMillis));
+      put(PROFILER_PERIOD_UNITS.getKey(), "MILLISECONDS");
+
+      // client needs to use the same salt divisor
+      put(PROFILER_SALT_DIVISOR.getKey(), saltDivisor);
+    }};
+
+    // create the stellar execution environment
+    executor = new DefaultStellarStatefulExecutor(
+            new SimpleFunctionResolver()
+                    .withClass(GetProfile.class)
+                    .withClass(FixedLookback.class)
+                    .withClass(WindowLookback.class),
+            new Context.Builder()
+                    .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+                    .build());
   }
 
   @After
@@ -428,10 +444,35 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
    * @param path The path on the local filesystem to the config values.
    * @throws Exception
    */
-  public void uploadConfig(String path) throws Exception {
+  public void uploadConfigToZookeeper(String path) throws Exception {
     configUploadComponent
             .withGlobalConfiguration(path)
             .withProfilerConfiguration(path)
             .update();
   }
+
+  /**
+   * Assign a value to the result of an expression.
+   *
+   * @param var The variable to assign.
+   * @param expression The expression to execute.
+   */
+  private void assign(String var, String expression) {
+    executor.assign(var, expression, Collections.emptyMap());
+  }
+
+  /**
+   * Execute a Stellar expression.
+   *
+   * @param expression The Stellar expression to execute.
+   * @param clazz
+   * @param <T>
+   * @return The result of executing the Stellar expression.
+   */
+  private <T> T execute(String expression, Class<T> clazz) {
+    T results = executor.execute(expression, Collections.emptyMap(), clazz);
+
+    LOG.debug("{} = {}", expression, results);
+    return results;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/f5f765ca/metron-analytics/metron-profiler/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/resources/log4j.properties b/metron-analytics/metron-profiler/src/test/resources/log4j.properties
index 541f368..1c2359a 100644
--- a/metron-analytics/metron-profiler/src/test/resources/log4j.properties
+++ b/metron-analytics/metron-profiler/src/test/resources/log4j.properties
@@ -26,9 +26,7 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.Target=System.out
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
-log4j.appender.stdout.filter.1=org.apache.log4j.varia.StringMatchFilter
-log4j.appender.stdout.filter.1.StringToMatch=Connection timed out
-log4j.appender.stdout.filter.1.AcceptOnMatch=false
-log4j.appender.stdout.filter.2=org.apache.log4j.varia.StringMatchFilter
-log4j.appender.stdout.filter.2.StringToMatch=Background
-log4j.appender.stdout.filter.2.AcceptOnMatch=false
\ No newline at end of file
+
+# uncomment below to help debug tests
+#log4j.logger.org.apache.metron.profiler=ALL
+#log4j.logger.org.apache.storm.windowing=ALL

Reply via email to