http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/main/scripts/start_profiler_topology.sh
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/scripts/start_profiler_topology.sh b/metron-analytics/metron-profiler/src/main/scripts/start_profiler_topology.sh
deleted file mode 100644
index 6ec78f5..0000000
--- a/metron-analytics/metron-profiler/src/main/scripts/start_profiler_topology.sh
+++ /dev/null
@@ -1,22 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-METRON_VERSION=${project.version}
-METRON_HOME=/usr/metron/$METRON_VERSION
-TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar
-storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/profiler/remote.yaml --filter $METRON_HOME/config/profiler.properties

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json
deleted file mode 100644
index 9d727a3..0000000
--- a/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
-  "profiles": [
-    {
-      "profile": "event-time-test",
-      "foreach": "ip_src_addr",
-      "init":   { "counter": "0" },
-      "update": { "counter": "counter + 1" },
-      "result": "counter"
-    }
-  ],
-  "timestampField": "timestamp"
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json
deleted file mode 100644
index e75ec0f..0000000
--- a/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
-  "profiles": [
-    {
-      "profile": "processing-time-test",
-      "foreach": "ip_src_addr",
-      "init":   { "counter": "0" },
-      "update": { "counter": "counter + 1" },
-      "result": "counter"
-    }
-  ]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json
deleted file mode 100644
index 083e73f..0000000
--- a/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
-  "profiles": [
-    {
-      "profile": "profile-with-stats",
-      "foreach": "'global'",
-      "init":   { "stats": "STATS_INIT()" },
-      "update": { "stats": "STATS_ADD(stats, 1)" },
-      "result": "stats"
-    }
-  ],
-  "timestampField": "timestamp"
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java
deleted file mode 100644
index b8949c5..0000000
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.bolt;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests the {@code FixedFrequencyFlushSignal} class.
- */
-public class FixedFrequencyFlushSignalTest {
-
-  @Test
-  public void testSignalFlush() {
-
-    FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000);
-
-    // not time to flush yet
-    assertFalse(signal.isTimeToFlush());
-
-    // advance time
-    signal.update(5000);
-
-    // not time to flush yet
-    assertFalse(signal.isTimeToFlush());
-
-    // advance time
-    signal.update(7000);
-
-    // time to flush
-    assertTrue(signal.isTimeToFlush());
-  }
-
-  @Test
-  public void testOutOfOrderTimestamps() {
-    FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000);
-
-    // advance time, out-of-order
-    signal.update(5000);
-    signal.update(1000);
-    signal.update(7000);
-    signal.update(3000);
-
-    // need to flush @ 5000 + 1000 = 6000. if anything > 6000 (even out-of-order), then it should signal a flush
-    assertTrue(signal.isTimeToFlush());
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testNegativeFrequency() {
-    new FixedFrequencyFlushSignal(-1000);
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java
deleted file mode 100644
index 35ca4d9..0000000
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.bolt;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Values;
-import org.json.simple.JSONObject;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-/**
- * Tests the HBaseEmitter class.
- */
-public class HBaseEmitterTest {
-
-  /**
-   * {
-   *   "profile": "profile-one",
-   *   "foreach": "ip_src_addr",
-   *   "init":   { "x": "0" },
-   *   "update": { "x": "x + 1" },
-   *   "result": "x"
-   * }
-   */
-  @Multiline
-  private String profileDefinition;
-
-  private HBaseEmitter emitter;
-  private ProfileConfig profile;
-  private OutputCollector collector;
-
-  @Before
-  public void setup() throws Exception {
-    emitter = new HBaseEmitter();
-    profile = createDefinition(profileDefinition);
-    collector = Mockito.mock(OutputCollector.class);
-  }
-
-  /**
-   * The handler should emit a message containing the result of executing
-   * the 'result/profile' expression.
-   */
-  @Test
-  public void testEmit() throws Exception {
-
-    // create a measurement that has triage values
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withProfileName("profile")
-            .withEntity("entity")
-            .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withDefinition(profile)
-            .withProfileValue(22);
-
-    // execute the test
-    emitter.emit(measurement, collector);
-
-    // the measurement should be emitted as-is
-    ProfileMeasurement actual = expectMeasurement(emitter, collector);
-    assertEquals(measurement, actual);
-  }
-
-  /**
-   * Verifies that the emitter does emit a {@code ProfileMeasurement}.
-   *
-   * @return The {@code ProfileMeasurement} that was emitted
-   */
-  private ProfileMeasurement expectMeasurement(HBaseEmitter hbaseEmitter, OutputCollector collector) {
-
-    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
-    verify(collector, times(1)).emit(eq(hbaseEmitter.getStreamId()), arg.capture());
-    Values values = arg.getValue();
-    assertTrue(values.get(0) instanceof ProfileMeasurement);
-    return (ProfileMeasurement) values.get(0);
-  }
-
-  /**
-   * Creates a profile definition based on a string of JSON.
-   * @param json The string of JSON.
-   */
-  private ProfileConfig createDefinition(String json) throws IOException {
-    return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
deleted file mode 100644
index 95a2d29..0000000
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.bolt;
-
-import com.google.common.collect.ImmutableMap;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.statistics.OnlineStatisticsProvider;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Values;
-import org.json.simple.JSONObject;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-/**
- * Tests the KafkaDestinationHandler.
- */
-public class KafkaEmitterTest {
-
-  /**
-   * {
-   *   "profile": "profile-one-destination",
-   *   "foreach": "ip_src_addr",
-   *   "init":   { "x": "0" },
-   *   "update": { "x": "x + 1" },
-   *   "result": {
-   *      "profile": "x",
-   *      "triage": {
-   *        "value": "x"
-   *       }
-   *    }
-   * }
-   */
-  @Multiline
-  private String profileDefinitionWithTriage;
-
-  private KafkaEmitter kafkaEmitter;
-  private ProfileConfig profile;
-  private OutputCollector collector;
-
-  @Before
-  public void setup() throws Exception {
-    kafkaEmitter = new KafkaEmitter();
-    profile = createDefinition(profileDefinitionWithTriage);
-    collector = Mockito.mock(OutputCollector.class);
-  }
-
-  /**
-   * The handler should emit a message when a result/triage expression(s) has been defined.
-   */
-  @Test
-  public void testEmit() throws Exception {
-
-    // create a measurement that has triage values
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withProfileName("profile")
-            .withEntity("entity")
-            .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withDefinition(profile)
-            .withTriageValues(Collections.singletonMap("triage-key", "triage-value"));
-
-    // execute the test
-    kafkaEmitter.emit(measurement, collector);
-
-    // a message should be emitted
-    verify(collector, times(1)).emit(eq(kafkaEmitter.getStreamId()), any());
-  }
-
-  /**
-   * The handler should NOT emit a message when there is NO result/triage value(s).
-   */
-  @Test
-  public void testDoNotEmit() throws Exception {
-
-    // create a measurement with NO triage values
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withProfileName("profile")
-            .withEntity("entity")
-            .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withDefinition(profile);
-
-    // execute the test
-    kafkaEmitter.emit(measurement, collector);
-
-    // a message should NOT be emitted
-    verify(collector, times(0)).emit(eq(kafkaEmitter.getStreamId()), any());
-  }
-
-  /**
-   * Validate that the message generated for Kafka should include the triage value.
-   */
-  @Test
-  public void testTriageValueInMessage() throws Exception {
-
-    // create a measurement that has triage values
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withDefinition(profile)
-            .withProfileName(profile.getProfile())
-            .withEntity("entity")
-            .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withTriageValues(Collections.singletonMap("triage-key", "triage-value"));
-
-    // execute the test
-    kafkaEmitter.emit(measurement, collector);
-    JSONObject actual = expectJsonObject(kafkaEmitter, collector);
-
-    // validate the core parts of the message
-    assertEquals(measurement.getProfileName(),                    actual.get("profile"));
-    assertEquals(measurement.getEntity(),                         actual.get("entity"));
-    assertEquals(measurement.getPeriod().getPeriod(),             actual.get("period"));
-    assertEquals(measurement.getPeriod().getStartTimeMillis(),    actual.get("period.start"));
-    assertEquals(measurement.getPeriod().getEndTimeMillis(),      actual.get("period.end"));
-    assertEquals("profiler",                                      actual.get("source.type"));
-    assertNotNull(actual.get("timestamp"));
-
-    // validate that the triage value has been added
-    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
-  }
-
-  /**
-   * Validate that the message generated for Kafka can include multiple triage values.
-   */
-  @Test
-  public void testMultipleTriageValueInMessage() throws Exception {
-
-    // multiple triage values have been defined
-    Map<String, Object> triageValues = ImmutableMap.of(
-            "x", 2,
-            "y", "4",
-            "z", 6.0);
-
-    // create a measurement that has multiple triage values
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withDefinition(profile)
-            .withProfileName(profile.getProfile())
-            .withEntity("entity")
-            .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withTriageValues(triageValues);
-
-    // execute the test
-    kafkaEmitter.emit(measurement, collector);
-    JSONObject actual = expectJsonObject(kafkaEmitter, collector);
-
-    // validate that ALL of the triage values have been added
-    assertEquals(measurement.getTriageValues().get("x"), actual.get("x"));
-    assertEquals(measurement.getTriageValues().get("y"), actual.get("y"));
-    assertEquals(measurement.getTriageValues().get("z"), actual.get("z"));
-  }
-
-  /**
-   * Values destined for Kafka can only be serialized into text, which limits the types of values
-   * that can result from a triage expression.  Only primitive types and Strings are allowed.
-   */
-  @Test
-  public void testInvalidType() throws Exception {
-
-    // create one invalid expression and one valid expression
-    Map<String, Object> triageValues = ImmutableMap.of(
-            "invalid", new OnlineStatisticsProvider(),
-            "valid", 4);
-
-    // create the measurement with a Map as a triage value; this is not allowed
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withDefinition(profile)
-            .withProfileName(profile.getProfile())
-            .withEntity("entity")
-            .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withTriageValues(triageValues);
-
-    // execute the test
-    kafkaEmitter.emit(measurement, collector);
-    JSONObject actual = expectJsonObject(kafkaEmitter, collector);
-
-    // validate the core parts of the message still exist
-    assertEquals(measurement.getProfileName(),                    actual.get("profile"));
-    assertEquals(measurement.getEntity(),                         actual.get("entity"));
-    assertEquals(measurement.getPeriod().getPeriod(),             actual.get("period"));
-    assertEquals(measurement.getPeriod().getStartTimeMillis(),    actual.get("period.start"));
-    assertEquals(measurement.getPeriod().getEndTimeMillis(),      actual.get("period.end"));
-    assertEquals("profiler",                                      actual.get("source.type"));
-
-    // the invalid expression should be skipped and not included in the message
-    assertFalse(actual.containsKey("invalid"));
-
-    // but the valid expression should still be there
-    assertEquals(triageValues.get("valid"), actual.get("valid"));
-  }
-
-  /**
-   * Values destined for Kafka can only be serialized into text, which limits the types of values
-   * that can result from a triage expression.  Only primitive types and Strings are allowed.
-   */
-  @Test
-  public void testIntegerIsValidType() throws Exception {
-
-    // create a measurement with a triage value that is an integer
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withDefinition(profile)
-            .withProfileName(profile.getProfile())
-            .withEntity("entity")
-            .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withTriageValues(Collections.singletonMap("triage-key", 123));
-
-    // execute the test
-    kafkaEmitter.emit(measurement, collector);
-    JSONObject actual = expectJsonObject(kafkaEmitter, collector);
-
-    // the triage expression is valid
-    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
-  }
-
-  /**
-   * Values destined for Kafka can only be serialized into text, which limits the types of values
-   * that can result from a triage expression.  Only primitive types and Strings are allowed.
-   */
-  @Test
-  public void testStringIsValidType() throws Exception {
-
-    // create a measurement with a triage value that is a string
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withDefinition(profile)
-            .withProfileName(profile.getProfile())
-            .withEntity("entity")
-            .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withTriageValues(Collections.singletonMap("triage-key", "value"));
-
-    // execute the test
-    kafkaEmitter.emit(measurement, collector);
-    JSONObject actual = expectJsonObject(kafkaEmitter, collector);
-
-    // the triage expression is valid
-    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
-  }
-
-  /**
-   * Verifies that the KafkaEmitter does emit a JSONObject.
-   * @return The JSONObject that was emitted
-   */
-  private JSONObject expectJsonObject(KafkaEmitter kafkaEmitter, OutputCollector collector) {
-
-    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
-    verify(collector, times(1)).emit(eq(kafkaEmitter.getStreamId()), arg.capture());
-    Values values = arg.getValue();
-    assertTrue(values.get(0) instanceof JSONObject);
-    return (JSONObject) values.get(0);
-  }
-
-  /**
-   * Creates a profile definition based on a string of JSON.
-   * @param json The string of JSON.
-   */
-  private ProfileConfig createDefinition(String json) throws IOException {
-    return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
deleted file mode 100644
index 3132ae6..0000000
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.bolt;
-
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
-import org.apache.metron.profiler.MessageDistributor;
-import org.apache.metron.profiler.MessageRoute;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.integration.MessageBuilder;
-import org.apache.metron.test.bolt.BaseBoltTest;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseWindowedBolt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.windowing.TupleWindow;
-import org.json.simple.JSONObject;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests the ProfileBuilderBolt.
- */
-public class ProfileBuilderBoltTest extends BaseBoltTest {
-
-  private JSONObject message1;
-  private JSONObject message2;
-  private ProfileConfig profile1;
-  private ProfileConfig profile2;
-  private ProfileMeasurementEmitter emitter;
-  private ManualFlushSignal flushSignal;
-  private ProfileMeasurement measurement;
-
-  @Before
-  public void setup() throws Exception {
-
-    message1 = new MessageBuilder()
-            .withField("ip_src_addr", "10.0.0.1")
-            .withField("value", "22")
-            .build();
-
-    message2 = new MessageBuilder()
-            .withField("ip_src_addr", "10.0.0.2")
-            .withField("value", "22")
-            .build();
-
-    profile1 = new ProfileConfig()
-            .withProfile("profile1")
-            .withForeach("ip_src_addr")
-            .withInit("x", "0")
-            .withUpdate("x", "x + 1")
-            .withResult("x");
-
-    profile2 = new ProfileConfig()
-            .withProfile("profile2")
-            .withForeach("ip_src_addr")
-            .withInit(Collections.singletonMap("x", "0"))
-            .withUpdate(Collections.singletonMap("x", "x + 1"))
-            .withResult("x");
-
-    measurement = new ProfileMeasurement()
-            .withEntity("entity1")
-            .withProfileName("profile1")
-            .withPeriod(1000, 500, TimeUnit.MILLISECONDS)
-            .withProfileValue(22);
-
-    flushSignal = new ManualFlushSignal();
-    flushSignal.setFlushNow(false);
-  }
-
-  /**
-   * The bolt should extract a message and timestamp from a tuple and
-   * pass that to a {@code MessageDistributor}.
-   */
-  @Test
-  public void testExtractMessage() throws Exception {
-
-    ProfileBuilderBolt bolt = createBolt();
-
-    // create a mock
-    MessageDistributor distributor = mock(MessageDistributor.class);
-    bolt.withMessageDistributor(distributor);
-
-    // create a tuple
-    final long timestamp1 = 100000000L;
-    Tuple tuple1 = createTuple("entity1", message1, profile1, timestamp1);
-
-    // execute the bolt
-    TupleWindow tupleWindow = createWindow(tuple1);
-    bolt.execute(tupleWindow);
-
-    // the message should have been extracted from the tuple and passed to the MessageDistributor
-    verify(distributor).distribute(any(MessageRoute.class), any());
-  }
-
-
-  /**
-   * If the {@code FlushSignal} tells the bolt to flush, it should flush the {@code MessageDistributor}
-   * and emit the {@code ProfileMeasurement} values from all active profiles.
-   */
-  @Test
-  public void testFlushActiveProfiles() throws Exception {
-
-    ProfileBuilderBolt bolt = createBolt();
-
-    // create a mock that returns the profile measurement above
-    MessageDistributor distributor = mock(MessageDistributor.class);
-    when(distributor.flush()).thenReturn(Collections.singletonList(measurement));
-    bolt.withMessageDistributor(distributor);
-
-    // signal the bolt to flush
-    flushSignal.setFlushNow(true);
-
-    // execute the bolt
-    Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L);
-    TupleWindow tupleWindow = createWindow(tuple1);
-    bolt.execute(tupleWindow);
-
-    // a profile measurement should be emitted by the bolt
-    List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1);
-    assertEquals(1, measurements.size());
-    assertEquals(measurement, measurements.get(0));
-  }
-
-  /**
-   * If the {@code FlushSignal} tells the bolt NOT to flush, nothing should be emitted.
-   */
-  @Test
-  public void testDoNotFlushActiveProfiles() throws Exception {
-
-    ProfileBuilderBolt bolt = createBolt();
-
-    // create a mock where flush() returns the profile measurement above
-    MessageDistributor distributor = mock(MessageDistributor.class);
-    when(distributor.flush()).thenReturn(Collections.singletonList(measurement));
-    bolt.withMessageDistributor(distributor);
-
-    // there is no flush signal
-    flushSignal.setFlushNow(false);
-
-    // execute the bolt
-    Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L);
-    TupleWindow tupleWindow = createWindow(tuple1);
-    bolt.execute(tupleWindow);
-
-    // nothing should have been emitted
-    getProfileMeasurements(outputCollector, 0);
-  }
-
-  /**
-   * Expired profiles should be flushed regularly, even if no input telemetry
-   * has been received.
-   */
-  @Test
-  public void testFlushExpiredProfiles() throws Exception {
-
-    ProfileBuilderBolt bolt = createBolt();
-
-    // create a mock where flushExpired() returns the profile measurement above
-    MessageDistributor distributor = mock(MessageDistributor.class);
-    when(distributor.flushExpired()).thenReturn(Collections.singletonList(measurement));
-    bolt.withMessageDistributor(distributor);
-
-    // execute test by flushing expired profiles. this is normally triggered by a timer task.
-    bolt.flushExpired();
-
-    // a profile measurement should be emitted by the bolt
-    List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1);
-    assertEquals(1, measurements.size());
-    assertEquals(measurement, measurements.get(0));
-  }
-
-  /**
-   * A {@link ProfileMeasurement} is built for each profile/entity pair.  The measurement should be emitted to each
-   * destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations.
-   */
-  @Test
-  public void testEmitters() throws Exception {
-
-    // defines the zk configurations accessible from the bolt
-    ProfilerConfigurations configurations = new ProfilerConfigurations();
-    configurations.updateGlobalConfig(Collections.emptyMap());
-
-    // create the bolt with 3 destinations
-    ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt()
-            .withProfileTimeToLive(30, TimeUnit.MINUTES)
-            .withPeriodDuration(10, TimeUnit.MINUTES)
-            .withMaxNumberOfRoutes(Long.MAX_VALUE)
-            .withZookeeperClient(client)
-            .withZookeeperCache(cache)
-            .withEmitter(new TestEmitter("destination1"))
-            .withEmitter(new TestEmitter("destination2"))
-            .withEmitter(new TestEmitter("destination3"))
-            .withProfilerConfigurations(configurations)
-            .withTumblingWindow(new BaseWindowedBolt.Duration(10, 
TimeUnit.MINUTES));
-    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
-
-    // signal the bolt to flush
-    bolt.withFlushSignal(flushSignal);
-    flushSignal.setFlushNow(true);
-
-    // execute the bolt
-    Tuple tuple1 = createTuple("entity", message1, profile1, 
System.currentTimeMillis());
-    TupleWindow window = createWindow(tuple1);
-    bolt.execute(window);
-
-    // validate measurements emitted to each
-    verify(outputCollector, times(1)).emit(eq("destination1"), any());
-    verify(outputCollector, times(1)).emit(eq("destination2"), any());
-    verify(outputCollector, times(1)).emit(eq("destination3"), any());
-  }
-
-  /**
-   * Verifies how many tuples were emitted on the emitter's stream and returns the
-   * ProfileMeasurement carried in the first position of each emitted tuple.
-   *
-   * @param collector The Storm output collector.
-   * @param expected The number of measurements expected.
-   * @return A list of ProfileMeasurement(s).
-   */
-  private List<ProfileMeasurement> getProfileMeasurements(OutputCollector collector, int expected) {
-
-    // tuples are emitted on the stream named by the emitter that the bolt was built with
-    final String emitterStream = emitter.getStreamId();
-
-    // verify the emission count and capture the emitted tuple(s)
-    ArgumentCaptor<Values> emittedTuples = ArgumentCaptor.forClass(Values.class);
-    verify(collector, times(expected)).emit(eq(emitterStream), emittedTuples.capture());
-
-    // the measurement is the first value in each emitted tuple
-    List<Values> captured = emittedTuples.getAllValues();
-    return captured.stream()
-            .map(values -> values.get(0))
-            .map(ProfileMeasurement.class::cast)
-            .collect(Collectors.toList());
-  }
-
-  /**
-   * Create a tuple that will contain the message, the entity name, and 
profile definition.
-   * @param entity The entity name
-   * @param message The telemetry message.
-   * @param profile The profile definition.
-   */
-  private Tuple createTuple(String entity, JSONObject message, ProfileConfig 
profile, long timestamp) {
-
-    Tuple tuple = mock(Tuple.class);
-    
when(tuple.getValueByField(eq(ProfileSplitterBolt.MESSAGE_TUPLE_FIELD))).thenReturn(message);
-    
when(tuple.getValueByField(eq(ProfileSplitterBolt.TIMESTAMP_TUPLE_FIELD))).thenReturn(timestamp);
-    
when(tuple.getValueByField(eq(ProfileSplitterBolt.ENTITY_TUPLE_FIELD))).thenReturn(entity);
-    
when(tuple.getValueByField(eq(ProfileSplitterBolt.PROFILE_TUPLE_FIELD))).thenReturn(profile);
-
-    return tuple;
-  }
-
-  /**
-   * Create a ProfileBuilderBolt to test.
-   *
-   * <p>The bolt is built with a fresh {@code HBaseEmitter}, which is also stored in the
-   * {@code emitter} field so that {@code getProfileMeasurements} can look up its stream id.
-   * The bolt is prepared with the test's topology context and output collector before
-   * it is returned.
-   *
-   * @return A {@link ProfileBuilderBolt} to test.
-   * @throws IOException declared by this helper; which call raises it is not visible here.
-   */
-  private ProfileBuilderBolt createBolt() throws IOException {
-
-    // defines the zk configurations accessible from the bolt
-    ProfilerConfigurations configurations = new ProfilerConfigurations();
-    configurations.updateGlobalConfig(Collections.emptyMap());
-
-    // the cast is needed because the builder chain is not typed as ProfileBuilderBolt at its end
-    emitter = new HBaseEmitter();
-    ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt()
-            .withProfileTimeToLive(30, TimeUnit.MINUTES)
-            .withMaxNumberOfRoutes(Long.MAX_VALUE)
-            .withZookeeperClient(client)
-            .withZookeeperCache(cache)
-            .withEmitter(emitter)
-            .withProfilerConfigurations(configurations)
-            .withPeriodDuration(1, TimeUnit.MINUTES)
-            .withTumblingWindow(new BaseWindowedBolt.Duration(30, 
TimeUnit.SECONDS));
-    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
-
-    // set the flush signal AFTER calling 'prepare'
-    // NOTE(review): presumably 'prepare' installs its own flush signal, which this replaces - confirm
-    bolt.withFlushSignal(flushSignal);
-
-    return bolt;
-  }
-
-  /**
-   * Creates a mock TupleWindow containing multiple tuples.
-   * @param tuples The tuples to add to the window.
-   */
-  private TupleWindow createWindow(Tuple... tuples) {
-
-    TupleWindow window = mock(TupleWindow.class);
-    when(window.get()).thenReturn(Arrays.asList(tuples));
-    return window;
-  }
-
-  /**
-   * A {@code ProfileMeasurementEmitter} for testing purposes only.  It declares and
-   * emits to a single stream whose name is given at construction.
-   */
-  private class TestEmitter implements ProfileMeasurementEmitter {
-
-    // the stream that this emitter declares and emits to
-    private final String streamId;
-
-    public TestEmitter(String streamId) {
-      this.streamId = streamId;
-    }
-
-    @Override
-    public String getStreamId() {
-      return streamId;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      // a single field holding the measurement
-      final Fields outputFields = new Fields("measurement");
-      declarer.declareStream(getStreamId(), outputFields);
-    }
-
-    @Override
-    public void emit(ProfileMeasurement measurement, OutputCollector collector) {
-      // the measurement travels alone in the emitted tuple
-      final Values emitted = new Values(measurement);
-      collector.emit(getStreamId(), emitted);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
 
b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
deleted file mode 100644
index 04c774c..0000000
--- 
a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.bolt;
-
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.common.configuration.profiler.ProfileResult;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.storm.tuple.Tuple;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests the ProfileHBaseMapper class.
- */
-public class ProfileHBaseMapperTest {
-
-  private Tuple tuple;
-  private ProfileHBaseMapper mapper;
-  private ProfileMeasurement measurement;
-  private RowKeyBuilder rowKeyBuilder;
-  private ProfileConfig profile;
-
-  @Before
-  public void setup() {
-    rowKeyBuilder = mock(RowKeyBuilder.class);
-
-    mapper = new ProfileHBaseMapper();
-    mapper.setRowKeyBuilder(rowKeyBuilder);
-
-    profile = new ProfileConfig("profile", "ip_src_addr", new ProfileResult("2 
+ 2"));
-
-    measurement = new ProfileMeasurement()
-            .withProfileName("profile")
-            .withEntity("entity")
-            .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withProfileValue(22)
-            .withDefinition(profile);
-
-    // the tuple will contain the original message
-    tuple = mock(Tuple.class);
-    when(tuple.getValueByField(eq("measurement"))).thenReturn(measurement);
-  }
-
-  /**
-   * The mapper should return the expiration for a tuple based on the Profile 
definition.
-   */
-  @Test
-  public void testExpires() throws Exception {
-    final Long expiresDays = 30L;
-    profile.setExpires(expiresDays);
-
-    Optional<Long> actual = mapper.getTTL(tuple);
-    Assert.assertTrue(actual.isPresent());
-    Assert.assertEquals(expiresDays, (Long) 
TimeUnit.MILLISECONDS.toDays(actual.get()));
-  }
-
-  /**
-   * The expiration field is optional within a Profile definition.
-   */
-  @Test
-  public void testExpiresUndefined() throws Exception {
-    // the TTL should not be defined
-    Optional<Long> actual = mapper.getTTL(tuple);
-    Assert.assertFalse(actual.isPresent());
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
 
b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
deleted file mode 100644
index c879b4b..0000000
--- 
a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
+++ /dev/null
@@ -1,455 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.bolt;
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.common.configuration.profiler.ProfilerConfig;
-import org.apache.metron.profiler.DefaultMessageRouter;
-import org.apache.metron.profiler.clock.FixedClockFactory;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.test.bolt.BaseBoltTest;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.util.HashMap;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests the ProfileSplitterBolt.
- */
-public class ProfileSplitterBoltTest extends BaseBoltTest {
-
-  /**
-   * {
-   *   "ip_src_addr": "10.0.0.1",
-   *   "ip_dst_addr": "10.0.0.20",
-   *   "protocol": "HTTP",
-   *   "timestamp.custom": 2222222222222,
-   *   "timestamp.string": "3333333333333"
-   * }
-   */
-  @Multiline
-  private String input;
-
-  /**
-   * {
-   *   "profiles": [
-   *      {
-   *        "profile": "test",
-   *        "foreach": "ip_src_addr",
-   *        "onlyif": "protocol == 'HTTP'",
-   *        "init": {},
-   *        "update": {},
-   *        "result": "2"
-   *      }
-   *   ]
-   * }
-   */
-  @Multiline
-  private String profileWithOnlyIfTrue;
-
-  /**
-   * {
-   *   "profiles": [
-   *      {
-   *        "profile": "test",
-   *        "foreach": "ip_src_addr",
-   *        "onlyif": "false",
-   *        "init": {},
-   *        "update": {},
-   *        "result": "2"
-   *      }
-   *   ]
-   * }
-   */
-  @Multiline
-  private String profileWithOnlyIfFalse;
-
-  /**
-   * {
-   *   "profiles": [
-   *      {
-   *        "profile": "test",
-   *        "foreach": "ip_src_addr",
-   *        "init": {},
-   *        "update": {},
-   *        "result": "2"
-   *      }
-   *   ]
-   * }
-   */
-  @Multiline
-  private String profileWithOnlyIfMissing;
-
-  /**
-   * {
-   *   "profiles": [
-   *      {
-   *        "profile": "test",
-   *        "foreach": "ip_src_addr",
-   *        "onlyif": "NOT-VALID",
-   *        "init": {},
-   *        "update": {},
-   *        "result": "2"
-   *      }
-   *   ]
-   * }
-   */
-  @Multiline
-  private String profileWithOnlyIfInvalid;
-
-  /**
-   * {
-   *   "profiles": [
-   *      {
-   *        "profile": "test",
-   *        "foreach": "ip_src_addr",
-   *        "init": {},
-   *        "update": {},
-   *        "result": "2"
-   *      }
-   *   ],
-   *   "timestampField": "timestamp.custom"
-   * }
-   */
-  @Multiline
-  private String profileUsingCustomTimestampField;
-
-  /**
-   * {
-   *   "profiles": [
-   *      {
-   *        "profile": "test",
-   *        "foreach": "ip_src_addr",
-   *        "init": {},
-   *        "update": {},
-   *        "result": "2"
-   *      }
-   *   ],
-   *   "timestampField": "timestamp.missing"
-   * }
-   */
-  @Multiline
-  private String profileUsingMissingTimestampField;
-
-  /**
-   * {
-   *   "profiles": [
-   *      {
-   *        "profile": "test",
-   *        "foreach": "ip_src_addr",
-   *        "init": {},
-   *        "update": {},
-   *        "result": "2"
-   *      }
-   *   ],
-   *   "timestampField": "timestamp.string"
-   * }
-   */
-  @Multiline
-  private String profileUsingStringTimestampField;
-
-  /**
-   * {
-   *   "profiles": [
-   *   ]
-   * }
-   */
-  @Multiline
-  private String noProfilesDefined;
-
-  /**
-   * {
-   *   "profiles": [
-   *      {
-   *        "profile": "profile1",
-   *        "foreach": "'global'",
-   *        "result": "1"
-   *      },
-   *      {
-   *        "profile": "profile2",
-   *        "foreach": "'global'",
-   *        "result": "2"
-   *      }
-   *   ]
-   * }
-   */
-  @Multiline
-  private String twoProfilesDefined;
-
-  private JSONObject message;
-  private long timestamp = 3333333;
-
-  @Before
-  public void setup() throws ParseException {
-
-    // parse the input message
-    JSONParser parser = new JSONParser();
-    message = (JSONObject) parser.parse(input);
-
-    // ensure the tuple returns the expected json message
-    when(tuple.getBinary(0)).thenReturn(input.getBytes());
-  }
-
-  /**
-   * Ensure that a tuple with the correct fields is emitted to downstream bolts
-   * when a profile is defined.
-   */
-  @Test
-  public void testEmitTupleWithOneProfile() throws Exception {
-
-    // setup the bolt and execute a tuple
-    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue);
-    ProfileSplitterBolt bolt = createBolt(config);
-    bolt.execute(tuple);
-
-    // the expected tuple fields
-    String expectedEntity = "10.0.0.1";
-    ProfileConfig expectedConfig = config.getProfiles().get(0);
-    Values expected = new Values(message, timestamp, expectedEntity, 
expectedConfig);
-
-    // a tuple should be emitted for the downstream profile builder
-    verify(outputCollector, times(1))
-            .emit(eq(tuple), eq(expected));
-
-    // the original tuple should be ack'd
-    verify(outputCollector, times(1))
-            .ack(eq(tuple));
-  }
-
-  /**
-   * If there are two profiles that need the same message, then two tuples 
should
-   * be emitted.  One tuple for each profile.
-   */
-  @Test
-  public void testEmitTupleWithTwoProfiles() throws Exception {
-
-    // setup the bolt and execute a tuple
-    ProfilerConfig config = toProfilerConfig(twoProfilesDefined);
-    ProfileSplitterBolt bolt = createBolt(config);
-    bolt.execute(tuple);
-
-    // the expected tuple fields
-    final String expectedEntity = "global";
-    {
-      // a tuple should be emitted for the first profile
-      ProfileConfig profile1 = config.getProfiles().get(0);
-      Values expected = new Values(message, timestamp, expectedEntity, 
profile1);
-      verify(outputCollector, times(1))
-              .emit(eq(tuple), eq(expected));
-    }
-    {
-      // a tuple should be emitted for the second profile
-      ProfileConfig profile2 = config.getProfiles().get(1);
-      Values expected = new Values(message, timestamp, expectedEntity, 
profile2);
-      verify(outputCollector, times(1))
-              .emit(eq(tuple), eq(expected));
-    }
-
-    // the original tuple should be ack'd
-    verify(outputCollector, times(1))
-            .ack(eq(tuple));
-  }
-
-  /**
-   * No tuples should be emitted, if no profiles are defined.
-   */
-  @Test
-  public void testNoProfilesDefined() throws Exception {
-
-    // setup the bolt and execute a tuple
-    ProfilerConfig config = toProfilerConfig(noProfilesDefined);
-    ProfileSplitterBolt bolt = createBolt(config);
-    bolt.execute(tuple);
-
-    // no tuple should be emitted
-    verify(outputCollector, times(0))
-            .emit(any(Tuple.class), any());
-
-    // the original tuple should be ack'd
-    verify(outputCollector, times(1))
-            .ack(eq(tuple));
-  }
-
-  /**
-   * What happens when a profile's 'onlyif' expression is true?  The message
-   * should be applied to the profile.
-   */
-  @Test
-  public void testOnlyIfTrue() throws Exception {
-
-    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue);
-    ProfileSplitterBolt bolt = createBolt(config);
-    bolt.execute(tuple);
-
-    // a tuple should be emitted for the downstream profile builder
-    verify(outputCollector, times(1))
-            .emit(eq(tuple), any(Values.class));
-
-    // the original tuple should be ack'd
-    verify(outputCollector, times(1))
-            .ack(eq(tuple));
-  }
-
-  /**
-   * All messages are applied to a profile where 'onlyif' is missing.  A 
profile with no
-   * 'onlyif' is treated the same as if 'onlyif=true'.
-   */
-  @Test
-  public void testOnlyIfMissing() throws Exception {
-
-    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfMissing);
-    ProfileSplitterBolt bolt = createBolt(config);
-    bolt.execute(tuple);
-
-    // a tuple should be emitted for the downstream profile builder
-    verify(outputCollector, times(1))
-            .emit(eq(tuple), any(Values.class));
-
-    // the original tuple should be ack'd
-    verify(outputCollector, times(1))
-            .ack(eq(tuple));
-  }
-
-  /**
-   * What happens when a profile's 'onlyif' expression is false?  The message
-   * should NOT be applied to the profile.
-   */
-  @Test
-  public void testOnlyIfFalse() throws Exception {
-
-    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfFalse);
-    ProfileSplitterBolt bolt = createBolt(config);
-    bolt.execute(tuple);
-
-    // a tuple should NOT be emitted for the downstream profile builder.  the positive
-    // tests in this class verify the anchored, two-argument emit(tuple, values), so that
-    // is the overload to check here; verifying the single-argument emit(..) can never fail.
-    verify(outputCollector, times(0))
-            .emit(any(Tuple.class), any());
-
-    // the original tuple should be ack'd
-    verify(outputCollector, times(1))
-            .ack(eq(tuple));
-  }
-
-  /**
-   * The entity associated with a profile is defined with a Stellar 
expression.  That expression
-   * can refer to any field within the message.
-   *
-   * In this case the entity is defined as 'ip_src_addr' which is resolved to 
'10.0.0.1' based on
-   * the data contained within the message.
-   */
-  @Test
-  public void testResolveEntityName() throws Exception {
-
-    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue);
-    ProfileSplitterBolt bolt = createBolt(config);
-    bolt.execute(tuple);
-
-    // expected values
-    String expectedEntity = "10.0.0.1";
-    ProfileConfig expectedConfig = config.getProfiles().get(0);
-    Values expected = new Values(message, timestamp, expectedEntity, 
expectedConfig);
-
-    // a tuple should be emitted for the downstream profile builder
-    verify(outputCollector, times(1))
-            .emit(eq(tuple), eq(expected));
-
-    // the original tuple should be ack'd
-    verify(outputCollector, times(1))
-            .ack(eq(tuple));
-  }
-
-  /**
-   * What happens when invalid Stellar code is used for 'onlyif'?  The invalid
-   * profile should be ignored.
-   */
-  @Test
-  public void testOnlyIfInvalid() throws Exception {
-
-    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfInvalid);
-    ProfileSplitterBolt bolt = createBolt(config);
-    bolt.execute(tuple);
-
-    // a tuple should NOT be emitted for the downstream profile builder.  the positive
-    // tests in this class verify the anchored, two-argument emit(tuple, values), so that
-    // is the overload to check here; verifying the single-argument emit(..) can never fail.
-    verify(outputCollector, times(0))
-            .emit(any(Tuple.class), any());
-  }
-
-  /**
-   * What happens when the tuple carries a null message?  Nothing should be
-   * emitted to the downstream profile builder.
-   */
-  @Test
-  public void testWithNullMessage() throws Exception {
-
-    // ensure the tuple returns null to mimic a null message in kafka
-    when(tuple.getBinary(0)).thenReturn(null);
-
-    // NOTE(review): this profile has an invalid 'onlyif', which on its own is expected to
-    // suppress emission (see testOnlyIfInvalid); a valid profile would isolate the
-    // null-message case better - confirm against the bolt before changing it.
-    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfInvalid);
-    ProfileSplitterBolt bolt = createBolt(config);
-    bolt.execute(tuple);
-
-    // a tuple should NOT be emitted for the downstream profile builder.  the positive
-    // tests in this class verify the anchored, two-argument emit(tuple, values), so that
-    // is the overload to check here; verifying the single-argument emit(..) can never fail.
-    verify(outputCollector, times(0))
-            .emit(any(Tuple.class), any());
-  }
-
-  /**
-   * Creates a ProfilerConfig based on a string containing JSON.
-   *
-   * @param configAsJSON The config as JSON.
-   * @return The ProfilerConfig.
-   * @throws Exception
-   */
-  private ProfilerConfig toProfilerConfig(String configAsJSON) throws 
Exception {
-    InputStream in = new ByteArrayInputStream(configAsJSON.getBytes("UTF-8"));
-    return JSONUtils.INSTANCE.load(in, ProfilerConfig.class);
-  }
-
-  /**
-   * Create a ProfileSplitterBolt to test.
-   *
-   * <p>The bolt is given the test's Zookeeper client and cache and the supplied profiler
-   * configuration, and is then prepared.  Its router is replaced afterwards with one
-   * that uses a fixed clock set to this test's {@code timestamp}.
-   *
-   * @param config The profiler configuration that the bolt should use.
-   * @return The prepared bolt.
-   * @throws Exception declared by this helper; which call raises it is not visible here.
-   */
-  private ProfileSplitterBolt createBolt(ProfilerConfig config) throws 
Exception {
-
-    ProfileSplitterBolt bolt = new ProfileSplitterBolt("zookeeperURL");
-    bolt.setCuratorFramework(client);
-    bolt.setZKCache(cache);
-    bolt.getConfigurations().updateProfilerConfig(config);
-    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
-
-    // set the clock factory AFTER calling prepare to use the fixed clock factory
-    DefaultMessageRouter router = new 
DefaultMessageRouter(bolt.getStellarContext());
-    router.setClockFactory(new FixedClockFactory(timestamp));
-    bolt.setRouter(router);
-
-    return bolt;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java
 
b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java
deleted file mode 100644
index b59d0b5..0000000
--- 
a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.metron.profiler.integration;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.metron.integration.InMemoryComponent;
-import org.apache.metron.integration.UnableToStartException;
-import org.apache.metron.integration.components.ZKServerComponent;
-
-import java.util.Properties;
-
-import static 
org.apache.metron.common.configuration.ConfigurationsUtils.getClient;
-import static 
org.apache.metron.common.configuration.ConfigurationsUtils.readGlobalConfigFromFile;
-import static 
org.apache.metron.common.configuration.ConfigurationsUtils.writeGlobalConfigToZookeeper;
-import static 
org.apache.metron.common.configuration.ConfigurationsUtils.readProfilerConfigFromFile;
-import static 
org.apache.metron.common.configuration.ConfigurationsUtils.writeProfilerConfigToZookeeper;
-
-
-/**
- * An in-memory test component that uploads the global and the profiler
- * configuration to Zookeeper.
- *
- * <p>The Zookeeper URL is read from the topology properties under
- * {@code ZKServerComponent.ZOOKEEPER_PROPERTY}.  A configuration is only uploaded
- * if its path has been set and the bytes read from it are not empty.
- */
-public class ConfigUploadComponent implements InMemoryComponent {
-
-  private Properties topologyProperties;
-  private String globalConfiguration;
-  private String profilerConfiguration;
-
-  /**
-   * Uploads the configuration when the component is started.
-   * @throws UnableToStartException If the upload fails for any reason.
-   */
-  @Override
-  public void start() throws UnableToStartException {
-    update();
-  }
-
-  @Override
-  public void stop() {
-    // nothing to do
-  }
-
-  /**
-   * Uploads the configuration again, e.g. after one of the paths has changed.
-   * @throws UnableToStartException If the upload fails for any reason.
-   */
-  public void update()
-      throws UnableToStartException {
-    try {
-      upload();
-    } catch (Exception e) {
-      throw new UnableToStartException(e.getMessage(), e);
-    }
-  }
-
-  /**
-   * Uploads configuration to Zookeeper.
-   * @throws Exception
-   */
-  private void upload() throws Exception {
-    final String zookeeperUrl = topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY);
-    // the client is closed again once both uploads are done
-    try(CuratorFramework client = getClient(zookeeperUrl)) {
-      if(client.getState() != CuratorFrameworkState.STARTED) {
-        client.start();
-      }
-      uploadGlobalConfig(client);
-      uploadProfilerConfig(client);
-    }
-  }
-
-  /**
-   * Upload the profiler configuration to Zookeeper.
-   * @param client The zookeeper client.
-   */
-  private void uploadProfilerConfig(CuratorFramework client) throws Exception {
-    if (profilerConfiguration != null) {
-      // read once; the same bytes serve the emptiness check and the upload
-      byte[] profilerConfig = readProfilerConfigFromFile(profilerConfiguration);
-      if (profilerConfig.length > 0) {
-        writeProfilerConfigToZookeeper(profilerConfig, client);
-      }
-    }
-  }
-
-  /**
-   * Upload the global configuration to Zookeeper.
-   * @param client The zookeeper client.
-   */
-  private void uploadGlobalConfig(CuratorFramework client) throws Exception {
-    if (globalConfiguration != null) {
-      // read once; the same bytes serve the emptiness check and the upload
-      byte[] globalConfig = readGlobalConfigFromFile(globalConfiguration);
-      if (globalConfig.length > 0) {
-        writeGlobalConfigToZookeeper(globalConfig, client);
-      }
-    }
-  }
-
-  /**
-   * @param topologyProperties The properties that hold the Zookeeper URL.
-   * @return This component, to allow chaining.
-   */
-  public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) {
-    this.topologyProperties = topologyProperties;
-    return this;
-  }
-
-  /**
-   * @param path The path that the global configuration is read from.
-   * @return This component, to allow chaining.
-   */
-  public ConfigUploadComponent withGlobalConfiguration(String path) {
-    this.globalConfiguration = path;
-    return this;
-  }
-
-  /**
-   * @param path The path that the profiler configuration is read from.
-   * @return This component, to allow chaining.
-   */
-  public ConfigUploadComponent withProfilerConfiguration(String path) {
-    this.profilerConfiguration = path;
-    return this;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java
 
b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java
deleted file mode 100644
index 7e1628b..0000000
--- 
a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.integration;
-
-import org.json.simple.JSONObject;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Enables simple creation of telemetry messages for testing.
- *
- * <p>Fields are collected through the {@code with...} methods and are copied into
- * a new {@link JSONObject} by {@link #build()}.
- */
-public class MessageBuilder {
-
-  // the fields collected so far; never reassigned after construction
-  private final Map<Object, Object> fields;
-
-  /**
-   * Create a new {@link MessageBuilder}.
-   */
-  public MessageBuilder() {
-    this.fields = new HashMap<>();
-  }
-
-  /**
-   * Adds all of the fields from a message to this message.  A field that was
-   * already defined is overwritten by the prototype's value.
-   *
-   * @param prototype The other message that is treated as a prototype.
-   * @return A {@link MessageBuilder}
-   */
-  public MessageBuilder withFields(JSONObject prototype) {
-    prototype.forEach((key, val) -> this.fields.put(key, val));
-    return this;
-  }
-
-  /**
-   * Adds a field to the message.
-   *
-   * @param key The field name.
-   * @param value The field value.
-   * @return A {@link MessageBuilder}
-   */
-  public MessageBuilder withField(String key, Object value) {
-    this.fields.put(key, value);
-    return this;
-  }
-
-  /**
-   * Build the message.
-   *
-   * <p>This should be called after defining all of the message fields.
-   *
-   * @return A {@link JSONObject} containing the fields defined so far.
-   */
-  public JSONObject build() {
-    return new JSONObject(fields);
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
 
b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
deleted file mode 100644
index 268ce26..0000000
--- 
a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.integration;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.utils.SerDeUtils;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.hbase.mock.MockHTable;
-import org.apache.metron.integration.BaseIntegrationTest;
-import org.apache.metron.integration.ComponentRunner;
-import org.apache.metron.integration.UnableToStartException;
-import org.apache.metron.integration.components.FluxTopologyComponent;
-import org.apache.metron.integration.components.KafkaComponent;
-import org.apache.metron.integration.components.ZKServerComponent;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.hbase.ColumnBuilder;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
-import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
-import org.apache.storm.Config;
-import org.apache.storm.serialization.KryoTupleDeserializer;
-import org.apache.storm.serialization.KryoTupleSerializer;
-import org.apache.storm.serialization.KryoValuesDeserializer;
-import org.apache.storm.serialization.KryoValuesSerializer;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.TupleImpl;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.code.tempusfugit.temporal.Duration.seconds;
-import static com.google.code.tempusfugit.temporal.Timeout.timeout;
-import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * An integration test of the Profiler topology.
- */
-public class ProfilerIntegrationTest extends BaseIntegrationTest {
-
-  private static final String TEST_RESOURCES = 
"../../metron-analytics/metron-profiler/src/test";
-  private static final String FLUX_PATH = 
"../metron-profiler/src/main/flux/profiler/remote.yaml";
-
-  public static final long startAt = 10;
-  public static final String entity = "10.0.0.1";
-
-  private static final String tableName = "profiler";
-  private static final String columnFamily = "P";
-  private static final String inputTopic = Constants.INDEXING_TOPIC;
-  private static final String outputTopic = "profiles";
-  private static final int saltDivisor = 10;
-
-  private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(1);
-  private static final long windowDurationMillis = 
TimeUnit.SECONDS.toMillis(5);
-  private static final long periodDurationMillis = 
TimeUnit.SECONDS.toMillis(10);
-  private static final long profileTimeToLiveMillis = 
TimeUnit.SECONDS.toMillis(15);
-  private static final long maxRoutesPerBolt = 100000;
-
-  private static ColumnBuilder columnBuilder;
-  private static ZKServerComponent zkComponent;
-  private static FluxTopologyComponent fluxComponent;
-  private static KafkaComponent kafkaComponent;
-  private static ConfigUploadComponent configUploadComponent;
-  private static ComponentRunner runner;
-  private static MockHTable profilerTable;
-
-  private static String message1;
-  private static String message2;
-  private static String message3;
-
-  /**
-   * [
-   *    org.apache.metron.profiler.ProfileMeasurement,
-   *    org.apache.metron.profiler.ProfilePeriod,
-   *    org.apache.metron.common.configuration.profiler.ProfileResult,
-   *    
org.apache.metron.common.configuration.profiler.ProfileResultExpressions,
-   *    
org.apache.metron.common.configuration.profiler.ProfileTriageExpressions,
-   *    org.apache.metron.common.configuration.profiler.ProfilerConfig,
-   *    org.apache.metron.common.configuration.profiler.ProfileConfig,
-   *    org.json.simple.JSONObject,
-   *    java.util.LinkedHashMap,
-   *    org.apache.metron.statistics.OnlineStatisticsProvider
-   *  ]
-   */
-  @Multiline
-  private static String kryoSerializers;
-
-  /**
-   * The Profiler can generate profiles based on processing time.  With 
processing time,
-   * the Profiler builds profiles based on when the telemetry was processed.
-   *
-   * <p>Not defining a 'timestampField' within the Profiler configuration 
tells the Profiler
-   * to use processing time.
-   */
-  @Test
-  public void testProcessingTime() throws Exception {
-
-    // upload the config to zookeeper
-    uploadConfig(TEST_RESOURCES + "/config/zookeeper/processing-time-test");
-
-    // start the topology and write test messages to kafka
-    fluxComponent.submitTopology();
-
-    // the messages that will be applied to the profile
-    kafkaComponent.writeMessages(inputTopic, message1);
-    kafkaComponent.writeMessages(inputTopic, message2);
-    kafkaComponent.writeMessages(inputTopic, message3);
-
-    // storm needs at least one message to close its event window
-    int attempt = 0;
-    while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) {
-
-      // sleep, at least beyond the current window
-      Thread.sleep(windowDurationMillis + windowLagMillis);
-
-      // send another message to help close the current event window
-      kafkaComponent.writeMessages(inputTopic, message2);
-    }
-
-    // validate what was flushed
-    List<Integer> actuals = read(
-            profilerTable.getPutLog(),
-            columnFamily,
-            columnBuilder.getColumnQualifier("value"),
-            Integer.class);
-    assertEquals(1, actuals.size());
-    assertTrue(actuals.get(0) >= 3);
-  }
-
-  /**
-   * The Profiler can generate profiles using event time.  With event time 
processing,
-   * the Profiler uses timestamps contained in the source telemetry.
-   *
-   * <p>Defining a 'timestampField' within the Profiler configuration tells 
the Profiler
-   * from which field the timestamp should be extracted.
-   */
-  @Test
-  public void testEventTime() throws Exception {
-
-    // upload the profiler config to zookeeper
-    uploadConfig(TEST_RESOURCES + "/config/zookeeper/event-time-test");
-
-    // start the topology and write test messages to kafka
-    fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(inputTopic, message1);
-    kafkaComponent.writeMessages(inputTopic, message2);
-    kafkaComponent.writeMessages(inputTopic, message3);
-
-    // wait until the profile is flushed
-    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, 
timeout(seconds(90)));
-
-    List<Put> puts = profilerTable.getPutLog();
-    assertEquals(1, puts.size());
-
-    // inspect the row key to ensure the profiler used event time correctly.  
the timestamp
-    // embedded in the row key should match those in the source telemetry
-    byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, 
startAt);
-    byte[] actualRowKey = puts.get(0).getRow();
-    assertArrayEquals(failMessage(expectedRowKey, actualRowKey), 
expectedRowKey, actualRowKey);
-  }
-
-  /**
-   * The result produced by a Profile has to be serializable within Storm. If 
the result is not
-   * serializable the topology will crash and burn.
-   *
-   * This test ensures that if a profile returns a STATS object created using 
the STATS_INIT and
-   * STATS_ADD functions, that it can be correctly serialized and persisted.
-   */
-  @Test
-  public void testProfileWithStatsObject() throws Exception {
-
-    // upload the profiler config to zookeeper
-    uploadConfig(TEST_RESOURCES + "/config/zookeeper/profile-with-stats");
-
-    // start the topology and write test messages to kafka
-    fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(inputTopic, message1);
-    kafkaComponent.writeMessages(inputTopic, message2);
-    kafkaComponent.writeMessages(inputTopic, message3);
-
-    // wait until the profile is flushed
-    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, 
timeout(seconds(90)));
-
-    // ensure that a value was persisted in HBase
-    List<Put> puts = profilerTable.getPutLog();
-    assertEquals(1, puts.size());
-
-    // generate the expected row key. only the profile name, entity, and 
period are used to generate the row key
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withProfileName("profile-with-stats")
-            .withEntity("global")
-            .withPeriod(startAt, periodDurationMillis, TimeUnit.MILLISECONDS);
-    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, 
periodDurationMillis, TimeUnit.MILLISECONDS);
-    byte[] expectedRowKey = rowKeyBuilder.rowKey(measurement);
-
-    // ensure the correct row key was generated
-    byte[] actualRowKey = puts.get(0).getRow();
-    assertArrayEquals(failMessage(expectedRowKey, actualRowKey), 
expectedRowKey, actualRowKey);
-  }
-
-  /**
-   * Generates an error message for if the byte comparison fails.
-   *
-   * @param expected The expected value.
-   * @param actual The actual value.
-   * @return
-   * @throws UnsupportedEncodingException
-   */
-  private String failMessage(byte[] expected, byte[] actual) throws 
UnsupportedEncodingException {
-    return String.format("expected '%s', got '%s'",
-              new String(expected, "UTF-8"),
-              new String(actual, "UTF-8"));
-  }
-
-  /**
-   * Generates the expected row key.
-   *
-   * @param profileName The name of the profile.
-   * @param entity The entity.
-   * @param whenMillis A timestamp in epoch milliseconds.
-   * @return A row key.
-   */
-  private byte[] generateExpectedRowKey(String profileName, String entity, 
long whenMillis) {
-
-    // only the profile name, entity, and period are used to generate the row 
key
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withProfileName(profileName)
-            .withEntity(entity)
-            .withPeriod(whenMillis, periodDurationMillis, 
TimeUnit.MILLISECONDS);
-
-    // build the row key
-    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, 
periodDurationMillis, TimeUnit.MILLISECONDS);
-    return rowKeyBuilder.rowKey(measurement);
-  }
-
-  /**
-   * Reads a value written by the Profiler.
-   *
-   * @param family The column family.
-   * @param qualifier The column qualifier.
-   * @param clazz The expected type of the value.
-   * @param <T> The expected type of the value.
-   * @return The value written by the Profiler.
-   */
-  private <T> List<T> read(List<Put> puts, String family, byte[] qualifier, 
Class<T> clazz) {
-    List<T> results = new ArrayList<>();
-
-    for(Put put: puts) {
-      List<Cell> cells = put.get(Bytes.toBytes(family), qualifier);
-      for(Cell cell : cells) {
-        T value = SerDeUtils.fromBytes(cell.getValue(), clazz);
-        results.add(value);
-      }
-    }
-
-    return results;
-  }
-
-  @BeforeClass
-  public static void setupBeforeClass() throws UnableToStartException {
-
-    // create some messages that contain a timestamp - a really old timestamp; 
close to 1970
-    message1 = new MessageBuilder()
-            .withField("ip_src_addr", entity)
-            .withField("timestamp", startAt)
-            .build()
-            .toJSONString();
-
-    message2 = new MessageBuilder()
-            .withField("ip_src_addr", entity)
-            .withField("timestamp", startAt + 100)
-            .build()
-            .toJSONString();
-
-    message3 = new MessageBuilder()
-            .withField("ip_src_addr", entity)
-            .withField("timestamp", startAt + (windowDurationMillis * 2))
-            .build()
-            .toJSONString();
-
-    columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
-
-    // storm topology properties
-    final Properties topologyProperties = new Properties() {{
-
-      // storm settings
-      setProperty("profiler.workers", "1");
-      setProperty("profiler.executors", "0");
-      setProperty(Config.TOPOLOGY_AUTO_CREDENTIALS, "[]");
-      setProperty(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, "60");
-      setProperty(Config.TOPOLOGY_MAX_SPOUT_PENDING, "100000");
-
-      // ensure tuples are serialized during the test, otherwise serialization 
problems
-      // will not be found until the topology is run on a cluster with 
multiple workers
-      setProperty(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, "true");
-      setProperty(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, "false");
-      setProperty(Config.TOPOLOGY_KRYO_REGISTER, kryoSerializers);
-
-      // kafka settings
-      setProperty("profiler.input.topic", inputTopic);
-      setProperty("profiler.output.topic", outputTopic);
-      setProperty("kafka.start", "UNCOMMITTED_EARLIEST");
-      setProperty("kafka.security.protocol", "PLAINTEXT");
-
-      // hbase settings
-      setProperty("profiler.hbase.salt.divisor", 
Integer.toString(saltDivisor));
-      setProperty("profiler.hbase.table", tableName);
-      setProperty("profiler.hbase.column.family", columnFamily);
-      setProperty("profiler.hbase.batch", "10");
-      setProperty("profiler.hbase.flush.interval.seconds", "1");
-      setProperty("hbase.provider.impl", "" + 
MockHBaseTableProvider.class.getName());
-
-      // profile settings
-      setProperty("profiler.period.duration", 
Long.toString(periodDurationMillis));
-      setProperty("profiler.period.duration.units", "MILLISECONDS");
-      setProperty("profiler.ttl", Long.toString(profileTimeToLiveMillis));
-      setProperty("profiler.ttl.units", "MILLISECONDS");
-      setProperty("profiler.window.duration", 
Long.toString(windowDurationMillis));
-      setProperty("profiler.window.duration.units", "MILLISECONDS");
-      setProperty("profiler.window.lag", Long.toString(windowLagMillis));
-      setProperty("profiler.window.lag.units", "MILLISECONDS");
-      setProperty("profiler.max.routes.per.bolt", 
Long.toString(maxRoutesPerBolt));
-    }};
-
-    // create the mock table
-    profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, 
columnFamily);
-
-    zkComponent = getZKServerComponent(topologyProperties);
-
-    // create the input and output topics
-    kafkaComponent = getKafkaComponent(topologyProperties, Arrays.asList(
-            new KafkaComponent.Topic(inputTopic, 1),
-            new KafkaComponent.Topic(outputTopic, 1)));
-
-    // upload profiler configuration to zookeeper
-    configUploadComponent = new ConfigUploadComponent()
-            .withTopologyProperties(topologyProperties);
-
-    // load flux definition for the profiler topology
-    fluxComponent = new FluxTopologyComponent.Builder()
-            .withTopologyLocation(new File(FLUX_PATH))
-            .withTopologyName("profiler")
-            .withTopologyProperties(topologyProperties)
-            .build();
-
-    // start all components
-    runner = new ComponentRunner.Builder()
-            .withComponent("zk",zkComponent)
-            .withComponent("kafka", kafkaComponent)
-            .withComponent("config", configUploadComponent)
-            .withComponent("storm", fluxComponent)
-            .withMillisecondsBetweenAttempts(15000)
-            .withNumRetries(10)
-            .withCustomShutdownOrder(new String[] 
{"storm","config","kafka","zk"})
-            .build();
-    runner.start();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    MockHBaseTableProvider.clear();
-    if (runner != null) {
-      runner.stop();
-    }
-  }
-
-  @Before
-  public void setup() {
-    // create the mock table
-    profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, 
columnFamily);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    MockHBaseTableProvider.clear();
-    profilerTable.clear();
-    if (runner != null) {
-      runner.reset();
-    }
-  }
-
-  /**
-   * Uploads config values to Zookeeper.
-   * @param path The path on the local filesystem to the config values.
-   * @throws Exception
-   */
-  public void uploadConfig(String path) throws Exception {
-    configUploadComponent
-            .withGlobalConfiguration(path)
-            .withProfilerConfiguration(path)
-            .update();
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/test/resources/log4j.properties 
b/metron-analytics/metron-profiler/src/test/resources/log4j.properties
deleted file mode 100644
index 541f368..0000000
--- a/metron-analytics/metron-profiler/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,34 +0,0 @@
-#
-#
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
-#
-
-# Root logger option
-log4j.rootLogger=ERROR, stdout
-
-# Direct log messages to stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.out
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p 
%c{1}:%L - %m%n
-log4j.appender.stdout.filter.1=org.apache.log4j.varia.StringMatchFilter
-log4j.appender.stdout.filter.1.StringToMatch=Connection timed out
-log4j.appender.stdout.filter.1.AcceptOnMatch=false
-log4j.appender.stdout.filter.2=org.apache.log4j.varia.StringMatchFilter
-log4j.appender.stdout.filter.2.StringToMatch=Background
-log4j.appender.stdout.filter.2.AcceptOnMatch=false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/pom.xml b/metron-analytics/pom.xml
index 37ee2b0..bcfc6e0 100644
--- a/metron-analytics/pom.xml
+++ b/metron-analytics/pom.xml
@@ -43,7 +43,7 @@
                <module>metron-maas-service</module>
                <module>metron-maas-common</module>
                <module>metron-statistics</module>
-               <module>metron-profiler</module>
+               <module>metron-profiler-storm</module>
                <module>metron-profiler-client</module>
                <module>metron-profiler-common</module>
                <module>metron-profiler-spark</module>

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-deployment/packaging/docker/deb-docker/pom.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/docker/deb-docker/pom.xml 
b/metron-deployment/packaging/docker/deb-docker/pom.xml
index a0df09a..92d63cc 100644
--- a/metron-deployment/packaging/docker/deb-docker/pom.xml
+++ b/metron-deployment/packaging/docker/deb-docker/pom.xml
@@ -126,7 +126,7 @@
                                     </includes>
                                 </resource>
                                 <resource>
-                                    
<directory>${metron_dir}/metron-analytics/metron-profiler/target/</directory>
+                                    
<directory>${metron_dir}/metron-analytics/metron-profiler-storm/target/</directory>
                                     <includes>
                                         <include>*.tar.gz</include>
                                     </includes>

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-deployment/packaging/docker/rpm-docker/pom.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/docker/rpm-docker/pom.xml 
b/metron-deployment/packaging/docker/rpm-docker/pom.xml
index 0d9c4d1..2220ca8 100644
--- a/metron-deployment/packaging/docker/rpm-docker/pom.xml
+++ b/metron-deployment/packaging/docker/rpm-docker/pom.xml
@@ -162,7 +162,7 @@
                                     </includes>
                                 </resource>
                                 <resource>
-                                    
<directory>${metron_dir}/metron-analytics/metron-profiler/target/</directory>
+                                    
<directory>${metron_dir}/metron-analytics/metron-profiler-storm/target/</directory>
                                     <includes>
                                         <include>*.tar.gz</include>
                                     </includes>

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
index 5240d7a..a7dc248 100644
--- 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
+++ 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
@@ -119,7 +119,7 @@ public class ZKConfigurationsCacheIntegrationTest {
   @Multiline
   public static String globalConfig;
 
-  public static File profilerDir = new 
File("../../metron-analytics/metron-profiler/src/test/config/zookeeper");
+  public static File profilerDir = new 
File("../../metron-analytics/metron-profiler-storm/src/test/config/zookeeper");
   public ConfigurationsCache cache;
 
   public ZKServerComponent zkComponent;

Reply via email to