http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/main/resources/META-INF/NOTICE 
b/metron-analytics/metron-profiler-storm/src/main/resources/META-INF/NOTICE
new file mode 100755
index 0000000..1c05f2d
--- /dev/null
+++ b/metron-analytics/metron-profiler-storm/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,92 @@
+
+metron-profiler
+Copyright 2006-2016 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project contains annotations derived from JCIP-ANNOTATIONS
+Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net
+
+===============================================================================
+
+The BracketFinder (package org.apache.commons.math3.optimization.univariate)
+and PowellOptimizer (package org.apache.commons.math3.optimization.general)
+classes are based on the Python code in module "optimize.py" (version 0.5)
+developed by Travis E. Oliphant for the SciPy library (http://www.scipy.org/)
+Copyright © 2003-2009 SciPy Developers.
+===============================================================================
+
+The LinearConstraint, LinearObjectiveFunction, LinearOptimizer,
+RelationShip, SimplexSolver and SimplexTableau classes in package
+org.apache.commons.math3.optimization.linear include software developed by
+Benjamin McCann (http://www.benmccann.com) and distributed with
+the following copyright: Copyright 2009 Google Inc.
+===============================================================================
+
+This product includes software developed by the
+University of Chicago, as Operator of Argonne National
+Laboratory.
+The LevenbergMarquardtOptimizer class in package
+org.apache.commons.math3.optimization.general includes software
+translated from the lmder, lmpar and qrsolv Fortran routines
+from the Minpack package
+Minpack Copyright Notice (1999) University of Chicago.  All rights reserved
+===============================================================================
+
+The GraggBulirschStoerIntegrator class in package
+org.apache.commons.math3.ode.nonstiff includes software translated
+from the odex Fortran routine developed by E. Hairer and G. Wanner.
+Original source copyright:
+Copyright (c) 2004, Ernst Hairer
+===============================================================================
+
+The EigenDecompositionImpl class in package
+org.apache.commons.math3.linear includes software translated
+from some LAPACK Fortran routines.  Original source copyright:
+Copyright (c) 1992-2008 The University of Tennessee.  All rights reserved.
+===============================================================================
+
+The MersenneTwister class in package org.apache.commons.math3.random
+includes software translated from the 2002-01-26 version of
+the Mersenne-Twister generator written in C by Makoto Matsumoto and Takuji
+Nishimura. Original source copyright:
+Copyright (C) 1997 - 2002, Makoto Matsumoto and Takuji Nishimura,
+All rights reserved
+===============================================================================
+
+The LocalizedFormatsTest class in the unit tests is an adapted version of
+the OrekitMessagesTest class from the orekit library distributed under the
+terms of the Apache 2 licence. Original source copyright:
+Copyright 2010 CS Systèmes d'Information
+===============================================================================
+
+The HermiteInterpolator class and its corresponding test have been imported 
from
+the orekit library distributed under the terms of the Apache 2 licence. 
Original
+source copyright:
+Copyright 2010-2012 CS Systèmes d'Information
+===============================================================================
+
+The creation of the package "o.a.c.m.analysis.integration.gauss" was inspired
+by an original code donated by Sébastien Brisard.
+===============================================================================
+
+The complete text of licenses and disclaimers associated with the the original
+sources enumerated above at the time of code translation are in the LICENSE.txt
+file.
+
+                            The Netty Project
+                            =================
+
+Please visit the Netty web site for more information:
+
+  * http://netty.io/
+
+Copyright 2011 The Netty Project
+
+Google Guice - Core Library
+Copyright 2006-2011 Google, Inc.
+
+Google Guice - Extensions - Servlet
+Copyright 2006-2011 Google, Inc.
+

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/main/scripts/start_profiler_topology.sh
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/main/scripts/start_profiler_topology.sh
 
b/metron-analytics/metron-profiler-storm/src/main/scripts/start_profiler_topology.sh
new file mode 100644
index 0000000..6ec78f5
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-storm/src/main/scripts/start_profiler_topology.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+METRON_VERSION=${project.version}
+METRON_HOME=/usr/metron/$METRON_VERSION
+TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar
+storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote 
$METRON_HOME/flux/profiler/remote.yaml --filter 
$METRON_HOME/config/profiler.properties

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json
 
b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json
new file mode 100644
index 0000000..9d727a3
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json
@@ -0,0 +1,12 @@
+{
+  "profiles": [
+    {
+      "profile": "event-time-test",
+      "foreach": "ip_src_addr",
+      "init":   { "counter": "0" },
+      "update": { "counter": "counter + 1" },
+      "result": "counter"
+    }
+  ],
+  "timestampField": "timestamp"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/processing-time-test/profiler.json
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/processing-time-test/profiler.json
 
b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/processing-time-test/profiler.json
new file mode 100644
index 0000000..e75ec0f
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/processing-time-test/profiler.json
@@ -0,0 +1,11 @@
+{
+  "profiles": [
+    {
+      "profile": "processing-time-test",
+      "foreach": "ip_src_addr",
+      "init":   { "counter": "0" },
+      "update": { "counter": "counter + 1" },
+      "result": "counter"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/profile-with-stats/profiler.json
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/profile-with-stats/profiler.json
 
b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/profile-with-stats/profiler.json
new file mode 100644
index 0000000..083e73f
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/profile-with-stats/profiler.json
@@ -0,0 +1,12 @@
+{
+  "profiles": [
+    {
+      "profile": "profile-with-stats",
+      "foreach": "'global'",
+      "init":   { "stats": "STATS_INIT()" },
+      "update": { "stats": "STATS_ADD(stats, 1)" },
+      "result": "stats"
+    }
+  ],
+  "timestampField": "timestamp"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignalTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignalTest.java
 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignalTest.java
new file mode 100644
index 0000000..8b8813b
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignalTest.java
@@ -0,0 +1,71 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.storm;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@code FixedFrequencyFlushSignal} class.
+ */
+public class FixedFrequencyFlushSignalTest {
+
+  @Test
+  public void testSignalFlush() {
+
+    FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000);
+
+    // not time to flush yet
+    assertFalse(signal.isTimeToFlush());
+
+    // advance time
+    signal.update(5000);
+
+    // not time to flush yet
+    assertFalse(signal.isTimeToFlush());
+
+    // advance time
+    signal.update(7000);
+
+    // time to flush
+    assertTrue(signal.isTimeToFlush());
+  }
+
+  @Test
+  public void testOutOfOrderTimestamps() {
+    FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000);
+
+    // advance time, out-of-order
+    signal.update(5000);
+    signal.update(1000);
+    signal.update(7000);
+    signal.update(3000);
+
+    // need to flush @ 5000 + 1000 = 6000. if anything > 6000 (even 
out-of-order), then it should signal a flush
+    assertTrue(signal.isTimeToFlush());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNegativeFrequency() {
+    new FixedFrequencyFlushSignal(-1000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/HBaseEmitterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/HBaseEmitterTest.java
 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/HBaseEmitterTest.java
new file mode 100644
index 0000000..2f9eca4
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/HBaseEmitterTest.java
@@ -0,0 +1,118 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.storm;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Values;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests the HBaseEmitter class.
+ */
+public class HBaseEmitterTest {
+
+  /**
+   * {
+   *   "profile": "profile-one",
+   *   "foreach": "ip_src_addr",
+   *   "init":   { "x": "0" },
+   *   "update": { "x": "x + 1" },
+   *   "result": "x"
+   * }
+   */
+  @Multiline
+  private String profileDefinition;
+
+  private HBaseEmitter emitter;
+  private ProfileConfig profile;
+  private OutputCollector collector;
+
+  @Before
+  public void setup() throws Exception {
+    emitter = new HBaseEmitter();
+    profile = createDefinition(profileDefinition);
+    collector = Mockito.mock(OutputCollector.class);
+  }
+
+  /**
+   * The handler should emit a message containing the result of executing
+   * the 'result/profile' expression.
+   */
+  @Test
+  public void testEmit() throws Exception {
+
+    // create a measurement that has triage values
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withDefinition(profile)
+            .withProfileValue(22);
+
+    // execute the test
+    emitter.emit(measurement, collector);
+
+    // the measurement should be emitted as-is
+    ProfileMeasurement actual = expectMeasurement(emitter, collector);
+    assertEquals(measurement, actual);
+  }
+
+  /**
+   * Verifies that the emitter does emit a {@code ProfileMeasurement}.
+   *
+   * @return The {@code ProfileMeasurement} that was emitted
+   */
+  private ProfileMeasurement expectMeasurement(HBaseEmitter hbaseEmitter, 
OutputCollector collector) {
+
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(1)).emit(eq(hbaseEmitter.getStreamId()), 
arg.capture());
+    Values values = arg.getValue();
+    assertTrue(values.get(0) instanceof ProfileMeasurement);
+    return (ProfileMeasurement) values.get(0);
+  }
+
+  /**
+   * Creates a profile definition based on a string of JSON.
+   * @param json The string of JSON.
+   */
+  private ProfileConfig createDefinition(String json) throws IOException {
+    return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java
 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java
new file mode 100644
index 0000000..51ca3a4
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java
@@ -0,0 +1,291 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.storm;
+
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.statistics.OnlineStatisticsProvider;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests the KafkaDestinationHandler.
+ */
+public class KafkaEmitterTest {
+
+  /**
+   * {
+   *   "profile": "profile-one-destination",
+   *   "foreach": "ip_src_addr",
+   *   "init":   { "x": "0" },
+   *   "update": { "x": "x + 1" },
+   *   "result": {
+   *      "profile": "x",
+   *      "triage": {
+   *        "value": "x"
+   *       }
+   *    }
+   * }
+   */
+  @Multiline
+  private String profileDefinitionWithTriage;
+
+  private KafkaEmitter kafkaEmitter;
+  private ProfileConfig profile;
+  private OutputCollector collector;
+
+  @Before
+  public void setup() throws Exception {
+    kafkaEmitter = new KafkaEmitter();
+    profile = createDefinition(profileDefinitionWithTriage);
+    collector = Mockito.mock(OutputCollector.class);
+  }
+
+  /**
+   * The handler should emit a message when a result/triage expression(s) has 
been defined.
+   */
+  @Test
+  public void testEmit() throws Exception {
+
+    // create a measurement that has triage values
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withDefinition(profile)
+            .withTriageValues(Collections.singletonMap("triage-key", 
"triage-value"));
+
+    // execute the test
+    kafkaEmitter.emit(measurement, collector);
+
+    // a message should be emitted
+    verify(collector, times(1)).emit(eq(kafkaEmitter.getStreamId()), any());
+  }
+
+  /**
+   * The handler should NOT emit a message when there is NO result/triage 
value(s).
+   */
+  @Test
+  public void testDoNotEmit() throws Exception {
+
+    // create a measurement with NO triage values
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withDefinition(profile);
+
+    // execute the test
+    kafkaEmitter.emit(measurement, collector);
+
+    // a message should NOT be emitted
+    verify(collector, times(0)).emit(eq(kafkaEmitter.getStreamId()), any());
+  }
+
+  /**
+   * Validate that the message generated for Kafka should include the triage 
value.
+   */
+  @Test
+  public void testTriageValueInMessage() throws Exception {
+
+    // create a measurement that has triage values
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withDefinition(profile)
+            .withProfileName(profile.getProfile())
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(Collections.singletonMap("triage-key", 
"triage-value"));
+
+    // execute the test
+    kafkaEmitter.emit(measurement, collector);
+    JSONObject actual = expectJsonObject(kafkaEmitter, collector);
+
+    // validate the core parts of the message
+    assertEquals(measurement.getProfileName(),                    
actual.get("profile"));
+    assertEquals(measurement.getEntity(),                         
actual.get("entity"));
+    assertEquals(measurement.getPeriod().getPeriod(),             
actual.get("period"));
+    assertEquals(measurement.getPeriod().getStartTimeMillis(),    
actual.get("period.start"));
+    assertEquals(measurement.getPeriod().getEndTimeMillis(),      
actual.get("period.end"));
+    assertEquals("profiler",                                      
actual.get("source.type"));
+    assertNotNull(actual.get("timestamp"));
+
+    // validate that the triage value has been added
+    assertEquals(measurement.getTriageValues().get("triage-key"), 
actual.get("triage-key"));
+  }
+
+  /**
+   * Validate that the message generated for Kafka can include multiple triage 
values.
+   */
+  @Test
+  public void testMultipleTriageValueInMessage() throws Exception {
+
+    // multiple triage values have been defined
+    Map<String, Object> triageValues = ImmutableMap.of(
+            "x", 2,
+            "y", "4",
+            "z", 6.0);
+
+    // create a measurement that has multiple triage values
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withDefinition(profile)
+            .withProfileName(profile.getProfile())
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(triageValues);
+
+    // execute the test
+    kafkaEmitter.emit(measurement, collector);
+    JSONObject actual = expectJsonObject(kafkaEmitter, collector);
+
+    // validate that ALL of the triage values have been added
+    assertEquals(measurement.getTriageValues().get("x"), actual.get("x"));
+    assertEquals(measurement.getTriageValues().get("y"), actual.get("y"));
+    assertEquals(measurement.getTriageValues().get("z"), actual.get("z"));
+  }
+
+  /**
+   * Values destined for Kafka can only be serialized into text, which limits 
the types of values
+   * that can result from a triage expression.  Only primitive types and 
Strings are allowed.
+   */
+  @Test
+  public void testInvalidType() throws Exception {
+
+    // create one invalid expression and one valid expression
+    Map<String, Object> triageValues = ImmutableMap.of(
+            "invalid", new OnlineStatisticsProvider(),
+            "valid", 4);
+
+    // create the measurement with a Map as a triage value; this is not allowed
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withDefinition(profile)
+            .withProfileName(profile.getProfile())
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(triageValues);
+
+    // execute the test
+    kafkaEmitter.emit(measurement, collector);
+    JSONObject actual = expectJsonObject(kafkaEmitter, collector);
+
+    // validate the core parts of the message still exist
+    assertEquals(measurement.getProfileName(),                    
actual.get("profile"));
+    assertEquals(measurement.getEntity(),                         
actual.get("entity"));
+    assertEquals(measurement.getPeriod().getPeriod(),             
actual.get("period"));
+    assertEquals(measurement.getPeriod().getStartTimeMillis(),    
actual.get("period.start"));
+    assertEquals(measurement.getPeriod().getEndTimeMillis(),      
actual.get("period.end"));
+    assertEquals("profiler",                                      
actual.get("source.type"));
+
+    // the invalid expression should be skipped and not included in the message
+    assertFalse(actual.containsKey("invalid"));
+
+    // but the valid expression should still be there
+    assertEquals(triageValues.get("valid"), actual.get("valid"));
+  }
+
+  /**
+   * Values destined for Kafka can only be serialized into text, which limits 
the types of values
+   * that can result from a triage expression.  Only primitive types and 
Strings are allowed.
+   */
+  @Test
+  public void testIntegerIsValidType() throws Exception {
+
+    // create a measurement with a triage value that is an integer
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withDefinition(profile)
+            .withProfileName(profile.getProfile())
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(Collections.singletonMap("triage-key", 123));
+
+    // execute the test
+    kafkaEmitter.emit(measurement, collector);
+    JSONObject actual = expectJsonObject(kafkaEmitter, collector);
+
+    // the triage expression is valid
+    assertEquals(measurement.getTriageValues().get("triage-key"), 
actual.get("triage-key"));
+  }
+
+  /**
+   * Values destined for Kafka can only be serialized into text, which limits 
the types of values
+   * that can result from a triage expression.  Only primitive types and 
Strings are allowed.
+   */
+  @Test
+  public void testStringIsValidType() throws Exception {
+
+    // create a measurement with a triage value that is a string
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withDefinition(profile)
+            .withProfileName(profile.getProfile())
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(Collections.singletonMap("triage-key", "value"));
+
+    // execute the test
+    kafkaEmitter.emit(measurement, collector);
+    JSONObject actual = expectJsonObject(kafkaEmitter, collector);
+
+    // the triage expression is valid
+    assertEquals(measurement.getTriageValues().get("triage-key"), 
actual.get("triage-key"));
+  }
+
+  /**
+   * Verifies that the KafkaEmitter does emit a JSONObject.
+   * @return The JSONObject that was emitted
+   */
+  private JSONObject expectJsonObject(KafkaEmitter kafkaEmitter, 
OutputCollector collector) {
+
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(1)).emit(eq(kafkaEmitter.getStreamId()), 
arg.capture());
+    Values values = arg.getValue();
+    assertTrue(values.get(0) instanceof JSONObject);
+    return (JSONObject) values.get(0);
+  }
+
+  /**
+   * Creates a profile definition based on a string of JSON.
+   * @param json The string of JSON.
+   */
+  private ProfileConfig createDefinition(String json) throws IOException {
+    return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileBuilderBoltTest.java
 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileBuilderBoltTest.java
new file mode 100644
index 0000000..fc94afa
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileBuilderBoltTest.java
@@ -0,0 +1,356 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.storm;
+
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+import org.apache.metron.profiler.MessageDistributor;
+import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.storm.integration.MessageBuilder;
+import org.apache.metron.test.bolt.BaseBoltTest;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the ProfileBuilderBolt.
+ */
+public class ProfileBuilderBoltTest extends BaseBoltTest {
+
+  private JSONObject message1;
+  private JSONObject message2;
+  private ProfileConfig profile1;
+  private ProfileConfig profile2;
+  private ProfileMeasurementEmitter emitter;
+  private ManualFlushSignal flushSignal;
+  private ProfileMeasurement measurement;
+
+  @Before
+  public void setup() throws Exception {
+
+    message1 = new MessageBuilder()
+            .withField("ip_src_addr", "10.0.0.1")
+            .withField("value", "22")
+            .build();
+
+    message2 = new MessageBuilder()
+            .withField("ip_src_addr", "10.0.0.2")
+            .withField("value", "22")
+            .build();
+
+    profile1 = new ProfileConfig()
+            .withProfile("profile1")
+            .withForeach("ip_src_addr")
+            .withInit("x", "0")
+            .withUpdate("x", "x + 1")
+            .withResult("x");
+
+    profile2 = new ProfileConfig()
+            .withProfile("profile2")
+            .withForeach("ip_src_addr")
+            .withInit(Collections.singletonMap("x", "0"))
+            .withUpdate(Collections.singletonMap("x", "x + 1"))
+            .withResult("x");
+
+    measurement = new ProfileMeasurement()
+            .withEntity("entity1")
+            .withProfileName("profile1")
+            .withPeriod(1000, 500, TimeUnit.MILLISECONDS)
+            .withProfileValue(22);
+
+    flushSignal = new ManualFlushSignal();
+    flushSignal.setFlushNow(false);
+  }
+
+  /**
+   * The bolt should extract a message and timestamp from a tuple and
+   * pass that to a {@code MessageDistributor}.
+   */
+  @Test
+  public void testExtractMessage() throws Exception {
+
+    ProfileBuilderBolt bolt = createBolt();
+
+    // create a mock
+    MessageDistributor distributor = mock(MessageDistributor.class);
+    bolt.withMessageDistributor(distributor);
+
+    // create a tuple
+    final long timestamp1 = 100000000L;
+    Tuple tuple1 = createTuple("entity1", message1, profile1, timestamp1);
+
+    // execute the bolt
+    TupleWindow tupleWindow = createWindow(tuple1);
+    bolt.execute(tupleWindow);
+
+    // the message should have been extracted from the tuple and passed to the 
MessageDistributor
+    verify(distributor).distribute(any(MessageRoute.class), any());
+  }
+
+
+  /**
+   * If the {@code FlushSignal} tells the bolt to flush, it should flush the 
{@code MessageDistributor}
+   * and emit the {@code ProfileMeasurement} values from all active profiles.
+   */
+  @Test
+  public void testFlushActiveProfiles() throws Exception {
+
+    ProfileBuilderBolt bolt = createBolt();
+
+    // create a mock that returns the profile measurement above
+    MessageDistributor distributor = mock(MessageDistributor.class);
+    
when(distributor.flush()).thenReturn(Collections.singletonList(measurement));
+    bolt.withMessageDistributor(distributor);
+
+    // signal the bolt to flush
+    flushSignal.setFlushNow(true);
+
+    // execute the bolt
+    Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L);
+    TupleWindow tupleWindow = createWindow(tuple1);
+    bolt.execute(tupleWindow);
+
+    // a profile measurement should be emitted by the bolt
+    List<ProfileMeasurement> measurements = 
getProfileMeasurements(outputCollector, 1);
+    assertEquals(1, measurements.size());
+    assertEquals(measurement, measurements.get(0));
+  }
+
+  /**
+   * If the {@code FlushSignal} tells the bolt NOT to flush, nothing should be 
emitted.
+   */
+  @Test
+  public void testDoNotFlushActiveProfiles() throws Exception {
+
+    ProfileBuilderBolt bolt = createBolt();
+
+    // create a mock where flush() returns the profile measurement above
+    MessageDistributor distributor = mock(MessageDistributor.class);
+    
when(distributor.flush()).thenReturn(Collections.singletonList(measurement));
+    bolt.withMessageDistributor(distributor);
+
+    // there is no flush signal
+    flushSignal.setFlushNow(false);
+
+    // execute the bolt
+    Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L);
+    TupleWindow tupleWindow = createWindow(tuple1);
+    bolt.execute(tupleWindow);
+
+    // nothing should have been emitted
+    getProfileMeasurements(outputCollector, 0);
+  }
+
+  /**
+   * Expired profiles should be flushed regularly, even if no input telemetry
+   * has been received.
+   */
+  @Test
+  public void testFlushExpiredProfiles() throws Exception {
+
+    ProfileBuilderBolt bolt = createBolt();
+
+    // create a mock where flushExpired() returns the profile measurement above
+    MessageDistributor distributor = mock(MessageDistributor.class);
+    
when(distributor.flushExpired()).thenReturn(Collections.singletonList(measurement));
+    bolt.withMessageDistributor(distributor);
+
+    // execute test by flushing expired profiles. this is normally triggered 
by a timer task.
+    bolt.flushExpired();
+
+    // a profile measurement should be emitted by the bolt
+    List<ProfileMeasurement> measurements = 
getProfileMeasurements(outputCollector, 1);
+    assertEquals(1, measurements.size());
+    assertEquals(measurement, measurements.get(0));
+  }
+
+  /**
+   * A {@link ProfileMeasurement} is built for each profile/entity pair.  The 
measurement should be emitted to each
+   * destination defined by the profile. By default, a profile uses both Kafka 
and HBase as destinations.
+   */
+  @Test
+  public void testEmitters() throws Exception {
+
+    // defines the zk configurations accessible from the bolt
+    ProfilerConfigurations configurations = new ProfilerConfigurations();
+    configurations.updateGlobalConfig(Collections.emptyMap());
+
+    // create the bolt with 3 destinations
+    ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt()
+            .withProfileTimeToLive(30, TimeUnit.MINUTES)
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withMaxNumberOfRoutes(Long.MAX_VALUE)
+            .withZookeeperClient(client)
+            .withZookeeperCache(cache)
+            .withEmitter(new TestEmitter("destination1"))
+            .withEmitter(new TestEmitter("destination2"))
+            .withEmitter(new TestEmitter("destination3"))
+            .withProfilerConfigurations(configurations)
+            .withTumblingWindow(new BaseWindowedBolt.Duration(10, 
TimeUnit.MINUTES));
+    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+
+    // signal the bolt to flush
+    bolt.withFlushSignal(flushSignal);
+    flushSignal.setFlushNow(true);
+
+    // execute the bolt
+    Tuple tuple1 = createTuple("entity", message1, profile1, 
System.currentTimeMillis());
+    TupleWindow window = createWindow(tuple1);
+    bolt.execute(window);
+
+    // validate measurements emitted to each
+    verify(outputCollector, times(1)).emit(eq("destination1"), any());
+    verify(outputCollector, times(1)).emit(eq("destination2"), any());
+    verify(outputCollector, times(1)).emit(eq("destination3"), any());
+  }
+
+  /**
+   * Retrieves the ProfileMeasurement(s) (if any) that have been emitted.
+   *
+   * @param collector The Storm output collector.
+   * @param expected The number of measurements expected.
+   * @return A list of ProfileMeasurement(s).
+   */
+  private List<ProfileMeasurement> getProfileMeasurements(OutputCollector 
collector, int expected) {
+
+    // the 'streamId' is defined by the DestinationHandler being used by the 
bolt
+    final String streamId = emitter.getStreamId();
+
+    // capture the emitted tuple(s)
+    ArgumentCaptor<Values> argCaptor = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(expected))
+            .emit(eq(streamId), argCaptor.capture());
+
+    // return the profile measurements that were emitted
+    return argCaptor.getAllValues()
+            .stream()
+            .map(val -> (ProfileMeasurement) val.get(0))
+            .collect(Collectors.toList());
+  }
+
+  /**
+   * Create a tuple that will contain the message, the entity name, and 
profile definition.
+   * @param entity The entity name
+   * @param message The telemetry message.
+   * @param profile The profile definition.
+   */
+  private Tuple createTuple(String entity, JSONObject message, ProfileConfig 
profile, long timestamp) {
+
+    Tuple tuple = mock(Tuple.class);
+    
when(tuple.getValueByField(eq(ProfileSplitterBolt.MESSAGE_TUPLE_FIELD))).thenReturn(message);
+    
when(tuple.getValueByField(eq(ProfileSplitterBolt.TIMESTAMP_TUPLE_FIELD))).thenReturn(timestamp);
+    
when(tuple.getValueByField(eq(ProfileSplitterBolt.ENTITY_TUPLE_FIELD))).thenReturn(entity);
+    
when(tuple.getValueByField(eq(ProfileSplitterBolt.PROFILE_TUPLE_FIELD))).thenReturn(profile);
+
+    return tuple;
+  }
+
+  /**
+   * Create a ProfileBuilderBolt to test.
+   * @return A {@link ProfileBuilderBolt} to test.
+   */
+  private ProfileBuilderBolt createBolt() throws IOException {
+
+    // defines the zk configurations accessible from the bolt
+    ProfilerConfigurations configurations = new ProfilerConfigurations();
+    configurations.updateGlobalConfig(Collections.emptyMap());
+
+    emitter = new HBaseEmitter();
+    ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt()
+            .withProfileTimeToLive(30, TimeUnit.MINUTES)
+            .withMaxNumberOfRoutes(Long.MAX_VALUE)
+            .withZookeeperClient(client)
+            .withZookeeperCache(cache)
+            .withEmitter(emitter)
+            .withProfilerConfigurations(configurations)
+            .withPeriodDuration(1, TimeUnit.MINUTES)
+            .withTumblingWindow(new BaseWindowedBolt.Duration(30, 
TimeUnit.SECONDS));
+    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+
+    // set the flush signal AFTER calling 'prepare'
+    bolt.withFlushSignal(flushSignal);
+
+    return bolt;
+  }
+
+  /**
+   * Creates a mock TupleWindow containing multiple tuples.
+   * @param tuples The tuples to add to the window.
+   */
+  private TupleWindow createWindow(Tuple... tuples) {
+
+    TupleWindow window = mock(TupleWindow.class);
+    when(window.get()).thenReturn(Arrays.asList(tuples));
+    return window;
+  }
+
+  /**
+   * An implementation for testing purposes only.
+   */
+  private class TestEmitter implements ProfileMeasurementEmitter {
+
+    private String streamId;
+
+    public TestEmitter(String streamId) {
+      this.streamId = streamId;
+    }
+
+    @Override
+    public String getStreamId() {
+      return streamId;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declareStream(getStreamId(), new Fields("measurement"));
+    }
+
+    @Override
+    public void emit(ProfileMeasurement measurement, OutputCollector 
collector) {
+      collector.emit(getStreamId(), new Values(measurement));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileHBaseMapperTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileHBaseMapperTest.java
 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileHBaseMapperTest.java
new file mode 100644
index 0000000..f623d38
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileHBaseMapperTest.java
@@ -0,0 +1,93 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.storm;
+
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.configuration.profiler.ProfileResult;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.storm.tuple.Tuple;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the ProfileHBaseMapper class.
+ */
+public class ProfileHBaseMapperTest {
+
+  private Tuple tuple;
+  private ProfileHBaseMapper mapper;
+  private ProfileMeasurement measurement;
+  private RowKeyBuilder rowKeyBuilder;
+  private ProfileConfig profile;
+
+  @Before
+  public void setup() {
+    rowKeyBuilder = mock(RowKeyBuilder.class);
+
+    mapper = new ProfileHBaseMapper();
+    mapper.setRowKeyBuilder(rowKeyBuilder);
+
+    profile = new ProfileConfig("profile", "ip_src_addr", new ProfileResult("2 
+ 2"));
+
+    measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withProfileValue(22)
+            .withDefinition(profile);
+
+    // the tuple will contain the original message
+    tuple = mock(Tuple.class);
+    when(tuple.getValueByField(eq("measurement"))).thenReturn(measurement);
+  }
+
+  /**
+   * The mapper should return the expiration for a tuple based on the Profile 
definition.
+   */
+  @Test
+  public void testExpires() throws Exception {
+    final Long expiresDays = 30L;
+    profile.setExpires(expiresDays);
+
+    Optional<Long> actual = mapper.getTTL(tuple);
+    Assert.assertTrue(actual.isPresent());
+    Assert.assertEquals(expiresDays, (Long) 
TimeUnit.MILLISECONDS.toDays(actual.get()));
+  }
+
+  /**
+   * The expiration field is optional within a Profile definition.
+   */
+  @Test
+  public void testExpiresUndefined() throws Exception {
+    // the TTL should not be defined
+    Optional<Long> actual = mapper.getTTL(tuple);
+    Assert.assertFalse(actual.isPresent());
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java
 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java
new file mode 100644
index 0000000..93d2ac4
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java
@@ -0,0 +1,455 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.storm;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.profiler.DefaultMessageRouter;
+import org.apache.metron.profiler.clock.FixedClockFactory;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.test.bolt.BaseBoltTest;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the ProfileSplitterBolt.
+ */
+public class ProfileSplitterBoltTest extends BaseBoltTest {
+
+  /**
+   * {
+   *   "ip_src_addr": "10.0.0.1",
+   *   "ip_dst_addr": "10.0.0.20",
+   *   "protocol": "HTTP",
+   *   "timestamp.custom": 2222222222222,
+   *   "timestamp.string": "3333333333333"
+   * }
+   */
+  @Multiline
+  private String input;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "test",
+   *        "foreach": "ip_src_addr",
+   *        "onlyif": "protocol == 'HTTP'",
+   *        "init": {},
+   *        "update": {},
+   *        "result": "2"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String profileWithOnlyIfTrue;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "test",
+   *        "foreach": "ip_src_addr",
+   *        "onlyif": "false",
+   *        "init": {},
+   *        "update": {},
+   *        "result": "2"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String profileWithOnlyIfFalse;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "test",
+   *        "foreach": "ip_src_addr",
+   *        "init": {},
+   *        "update": {},
+   *        "result": "2"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String profileWithOnlyIfMissing;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "test",
+   *        "foreach": "ip_src_addr",
+   *        "onlyif": "NOT-VALID",
+   *        "init": {},
+   *        "update": {},
+   *        "result": "2"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String profileWithOnlyIfInvalid;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "test",
+   *        "foreach": "ip_src_addr",
+   *        "init": {},
+   *        "update": {},
+   *        "result": "2"
+   *      }
+   *   ],
+   *   "timestampField": "timestamp.custom"
+   * }
+   */
+  @Multiline
+  private String profileUsingCustomTimestampField;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "test",
+   *        "foreach": "ip_src_addr",
+   *        "init": {},
+   *        "update": {},
+   *        "result": "2"
+   *      }
+   *   ],
+   *   "timestampField": "timestamp.missing"
+   * }
+   */
+  @Multiline
+  private String profileUsingMissingTimestampField;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "test",
+   *        "foreach": "ip_src_addr",
+   *        "init": {},
+   *        "update": {},
+   *        "result": "2"
+   *      }
+   *   ],
+   *   "timestampField": "timestamp.string"
+   * }
+   */
+  @Multiline
+  private String profileUsingStringTimestampField;
+
+  /**
+   * {
+   *   "profiles": [
+   *   ]
+   * }
+   */
+  @Multiline
+  private String noProfilesDefined;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile1",
+   *        "foreach": "'global'",
+   *        "result": "1"
+   *      },
+   *      {
+   *        "profile": "profile2",
+   *        "foreach": "'global'",
+   *        "result": "2"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String twoProfilesDefined;
+
+  private JSONObject message;
+  private long timestamp = 3333333;
+
+  @Before
+  public void setup() throws ParseException {
+
+    // parse the input message
+    JSONParser parser = new JSONParser();
+    message = (JSONObject) parser.parse(input);
+
+    // ensure the tuple returns the expected json message
+    when(tuple.getBinary(0)).thenReturn(input.getBytes());
+  }
+
+  /**
+   * Ensure that a tuple with the correct fields is emitted to downstream bolts
+   * when a profile is defined.
+   */
+  @Test
+  public void testEmitTupleWithOneProfile() throws Exception {
+
+    // setup the bolt and execute a tuple
+    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue);
+    ProfileSplitterBolt bolt = createBolt(config);
+    bolt.execute(tuple);
+
+    // the expected tuple fields
+    String expectedEntity = "10.0.0.1";
+    ProfileConfig expectedConfig = config.getProfiles().get(0);
+    Values expected = new Values(message, timestamp, expectedEntity, 
expectedConfig);
+
+    // a tuple should be emitted for the downstream profile builder
+    verify(outputCollector, times(1))
+            .emit(eq(tuple), eq(expected));
+
+    // the original tuple should be ack'd
+    verify(outputCollector, times(1))
+            .ack(eq(tuple));
+  }
+
+  /**
+   * If there are two profiles that need the same message, then two tuples 
should
+   * be emitted.  One tuple for each profile.
+   */
+  @Test
+  public void testEmitTupleWithTwoProfiles() throws Exception {
+
+    // setup the bolt and execute a tuple
+    ProfilerConfig config = toProfilerConfig(twoProfilesDefined);
+    ProfileSplitterBolt bolt = createBolt(config);
+    bolt.execute(tuple);
+
+    // the expected tuple fields
+    final String expectedEntity = "global";
+    {
+      // a tuple should be emitted for the first profile
+      ProfileConfig profile1 = config.getProfiles().get(0);
+      Values expected = new Values(message, timestamp, expectedEntity, 
profile1);
+      verify(outputCollector, times(1))
+              .emit(eq(tuple), eq(expected));
+    }
+    {
+      // a tuple should be emitted for the second profile
+      ProfileConfig profile2 = config.getProfiles().get(1);
+      Values expected = new Values(message, timestamp, expectedEntity, 
profile2);
+      verify(outputCollector, times(1))
+              .emit(eq(tuple), eq(expected));
+    }
+
+    // the original tuple should be ack'd
+    verify(outputCollector, times(1))
+            .ack(eq(tuple));
+  }
+
+  /**
+   * No tuples should be emitted, if no profiles are defined.
+   */
+  @Test
+  public void testNoProfilesDefined() throws Exception {
+
+    // setup the bolt and execute a tuple
+    ProfilerConfig config = toProfilerConfig(noProfilesDefined);
+    ProfileSplitterBolt bolt = createBolt(config);
+    bolt.execute(tuple);
+
+    // no tuple should be emitted
+    verify(outputCollector, times(0))
+            .emit(any(Tuple.class), any());
+
+    // the original tuple should be ack'd
+    verify(outputCollector, times(1))
+            .ack(eq(tuple));
+  }
+
+  /**
+   * What happens when a profile's 'onlyif' expression is true?  The message
+   * should be applied to the profile.
+   */
+  @Test
+  public void testOnlyIfTrue() throws Exception {
+
+    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue);
+    ProfileSplitterBolt bolt = createBolt(config);
+    bolt.execute(tuple);
+
+    // a tuple should be emitted for the downstream profile builder
+    verify(outputCollector, times(1))
+            .emit(eq(tuple), any(Values.class));
+
+    // the original tuple should be ack'd
+    verify(outputCollector, times(1))
+            .ack(eq(tuple));
+  }
+
+  /**
+   * All messages are applied to a profile where 'onlyif' is missing.  A 
profile with no
+   * 'onlyif' is treated the same as if 'onlyif=true'.
+   */
+  @Test
+  public void testOnlyIfMissing() throws Exception {
+
+    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfMissing);
+    ProfileSplitterBolt bolt = createBolt(config);
+    bolt.execute(tuple);
+
+    // a tuple should be emitted for the downstream profile builder
+    verify(outputCollector, times(1))
+            .emit(eq(tuple), any(Values.class));
+
+    // the original tuple should be ack'd
+    verify(outputCollector, times(1))
+            .ack(eq(tuple));
+  }
+
+  /**
+   * What happens when a profile's 'onlyif' expression is false?  The message
+   * should NOT be applied to the profile.
+   */
+  @Test
+  public void testOnlyIfFalse() throws Exception {
+
+    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfFalse);
+    ProfileSplitterBolt bolt = createBolt(config);
+    bolt.execute(tuple);
+
+    // a tuple should NOT be emitted for the downstream profile builder
+    verify(outputCollector, times(0))
+            .emit(any());
+
+    // the original tuple should be ack'd
+    verify(outputCollector, times(1))
+            .ack(eq(tuple));
+  }
+
+  /**
+   * The entity associated with a profile is defined with a Stellar 
expression.  That expression
+   * can refer to any field within the message.
+   *
+   * In this case the entity is defined as 'ip_src_addr' which is resolved to 
'10.0.0.1' based on
+   * the data contained within the message.
+   */
+  @Test
+  public void testResolveEntityName() throws Exception {
+
+    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue);
+    ProfileSplitterBolt bolt = createBolt(config);
+    bolt.execute(tuple);
+
+    // expected values
+    String expectedEntity = "10.0.0.1";
+    ProfileConfig expectedConfig = config.getProfiles().get(0);
+    Values expected = new Values(message, timestamp, expectedEntity, 
expectedConfig);
+
+    // a tuple should be emitted for the downstream profile builder
+    verify(outputCollector, times(1))
+            .emit(eq(tuple), eq(expected));
+
+    // the original tuple should be ack'd
+    verify(outputCollector, times(1))
+            .ack(eq(tuple));
+  }
+
+  /**
+   * What happens when invalid Stella code is used for 'onlyif'?  The invalid 
profile should be ignored.
+   */
+  @Test
+  public void testOnlyIfInvalid() throws Exception {
+
+    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfInvalid);
+    ProfileSplitterBolt bolt = createBolt(config);
+    bolt.execute(tuple);
+
+    // a tuple should NOT be emitted for the downstream profile builder
+    verify(outputCollector, times(0))
+            .emit(any(Values.class));
+  }
+
+  @Test
+  public void testWithNullMessage() throws Exception {
+
+    // ensure the tuple returns null to mimic a null message in kafka
+    when(tuple.getBinary(0)).thenReturn(null);
+
+    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfInvalid);
+    ProfileSplitterBolt bolt = createBolt(config);
+    bolt.execute(tuple);
+
+    // a tuple should NOT be emitted for the downstream profile builder
+    verify(outputCollector, times(0))
+            .emit(any(Values.class));
+
+  }
+
+  /**
+   * Creates a ProfilerConfig based on a string containing JSON.
+   *
+   * @param configAsJSON The config as JSON.
+   * @return The ProfilerConfig.
+   * @throws Exception
+   */
+  private ProfilerConfig toProfilerConfig(String configAsJSON) throws 
Exception {
+    InputStream in = new ByteArrayInputStream(configAsJSON.getBytes("UTF-8"));
+    return JSONUtils.INSTANCE.load(in, ProfilerConfig.class);
+  }
+
+  /**
+   * Create a ProfileSplitterBolt to test
+   */
+  private ProfileSplitterBolt createBolt(ProfilerConfig config) throws 
Exception {
+
+    ProfileSplitterBolt bolt = new ProfileSplitterBolt("zookeeperURL");
+    bolt.setCuratorFramework(client);
+    bolt.setZKCache(cache);
+    bolt.getConfigurations().updateProfilerConfig(config);
+    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+
+    // set the clock factory AFTER calling prepare to use the fixed clock 
factory
+    DefaultMessageRouter router = new 
DefaultMessageRouter(bolt.getStellarContext());
+    router.setClockFactory(new FixedClockFactory(timestamp));
+    bolt.setRouter(router);
+
+    return bolt;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java
 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java
new file mode 100644
index 0000000..70487a0
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java
@@ -0,0 +1,124 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.storm.integration;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.metron.integration.InMemoryComponent;
+import org.apache.metron.integration.UnableToStartException;
+import org.apache.metron.integration.components.ZKServerComponent;
+
+import java.util.Properties;
+
+import static 
org.apache.metron.common.configuration.ConfigurationsUtils.getClient;
+import static 
org.apache.metron.common.configuration.ConfigurationsUtils.readGlobalConfigFromFile;
+import static 
org.apache.metron.common.configuration.ConfigurationsUtils.writeGlobalConfigToZookeeper;
+import static 
org.apache.metron.common.configuration.ConfigurationsUtils.readProfilerConfigFromFile;
+import static 
org.apache.metron.common.configuration.ConfigurationsUtils.writeProfilerConfigToZookeeper;
+
+
+/**
+ * Uploads configuration to Zookeeper.
+ */
+public class ConfigUploadComponent implements InMemoryComponent {
+
+  private Properties topologyProperties;
+  private String globalConfiguration;
+  private String profilerConfiguration;
+
+  @Override
+  public void start() throws UnableToStartException {
+    try {
+      upload();
+    } catch (Exception e) {
+      throw new UnableToStartException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void stop() {
+    // nothing to do
+  }
+
+  public void update()
+      throws UnableToStartException {
+    try {
+      upload();
+    } catch (Exception e) {
+      throw new UnableToStartException(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Uploads configuration to Zookeeper.
+   * @throws Exception
+   */
+  private void upload() throws Exception {
+    final String zookeeperUrl = 
topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY);
+    try(CuratorFramework client = getClient(zookeeperUrl)) {
+      if(client.getState() != CuratorFrameworkState.STARTED) {
+        client.start();
+      }
+      uploadGlobalConfig(client);
+      uploadProfilerConfig(client);
+    }
+  }
+
+  /**
+   * Upload the profiler configuration to Zookeeper.
+   * @param client The zookeeper client.
+   */
+  private void uploadProfilerConfig(CuratorFramework client) throws Exception {
+    if (profilerConfiguration != null) {
+      byte[] globalConfig = readProfilerConfigFromFile(profilerConfiguration);
+      if (globalConfig.length > 0) {
+        
writeProfilerConfigToZookeeper(readProfilerConfigFromFile(profilerConfiguration),
 client);
+      }
+    }
+  }
+
+  /**
+   * Upload the global configuration to Zookeeper.
+   * @param client The zookeeper client.
+   */
+  private void uploadGlobalConfig(CuratorFramework client) throws Exception {
+    if (globalConfiguration != null) {
+      byte[] globalConfig = readGlobalConfigFromFile(globalConfiguration);
+      if (globalConfig.length > 0) {
+        
writeGlobalConfigToZookeeper(readGlobalConfigFromFile(globalConfiguration), 
client);
+      }
+    }
+  }
+
+  public ConfigUploadComponent withTopologyProperties(Properties 
topologyProperties) {
+    this.topologyProperties = topologyProperties;
+    return this;
+  }
+
+  public ConfigUploadComponent withGlobalConfiguration(String path) {
+    this.globalConfiguration = path;
+    return this;
+  }
+
+  public ConfigUploadComponent withProfilerConfiguration(String path) {
+    this.profilerConfiguration = path;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/MessageBuilder.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/MessageBuilder.java
 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/MessageBuilder.java
new file mode 100644
index 0000000..17e36e1
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/MessageBuilder.java
@@ -0,0 +1,75 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.storm.integration;
+
+import org.json.simple.JSONObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Enables simple creation of telemetry messages for testing.
+ */
+public class MessageBuilder {
+
+  private Map<Object, Object> fields;
+
+  /**
+   * Create a new {@link MessageBuilder}.
+   */
+  public MessageBuilder() {
+    this.fields = new HashMap<>();
+  }
+
+  /**
+   * Adds all of the fields from a message to this message.
+   *
+   * @param prototype The other message that is treated as a prototype.
+   * @return A {@link MessageBuilder}
+   */
+  public MessageBuilder withFields(JSONObject prototype) {
+    prototype.forEach((key, val) -> this.fields.put(key, val));
+    return this;
+  }
+
+  /**
+   * Adds a field to the message.
+   *
+   * @param key The field name.
+   * @param value The field value.
+   * @return A {@link MessageBuilder}
+   */
+  public MessageBuilder withField(String key, Object value) {
+    this.fields.put(key, value);
+    return this;
+  }
+
+  /**
+   * Build the message.
+   *
+   * <p>This should be called after defining all of the message fields.
+   *
+   * @return A {@link MessageBuilder}.
+   */
+  public JSONObject build() {
+    return new JSONObject(fields);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
new file mode 100644
index 0000000..182600a
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
@@ -0,0 +1,421 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.storm.integration;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.integration.BaseIntegrationTest;
+import org.apache.metron.integration.ComponentRunner;
+import org.apache.metron.integration.UnableToStartException;
+import org.apache.metron.integration.components.FluxTopologyComponent;
+import org.apache.metron.integration.components.KafkaComponent;
+import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.storm.Config;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.code.tempusfugit.temporal.Duration.seconds;
+import static com.google.code.tempusfugit.temporal.Timeout.timeout;
+import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * An integration test of the Profiler topology.
+ */
+public class ProfilerIntegrationTest extends BaseIntegrationTest {
+
+  private static final String TEST_RESOURCES = 
"../../metron-analytics/metron-profiler-storm/src/test";
+  private static final String FLUX_PATH = "src/main/flux/profiler/remote.yaml";
+
+  public static final long startAt = 10;
+  public static final String entity = "10.0.0.1";
+
+  private static final String tableName = "profiler";
+  private static final String columnFamily = "P";
+  private static final String inputTopic = Constants.INDEXING_TOPIC;
+  private static final String outputTopic = "profiles";
+  private static final int saltDivisor = 10;
+
+  private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(1);
+  private static final long windowDurationMillis = 
TimeUnit.SECONDS.toMillis(5);
+  private static final long periodDurationMillis = 
TimeUnit.SECONDS.toMillis(10);
+  private static final long profileTimeToLiveMillis = 
TimeUnit.SECONDS.toMillis(15);
+  private static final long maxRoutesPerBolt = 100000;
+
+  private static ColumnBuilder columnBuilder;
+  private static ZKServerComponent zkComponent;
+  private static FluxTopologyComponent fluxComponent;
+  private static KafkaComponent kafkaComponent;
+  private static ConfigUploadComponent configUploadComponent;
+  private static ComponentRunner runner;
+  private static MockHTable profilerTable;
+
+  private static String message1;
+  private static String message2;
+  private static String message3;
+
+  /**
+   * [
+   *    org.apache.metron.profiler.ProfileMeasurement,
+   *    org.apache.metron.profiler.ProfilePeriod,
+   *    org.apache.metron.common.configuration.profiler.ProfileResult,
+   *    
org.apache.metron.common.configuration.profiler.ProfileResultExpressions,
+   *    
org.apache.metron.common.configuration.profiler.ProfileTriageExpressions,
+   *    org.apache.metron.common.configuration.profiler.ProfilerConfig,
+   *    org.apache.metron.common.configuration.profiler.ProfileConfig,
+   *    org.json.simple.JSONObject,
+   *    java.util.LinkedHashMap,
+   *    org.apache.metron.statistics.OnlineStatisticsProvider
+   *  ]
+   */
+  @Multiline
+  private static String kryoSerializers;
+
+  /**
+   * The Profiler can generate profiles based on processing time.  With 
processing time,
+   * the Profiler builds profiles based on when the telemetry was processed.
+   *
+   * <p>Not defining a 'timestampField' within the Profiler configuration 
tells the Profiler
+   * to use processing time.
+   */
+  @Test
+  public void testProcessingTime() throws Exception {
+
+    // upload the config to zookeeper
+    uploadConfig(TEST_RESOURCES + "/config/zookeeper/processing-time-test");
+
+    // start the topology and write test messages to kafka
+    fluxComponent.submitTopology();
+
+    // the messages that will be applied to the profile
+    kafkaComponent.writeMessages(inputTopic, message1);
+    kafkaComponent.writeMessages(inputTopic, message2);
+    kafkaComponent.writeMessages(inputTopic, message3);
+
+    // storm needs at least one message to close its event window
+    int attempt = 0;
+    while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) {
+
+      // sleep, at least beyond the current window
+      Thread.sleep(windowDurationMillis + windowLagMillis);
+
+      // send another message to help close the current event window
+      kafkaComponent.writeMessages(inputTopic, message2);
+    }
+
+    // validate what was flushed
+    List<Integer> actuals = read(
+            profilerTable.getPutLog(),
+            columnFamily,
+            columnBuilder.getColumnQualifier("value"),
+            Integer.class);
+    assertEquals(1, actuals.size());
+    assertTrue(actuals.get(0) >= 3);
+  }
+
+  /**
+   * The Profiler can generate profiles using event time.  With event time 
processing,
+   * the Profiler uses timestamps contained in the source telemetry.
+   *
+   * <p>Defining a 'timestampField' within the Profiler configuration tells 
the Profiler
+   * from which field the timestamp should be extracted.
+   */
+  @Test
+  public void testEventTime() throws Exception {
+
+    // upload the profiler config to zookeeper
+    uploadConfig(TEST_RESOURCES + "/config/zookeeper/event-time-test");
+
+    // start the topology and write test messages to kafka
+    fluxComponent.submitTopology();
+    kafkaComponent.writeMessages(inputTopic, message1);
+    kafkaComponent.writeMessages(inputTopic, message2);
+    kafkaComponent.writeMessages(inputTopic, message3);
+
+    // wait until the profile is flushed
+    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, 
timeout(seconds(90)));
+
+    List<Put> puts = profilerTable.getPutLog();
+    assertEquals(1, puts.size());
+
+    // inspect the row key to ensure the profiler used event time correctly.  
the timestamp
+    // embedded in the row key should match those in the source telemetry
+    byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, 
startAt);
+    byte[] actualRowKey = puts.get(0).getRow();
+    assertArrayEquals(failMessage(expectedRowKey, actualRowKey), 
expectedRowKey, actualRowKey);
+  }
+
+  /**
+   * The result produced by a Profile has to be serializable within Storm. If 
the result is not
+   * serializable the topology will crash and burn.
+   *
+   * This test ensures that if a profile returns a STATS object created using 
the STATS_INIT and
+   * STATS_ADD functions, that it can be correctly serialized and persisted.
+   */
+  @Test
+  public void testProfileWithStatsObject() throws Exception {
+
+    // upload the profiler config to zookeeper
+    uploadConfig(TEST_RESOURCES + "/config/zookeeper/profile-with-stats");
+
+    // start the topology and write test messages to kafka
+    fluxComponent.submitTopology();
+    kafkaComponent.writeMessages(inputTopic, message1);
+    kafkaComponent.writeMessages(inputTopic, message2);
+    kafkaComponent.writeMessages(inputTopic, message3);
+
+    // wait until the profile is flushed
+    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, 
timeout(seconds(90)));
+
+    // ensure that a value was persisted in HBase
+    List<Put> puts = profilerTable.getPutLog();
+    assertEquals(1, puts.size());
+
+    // generate the expected row key. only the profile name, entity, and 
period are used to generate the row key
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile-with-stats")
+            .withEntity("global")
+            .withPeriod(startAt, periodDurationMillis, TimeUnit.MILLISECONDS);
+    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, 
periodDurationMillis, TimeUnit.MILLISECONDS);
+    byte[] expectedRowKey = rowKeyBuilder.rowKey(measurement);
+
+    // ensure the correct row key was generated
+    byte[] actualRowKey = puts.get(0).getRow();
+    assertArrayEquals(failMessage(expectedRowKey, actualRowKey), 
expectedRowKey, actualRowKey);
+  }
+
+  /**
+   * Generates an error message for if the byte comparison fails.
+   *
+   * @param expected The expected value.
+   * @param actual The actual value.
+   * @return
+   * @throws UnsupportedEncodingException
+   */
+  private String failMessage(byte[] expected, byte[] actual) throws 
UnsupportedEncodingException {
+    return String.format("expected '%s', got '%s'",
+              new String(expected, "UTF-8"),
+              new String(actual, "UTF-8"));
+  }
+
+  /**
+   * Generates the expected row key.
+   *
+   * @param profileName The name of the profile.
+   * @param entity The entity.
+   * @param whenMillis A timestamp in epoch milliseconds.
+   * @return A row key.
+   */
+  private byte[] generateExpectedRowKey(String profileName, String entity, 
long whenMillis) {
+
+    // only the profile name, entity, and period are used to generate the row 
key
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName(profileName)
+            .withEntity(entity)
+            .withPeriod(whenMillis, periodDurationMillis, 
TimeUnit.MILLISECONDS);
+
+    // build the row key
+    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, 
periodDurationMillis, TimeUnit.MILLISECONDS);
+    return rowKeyBuilder.rowKey(measurement);
+  }
+
+  /**
+   * Reads a value written by the Profiler.
+   *
+   * @param family The column family.
+   * @param qualifier The column qualifier.
+   * @param clazz The expected type of the value.
+   * @param <T> The expected type of the value.
+   * @return The value written by the Profiler.
+   */
+  private <T> List<T> read(List<Put> puts, String family, byte[] qualifier, 
Class<T> clazz) {
+    List<T> results = new ArrayList<>();
+
+    for(Put put: puts) {
+      List<Cell> cells = put.get(Bytes.toBytes(family), qualifier);
+      for(Cell cell : cells) {
+        T value = SerDeUtils.fromBytes(cell.getValue(), clazz);
+        results.add(value);
+      }
+    }
+
+    return results;
+  }
+
+  @BeforeClass
+  public static void setupBeforeClass() throws UnableToStartException {
+
+    // create some messages that contain a timestamp - a really old timestamp; 
close to 1970
+    message1 = new MessageBuilder()
+            .withField("ip_src_addr", entity)
+            .withField("timestamp", startAt)
+            .build()
+            .toJSONString();
+
+    message2 = new MessageBuilder()
+            .withField("ip_src_addr", entity)
+            .withField("timestamp", startAt + 100)
+            .build()
+            .toJSONString();
+
+    message3 = new MessageBuilder()
+            .withField("ip_src_addr", entity)
+            .withField("timestamp", startAt + (windowDurationMillis * 2))
+            .build()
+            .toJSONString();
+
+    columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+
+    // storm topology properties
+    final Properties topologyProperties = new Properties() {{
+
+      // storm settings
+      setProperty("profiler.workers", "1");
+      setProperty("profiler.executors", "0");
+      setProperty(Config.TOPOLOGY_AUTO_CREDENTIALS, "[]");
+      setProperty(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, "60");
+      setProperty(Config.TOPOLOGY_MAX_SPOUT_PENDING, "100000");
+
+      // ensure tuples are serialized during the test, otherwise serialization 
problems
+      // will not be found until the topology is run on a cluster with 
multiple workers
+      setProperty(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, "true");
+      setProperty(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, "false");
+      setProperty(Config.TOPOLOGY_KRYO_REGISTER, kryoSerializers);
+
+      // kafka settings
+      setProperty("profiler.input.topic", inputTopic);
+      setProperty("profiler.output.topic", outputTopic);
+      setProperty("kafka.start", "UNCOMMITTED_EARLIEST");
+      setProperty("kafka.security.protocol", "PLAINTEXT");
+
+      // hbase settings
+      setProperty("profiler.hbase.salt.divisor", 
Integer.toString(saltDivisor));
+      setProperty("profiler.hbase.table", tableName);
+      setProperty("profiler.hbase.column.family", columnFamily);
+      setProperty("profiler.hbase.batch", "10");
+      setProperty("profiler.hbase.flush.interval.seconds", "1");
+      setProperty("hbase.provider.impl", "" + 
MockHBaseTableProvider.class.getName());
+
+      // profile settings
+      setProperty("profiler.period.duration", 
Long.toString(periodDurationMillis));
+      setProperty("profiler.period.duration.units", "MILLISECONDS");
+      setProperty("profiler.ttl", Long.toString(profileTimeToLiveMillis));
+      setProperty("profiler.ttl.units", "MILLISECONDS");
+      setProperty("profiler.window.duration", 
Long.toString(windowDurationMillis));
+      setProperty("profiler.window.duration.units", "MILLISECONDS");
+      setProperty("profiler.window.lag", Long.toString(windowLagMillis));
+      setProperty("profiler.window.lag.units", "MILLISECONDS");
+      setProperty("profiler.max.routes.per.bolt", 
Long.toString(maxRoutesPerBolt));
+    }};
+
+    // create the mock table
+    profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, 
columnFamily);
+
+    zkComponent = getZKServerComponent(topologyProperties);
+
+    // create the input and output topics
+    kafkaComponent = getKafkaComponent(topologyProperties, Arrays.asList(
+            new KafkaComponent.Topic(inputTopic, 1),
+            new KafkaComponent.Topic(outputTopic, 1)));
+
+    // upload profiler configuration to zookeeper
+    configUploadComponent = new ConfigUploadComponent()
+            .withTopologyProperties(topologyProperties);
+
+    // load flux definition for the profiler topology
+    fluxComponent = new FluxTopologyComponent.Builder()
+            .withTopologyLocation(new File(FLUX_PATH))
+            .withTopologyName("profiler")
+            .withTopologyProperties(topologyProperties)
+            .build();
+
+    // start all components
+    runner = new ComponentRunner.Builder()
+            .withComponent("zk",zkComponent)
+            .withComponent("kafka", kafkaComponent)
+            .withComponent("config", configUploadComponent)
+            .withComponent("storm", fluxComponent)
+            .withMillisecondsBetweenAttempts(15000)
+            .withNumRetries(10)
+            .withCustomShutdownOrder(new String[] 
{"storm","config","kafka","zk"})
+            .build();
+    runner.start();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    MockHBaseTableProvider.clear();
+    if (runner != null) {
+      runner.stop();
+    }
+  }
+
+  @Before
+  public void setup() {
+    // create the mock table
+    profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, 
columnFamily);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    MockHBaseTableProvider.clear();
+    profilerTable.clear();
+    if (runner != null) {
+      runner.reset();
+    }
+  }
+
+  /**
+   * Uploads config values to Zookeeper.
+   * @param path The path on the local filesystem to the config values.
+   * @throws Exception
+   */
+  public void uploadConfig(String path) throws Exception {
+    configUploadComponent
+            .withGlobalConfiguration(path)
+            .withProfilerConfiguration(path)
+            .update();
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/test/resources/log4j.properties 
b/metron-analytics/metron-profiler-storm/src/test/resources/log4j.properties
new file mode 100644
index 0000000..541f368
--- /dev/null
+++ b/metron-analytics/metron-profiler-storm/src/test/resources/log4j.properties
@@ -0,0 +1,34 @@
+#
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+#
+
+# Root logger option
+log4j.rootLogger=ERROR, stdout
+
+# Direct log messages to stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p 
%c{1}:%L - %m%n
+log4j.appender.stdout.filter.1=org.apache.log4j.varia.StringMatchFilter
+log4j.appender.stdout.filter.1.StringToMatch=Connection timed out
+log4j.appender.stdout.filter.1.AcceptOnMatch=false
+log4j.appender.stdout.filter.2=org.apache.log4j.varia.StringMatchFilter
+log4j.appender.stdout.filter.2.StringToMatch=Background
+log4j.appender.stdout.filter.2.AcceptOnMatch=false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/.gitignore
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/.gitignore 
b/metron-analytics/metron-profiler/.gitignore
deleted file mode 100644
index df1a13b..0000000
--- a/metron-analytics/metron-profiler/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/logs
\ No newline at end of file

Reply via email to