METRON-1717 Relocate Storm Profiler Code (nickwallen) closes apache/metron#1187


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3d84ea42
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3d84ea42
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3d84ea42

Branch: refs/heads/feature/METRON-1699-create-batch-profiler
Commit: 3d84ea4290b9439b97b549a8e8158b6fe4bfa8f2
Parents: 9455c4e
Author: nickwallen <[email protected]>
Authored: Mon Sep 10 16:35:13 2018 -0400
Committer: nickallen <[email protected]>
Committed: Mon Sep 10 16:35:13 2018 -0400

----------------------------------------------------------------------
 README.md                                       |   2 +-
 .../metron-profiler-common/README.md            | 386 ++++++++
 metron-analytics/metron-profiler-repl/README.md |   7 +-
 .../metron-profiler-spark/README.md             |  16 +-
 .../metron-profiler-storm/.gitignore            |   1 +
 .../metron-profiler-storm/README.md             | 400 +++++++++
 metron-analytics/metron-profiler-storm/pom.xml  | 407 +++++++++
 .../src/main/assembly/assembly.xml              |  72 ++
 .../src/main/config/profiler.properties         |  71 ++
 .../src/main/flux/profiler/remote.yaml          | 218 +++++
 .../storm/FixedFrequencyFlushSignal.java        | 135 +++
 .../metron/profiler/storm/FlushSignal.java      |  51 ++
 .../metron/profiler/storm/HBaseEmitter.java     |  73 ++
 .../metron/profiler/storm/KafkaEmitter.java     | 164 ++++
 .../profiler/storm/ManualFlushSignal.java       |  54 ++
 .../profiler/storm/ProfileBuilderBolt.java      | 509 +++++++++++
 .../profiler/storm/ProfileHBaseMapper.java      | 117 +++
 .../storm/ProfileMeasurementEmitter.java        |  59 ++
 .../profiler/storm/ProfileSplitterBolt.java     | 228 +++++
 .../src/main/resources/META-INF/LICENSE         | 604 +++++++++++++
 .../src/main/resources/META-INF/NOTICE          |  92 ++
 .../src/main/scripts/start_profiler_topology.sh |  22 +
 .../zookeeper/event-time-test/profiler.json     |  12 +
 .../processing-time-test/profiler.json          |  11 +
 .../zookeeper/profile-with-stats/profiler.json  |  12 +
 .../storm/FixedFrequencyFlushSignalTest.java    |  71 ++
 .../metron/profiler/storm/HBaseEmitterTest.java | 118 +++
 .../metron/profiler/storm/KafkaEmitterTest.java | 291 ++++++
 .../profiler/storm/ProfileBuilderBoltTest.java  | 356 ++++++++
 .../profiler/storm/ProfileHBaseMapperTest.java  |  93 ++
 .../profiler/storm/ProfileSplitterBoltTest.java | 455 ++++++++++
 .../integration/ConfigUploadComponent.java      | 124 +++
 .../storm/integration/MessageBuilder.java       |  75 ++
 .../integration/ProfilerIntegrationTest.java    | 421 +++++++++
 .../src/test/resources/log4j.properties         |  34 +
 metron-analytics/metron-profiler/.gitignore     |   1 -
 metron-analytics/metron-profiler/README.md      | 898 -------------------
 metron-analytics/metron-profiler/pom.xml        | 407 ---------
 .../src/main/assembly/assembly.xml              |  72 --
 .../src/main/config/profiler.properties         |  71 --
 .../src/main/flux/profiler/remote.yaml          | 218 -----
 .../bolt/FixedFrequencyFlushSignal.java         | 135 ---
 .../metron/profiler/bolt/FlushSignal.java       |  51 --
 .../metron/profiler/bolt/HBaseEmitter.java      |  73 --
 .../metron/profiler/bolt/KafkaEmitter.java      | 164 ----
 .../metron/profiler/bolt/ManualFlushSignal.java |  54 --
 .../profiler/bolt/ProfileBuilderBolt.java       | 509 -----------
 .../profiler/bolt/ProfileHBaseMapper.java       | 117 ---
 .../bolt/ProfileMeasurementEmitter.java         |  59 --
 .../profiler/bolt/ProfileSplitterBolt.java      | 232 -----
 .../src/main/resources/META-INF/LICENSE         | 604 -------------
 .../src/main/resources/META-INF/NOTICE          |  92 --
 .../src/main/scripts/start_profiler_topology.sh |  22 -
 .../zookeeper/event-time-test/profiler.json     |  12 -
 .../processing-time-test/profiler.json          |  11 -
 .../zookeeper/profile-with-stats/profiler.json  |  12 -
 .../bolt/FixedFrequencyFlushSignalTest.java     |  71 --
 .../metron/profiler/bolt/HBaseEmitterTest.java  | 120 ---
 .../metron/profiler/bolt/KafkaEmitterTest.java  | 291 ------
 .../profiler/bolt/ProfileBuilderBoltTest.java   | 356 --------
 .../profiler/bolt/ProfileHBaseMapperTest.java   |  93 --
 .../profiler/bolt/ProfileSplitterBoltTest.java  | 455 ----------
 .../integration/ConfigUploadComponent.java      | 124 ---
 .../profiler/integration/MessageBuilder.java    |  75 --
 .../integration/ProfilerIntegrationTest.java    | 437 ---------
 .../src/test/resources/log4j.properties         |  34 -
 metron-analytics/pom.xml                        |   2 +-
 .../packaging/docker/deb-docker/pom.xml         |   2 +-
 .../packaging/docker/rpm-docker/pom.xml         |   2 +-
 .../ZKConfigurationsCacheIntegrationTest.java   |   2 +-
 70 files changed, 5751 insertions(+), 5888 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 4858807..95b6faf 100644
--- a/README.md
+++ b/README.md
@@ -140,7 +140,7 @@ Some useful utilities that cross all of these parts of the 
architecture:
 * [Stellar](metron-platform/metron-common) : A custom data transformation 
language that is used throughout metron from simple field transformation to 
expressing triage rules.
 * [Model as a Service](metron-analytics/metron-maas-service) : A Yarn 
application which can deploy machine learning and statistical models onto the 
cluster along with the associated Stellar functions to be able to call out to 
them in a scalable manner.
 * [Data management](metron-platform/metron-data-management) : A set of data 
management utilities aimed at getting data into HBase in a format which will 
allow data flowing through metron to be enriched with the results.  Contains 
integrations with threat intelligence feeds exposed via TAXII as well as simple 
flat file structures.
-* [Profiler](metron-analytics/metron-profiler) : A feature extraction 
mechanism that can generate a profile describing the behavior of an entity. An 
entity might be a server, user, subnet or application. Once a profile has been 
generated defining what normal behavior looks-like, models can be built that 
identify anomalous behavior.
+* [Profiler](metron-analytics/metron-profiler-common) : A feature extraction 
mechanism that can generate a profile describing the behavior of an entity. An 
entity might be a server, user, subnet or application. Once a profile has been 
generated defining what normal behavior looks-like, models can be built that 
identify anomalous behavior.
 
 # Notes on Adding a New Sensor
 In order to allow for meta alerts to be queried alongside regular alerts in 
Elasticsearch 2.x,

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-common/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/README.md 
b/metron-analytics/metron-profiler-common/README.md
new file mode 100644
index 0000000..8f26aaf
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/README.md
@@ -0,0 +1,386 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+# Metron Profiler
+
+* [Introduction](#introduction)
+* [Getting Started](#getting-started)
+* [Profiles](#profiles)
+* [Examples](#examples)
+
+## Introduction
+
+The Profiler is a feature extraction mechanism that can generate a profile 
describing the behavior of an entity.  An entity might be a server, user, 
subnet or application. Once a profile has been generated defining what normal 
behavior looks-like, models can be built that identify anomalous behavior.
+
+This is achieved by summarizing the telemetry data consumed by Metron over 
tumbling windows. A summary statistic is applied to the data received within a 
given window.  Collecting these values across many windows results in a time 
series that is useful for analysis.
+
+Any field contained within a message can be used to generate a profile.  A 
profile can even be produced by combining fields that originate in different 
data sources.  A user has considerable power to transform the data used in a 
profile by leveraging the Stellar language. 
+
+There are three separate ports of the Profiler that share this common code 
base.
+* The [Storm Profiler](../metron-profiler-storm/README.md) builds low-latency 
profiles over streaming data sets.
+* The [Spark Profiler](../metron-profiler-spark/README.md) backfills profiles 
using archived telemetry.
+* The [REPL Profiler](../metron-profiler-repl/README.md) allows profiles to be 
tested and debugged within the Stellar REPL.
+
+## Getting Started
+
+1. [Create a profile](../metron-profiler-repl/README.md#getting-started) using 
the Stellar REPL. Validate your profile using mock data, then apply real, live 
data.
+
+1. [Backfill your profile](../metron-profiler-spark/README.md#getting-started) 
using archived telemetry to see how your profile behaves over time.
+
+1. [Deploy your profile](../metron-profiler-storm/README.md#getting-started) 
to Storm to maintain a low-latency profile over a streaming data set.
+
+1. [Retrieve your profile data](../metron-profiler-client/README.md) using the 
Stellar API so that you can build enrichments and alert on abnormalities.
+
+1. Explore more ways to create [profiles](#examples).
+
+## Profiles
+
+Let's start with a simple example. The following profile maintains a count of 
the number of telemetry messages for each IP source address.  A counter is 
initialized to 0, then incremented each time a message is received for a given 
IP source address.  At regular intervals the count is flushed and stored. Over 
time this results in a time series describing the amount of telemetry received 
for each IP source address.
+
+```
+{
+  "profiles": [
+    {
+      "profile": "hello-world",
+      "foreach": "ip_src_addr",
+      "init": {
+        "count": 0
+      },
+      "update": {
+        "count": "count + 1"
+      },
+      "result": "count",
+    }
+  ]
+}
+```
+
+The Profiler configuration contains two fields, only one of which is required.
+
+```
+{
+    "profiles": [
+        { "profile": "one", ... },
+        { "profile": "two", ... }
+    ],
+    "timestampField": "timestamp"
+}
+```
+
+| Name                              |               | Description
+|---                                |---            |---
+| [profiles](#profiles)             | Required      | A list of zero or more 
Profile definitions.
+| [timestampField](#timestampfield) | Optional      | Indicates whether 
processing time or event time should be used. By default, processing time is 
enabled.
+
+
+#### `profiles`
+
+*Required*
+
+A list of zero or more Profile definitions.
+
+#### `timestampField`
+
+*Optional*
+
+Indicates whether processing time or event time is used. By default, 
processing time is enabled.
+
+##### Processing Time
+
+By default, no `timestampField` is defined.  In this case, the Profiler uses 
system time when generating profiles.  This means that the profiles are 
generated based on when the data has been processed by the Profiler.  This is 
also known as 'processing time'.
+
+This is the simplest mode of operation, but has some drawbacks.  If the 
Profiler is consuming live data and all is well, the processing and event times 
will likely remain similar and consistent. If processing time diverges from 
event time, then the Profiler will generate skewed profiles.
+
+There are a few scenarios that might cause skewed profiles when using 
processing time.  For example when a system has undergone a scheduled 
maintenance window and is restarted, a high volume of messages will need to be 
processed by the Profiler. The output of the Profiler might indicate an 
increase in activity during this time, although no change in activity actually 
occurred on the target network. The same situation could occur if an upstream 
system which provides telemetry undergoes an outage.  
+
+[Event Time](#event-time) can be used to mitigate these problems.
+
+##### Event Time
+
+Alternatively, a `timestampField` can be defined.  This must be the name of a 
field contained within the telemetry processed by the Profiler.  The Profiler 
will extract and use the timestamp contained within this field.
+
+* If a message does not contain this field, it will be dropped.
+
+* The field must contain a timestamp in epoch milliseconds expressed as either 
a numeric or string. Otherwise, the message will be dropped.
+
+* The Profiler will use the same field across all telemetry sources and for 
all profiles.
+
+* Be aware of clock skew across telemetry sources.  If your profile is 
processing telemetry from multiple sources where the clock differs 
significantly, the Profiler may assume that some of those messages are late and 
will be ignored.  Adjusting the 
[`profiler.window.duration`](#profilerwindowduration) and 
[`profiler.window.lag`](#profilerwindowlag) can help accommodate skewed clocks.
+
+### Profiles
+
+A profile definition requires a JSON-formatted set of elements, many of which 
can contain Stellar code.  The specification contains the following elements.  
(For the impatient, skip ahead to the [Examples](#examples).)
+
+| Name                          |               | Description
+|---                            |---            |---
+| [profile](#profile)           | Required      | Unique name identifying the 
profile.
+| [foreach](#foreach)           | Required      | A separate profile is 
maintained "for each" of these.
+| [onlyif](#onlyif)             | Optional      | Boolean expression that 
determines if a message should be applied to the profile.
+| [groupBy](#groupby)           | Optional      | One or more Stellar 
expressions used to group the profile measurements when persisted.
+| [init](#init)                 | Optional      | One or more expressions 
executed at the start of a window period.
+| [update](#update)             | Required      | One or more expressions 
executed when a message is applied to the profile.
+| [result](#result)             | Required      | Stellar expressions that are 
executed when the window period expires.
+| [expires](#expires)           | Optional      | Profile data is purged after 
this period of time, specified in days.
+
+#### `profile`
+
+*Required*
+
+A unique name identifying the profile.  The field is treated as a string.
+
+#### `foreach`
+
+*Required*
+
+A separate profile is maintained 'for each' of these.  This is effectively the 
entity that the profile is describing.  The field is expected to contain a 
Stellar expression whose result is the entity name.  
+
+For example, if `ip_src_addr` then a separate profile would be maintained for 
each unique IP source address in the data; 10.0.0.1, 10.0.0.2, etc.
+
+#### `onlyif`
+
+*Optional*
+
+An expression that determines if a message should be applied to the profile.  
A Stellar expression that returns a Boolean is expected.  A message is only 
applied to a profile if this expression is true. This allows a profile to 
filter the messages that get applied to it.
+
+#### `groupBy`
+
+*Optional*
+
+One or more Stellar expressions used to group the profile measurements when 
persisted. This can be used to sort the Profile data to allow for a contiguous 
scan when accessing subsets of the data.  This is also one way to deal with 
calendar effects.  For example, where activity on a weekday can be very 
different from a weekend.
+
+A common use case would be grouping by day of week.  This allows a contiguous 
scan to access all profile data for Mondays only.  Using the following 
definition would achieve this.
+
+```
+"groupBy": [ "DAY_OF_WEEK(start)" ]
+```
+
+The expression can reference any of these variables.
+* Any variable defined by the profile in its `init` or `update` expressions.
+* `profile` The name of the profile.
+* `entity` The name of the entity being profiled.
+* `start` The start time of the profile period in epoch milliseconds.
+* `end` The end time of the profile period in epoch milliseconds.
+* `duration` The duration of the profile period in milliseconds.
+* `result` The result of executing the `result` expression.
+
+#### `init`
+
+*Optional*
+
+One or more expressions executed at the start of a window period.  A map is 
expected where the key is the variable name and the value is a Stellar 
expression.  The map can contain zero or more variable:expression pairs. At the 
start of each window period, each expression is executed once and stored in the 
given variable. Note that constant init values such as "0" must be in quotes 
regardless of their type, as the init value must be a string to be executed by 
Stellar.
+
+```
+"init": {
+  "var1": "0",
+  "var2": "1"
+}
+```
+
+#### `update`
+
+*Required*
+
+One or more expressions executed when a message is applied to the profile.  A 
map is expected where the key is the variable name and the value is a Stellar 
expression.  The map can include 0 or more variables/expressions. When each 
message is applied to the profile, the expression is executed and stored in a 
variable with the given name.
+
+```
+"update": {
+  "var1": "var1 + 1",
+  "var2": "var2 + 1"
+}
+```
+
+#### `result`
+
+*Required*
+
+Stellar expressions that are executed when the window period expires.  The 
expressions are expected to summarize the messages that were applied to the 
profile over the window period.  In the most basic form a single result is 
persisted for later retrieval.
+```
+"result": "var1 + var2"
+```
+
+For more advanced use cases, a profile can generate two types of results.  A 
profile can define one or both of these result types at the same time.
+* `profile`:  A required expression that defines a value that is persisted for 
later retrieval.
+* `triage`: An optional expression that defines values that are accessible 
within the Threat Triage process.
+
+**profile**
+
+A required Stellar expression that results in a value that is persisted in the 
profile store for later retrieval.  The expression can result in any object 
that is Kryo serializable.  These values can be retrieved for later use with 
the [Profiler Client](../metron-profiler-client).
+```
+"result": {
+    "profile": "2 + 2"
+}
+```
+
+An alternative, simplified form is also acceptable.
+```
+"result": "2 + 2"
+```
+
+**triage**
+
+An optional map of one or more Stellar expressions. The value of each 
expression is made available to the Threat Triage process under the given name. 
 Each expression must result in either a primitive type, like an integer, 
long, or short, or a String.  All other types will result in an error.
+
+In the following example, three values, the minimum, the maximum and the mean 
are appended to a message.  This message is consumed by Metron, like other 
sources of telemetry, and each of these values are accessible from within the 
Threat Triage process using the given field names; `min`, `max`, and `mean`.
+```
+"result": {
+    "triage": {
+        "min": "STATS_MIN(stats)",
+        "max": "STATS_MAX(stats)",
+        "mean": "STATS_MEAN(stats)"
+    }
+}
+```
+
+#### `expires`
+
+*Optional*
+
+A numeric value that defines how many days the profile data is retained.  
After this time, the data expires and is no longer accessible.  If no value is 
defined, the data does not expire.
+
+The REPL can be a powerful tool for developing profiles. Read all about 
[Developing Profiles](../metron-profiler-client/#developing_profiles).
+
+## Examples
+
+The following examples are intended to highlight the functionality provided by 
the Profiler. Try out these examples easily in the Stellar REPL as described in 
the [Getting Started](#getting-started) section.
+
+### Example 1
+
+This example captures the ratio of DNS traffic to HTTP traffic for each host. 
The following configuration would be used to generate this profile.
+
+```
+{
+  "profiles": [
+    {
+      "profile": "dns-to-http-by-source",
+      "foreach": "ip_src_addr",
+      "onlyif": "protocol == 'DNS' or protocol == 'HTTP'",
+      "init": {
+        "num_dns": 1.0,
+        "num_http": 1.0
+      },
+      "update": {
+        "num_dns": "num_dns + (if protocol == 'DNS' then 1 else 0)",
+        "num_http": "num_http + (if protocol == 'HTTP' then 1 else 0)"
+      },
+      "result": "num_dns / num_http"
+    }
+  ]
+}
+```
+
+This creates a profile...
+ * Named ‘dns-to-http-by-source’
+ * That for each IP source address
+ * Only if the 'protocol' field equals 'HTTP' or 'DNS'
+ * Accumulates the number of DNS requests
+ * Accumulates the number of HTTP requests
+ * Returns the ratio of these as the result
+
+### Example 2
+
+This example captures the average of the `length` field for HTTP traffic. The 
following profile could be used.
+
+```
+{
+  "profiles": [
+    {
+      "profile": "avg-http-length",
+      "foreach": "ip_src_addr",
+      "onlyif": "protocol == 'HTTP'",
+      "update": { "s": "STATS_ADD(s, length)" },
+      "result": "STATS_MEAN(s)"
+    }
+  ]
+}
+```
+
+This creates a profile...
+ * Named ‘avg-http-length’
+ * That for each IP source address
+ * Only if the 'protocol' field is 'HTTP'
+ * Captures the `length` field
+ * Calculates the average as the result
+
+It is important to note that the Profiler can persist any serializable Object, 
not just numeric values. Instead of storing the actual mean, the profile could 
store a statistical sketch of the lengths.  This summary can then be used at a 
later time to calculate the mean, min, max, percentiles, or any other sensible 
metric.  This provides a much greater degree of flexibility. The following 
Stellar REPL session shows how you might do this.
+
+1. Retrieve the last 30 minutes of profile measurements for a specific host.
+    ```
+    $ source /etc/default/metron
+    $ bin/stellar -z $ZOOKEEPER
+    
+    [Stellar]>>> stats := PROFILE_GET( "example4", "10.0.0.1", 
PROFILE_FIXED(30, "MINUTES"))
+    [org.apache.metron.common.math.stats.OnlineStatisticsProvider@79fe4ab9, 
...]
+    ```
+
+1. Calculate different summary metrics using the same profile data.
+    ```
+    [Stellar]>>> aStat := GET_FIRST(stats)
+    org.apache.metron.common.math.stats.OnlineStatisticsProvider@79fe4ab9
+    
+    [Stellar]>>> STATS_MEAN(aStat)
+    15979.0625
+    
+    [Stellar]>>> STATS_PERCENTILE(aStat, 90)
+    30310.958
+    ```
+
+1. Merge all of the profile measurements over the past 30 minutes into a 
single sketch and calculate the 90th percentile.
+    ```
+    [Stellar]>>> merged := STATS_MERGE( stats)
+    
+    [Stellar]>>> STATS_PERCENTILE(merged, 90)
+    29810.992
+    ```
+
+
+More information on accessing profile data can be found in the [Profiler 
Client](../metron-profiler-client/README.md).
+
+More information on using the [`STATS_*` 
functions](../metron-statistics/README.md).
+
+
+### Example 3
+
+This profile captures the vertex degree of a host. If you view network 
communication as a directed graph, the in and out degree of each host can 
distinguish behaviors. Anomalies can serve as an indicator of compromise.  For 
example, you might find clients normally have an out-degree >> in-degree, 
whereas a server might be the opposite.
+
+```
+{
+  "profiles": [
+    {
+      "profile": "in-degrees",
+      "onlyif": "source.type == 'yaf'",
+      "foreach": "ip_dst_addr",
+      "update": { "in": "HLLP_ADD(in, ip_src_addr)" },
+      "result": "HLLP_CARDINALITY(in)"
+    },
+    {
+      "profile": "out-degrees",
+      "onlyif": "source.type == 'yaf'",
+      "foreach": "ip_src_addr",
+      "update": { "out": "HLLP_ADD(out, ip_dst_addr)" },
+      "result": "HLLP_CARDINALITY(out)"
+    }
+  ]
+}
+```
+
+This creates a profile...
+ * Named ‘in-degrees’
+ * That for each IP destination address
+ * Captures the IP source address
+ * Then calculates the cardinality; the number of unique IPs this host has 
interacted with
+
+The second profile calculates the out-degree.

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-repl/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-repl/README.md 
b/metron-analytics/metron-profiler-repl/README.md
index e0f61d4..128ed2f 100644
--- a/metron-analytics/metron-profiler-repl/README.md
+++ b/metron-analytics/metron-profiler-repl/README.md
@@ -17,7 +17,8 @@ limitations under the License.
 -->
 # Metron Profiler for the Stellar REPL
 
-This project allows profiles to be executed within the Stellar REPL. This is a 
port of the Profiler to run in the Stellar REPL.
+This project allows profiles to be executed within the Stellar REPL. This is a 
port of the Profiler to the Stellar REPL that allows profiles to be tested and 
debugged within a controlled environment.
+
 
 * [Introduction](#introduction)
 * [Getting Started](#getting-started)
@@ -27,7 +28,7 @@ This project allows profiles to be executed within the 
Stellar REPL. This is a p
 
 Creating and refining profiles is an iterative process.  Iterating against a 
live stream of data is slow, difficult and error prone.  Running the Profiler 
in the Stellar REPL provides a controlled and isolated execution environment to 
create, refine and troubleshoot profiles.
 
-For an introduction to the Profiler, see the [Profiler 
README](../metron-profiler/README.md).
+For an introduction to the Profiler, see the [Profiler 
README](../metron-profiler-common/README.md).
 
 ## Getting Started
 
@@ -132,7 +133,7 @@ This section describes how to get started using the 
Profiler in the Stellar REPL
          Profiler{1 profile(s), 10 messages(s), 10 route(s)}
        ```
 
-1. After you are satisfied with your profile, the next step is to deploy the 
profile against the live stream of telemetry being capture by Metron. This 
involves deploying the profile to either the [Storm 
Profiler](../metron-profiler/README.md) or the [Spark 
Profiler](../metron-profiler-spark/README.md).
+1. After you are satisfied with your profile, the next step is to deploy the 
profile against the live stream of telemetry being captured by Metron. This 
involves deploying the profile to either the [Storm 
Profiler](../metron-profiler-storm/README.md) or the [Spark 
Profiler](../metron-profiler-spark/README.md).
 
 
 ## Installation

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-spark/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-spark/README.md 
b/metron-analytics/metron-profiler-spark/README.md
index 0a31263..d137e51 100644
--- a/metron-analytics/metron-profiler-spark/README.md
+++ b/metron-analytics/metron-profiler-spark/README.md
@@ -17,7 +17,7 @@ limitations under the License.
 -->
 # Metron Profiler for Spark
 
-This project allows profiles to be executed using [Apache 
Spark](https://spark.apache.org). This is a port of the Profiler to Spark.
+This project allows profiles to be executed using [Apache 
Spark](https://spark.apache.org). This is a port of the Profiler to Spark that 
allows you to backfill profiles using archived telemetry.
 
 * [Introduction](#introduction)
 * [Getting Started](#getting-started)
@@ -27,9 +27,9 @@ This project allows profiles to be executed using [Apache 
Spark](https://spark.a
 
 ## Introduction
 
-Using the [Streaming Profiler](../metron-profiler/README.md) in [Apache 
Storm](http://storm.apache.org) allows you to create profiles based on the 
stream of telemetry being captured, enriched, triaged, and indexed by Metron. 
This does not allow you to create a profile based on telemetry that was 
captured in the past.  
+Using the [Streaming Profiler](../metron-profiler-storm/README.md) in [Apache 
Storm](http://storm.apache.org) allows you to create profiles based on the 
stream of telemetry being captured, enriched, triaged, and indexed by Metron. 
This does not allow you to create a profile based on telemetry that was 
captured in the past.  
 
-There are many cases where you might want to produce a profile from telemetry 
in the past.  This is referred to as "profile seeding".
+There are many cases where you might want to produce a profile from telemetry 
in the past.  This is referred to as profile seeding or backfilling.
 
 * As a Security Data Scientist, I want to understand the historical behaviors 
and trends of a profile so that I can determine if the profile has predictive 
value for model building.
 
@@ -39,12 +39,10 @@ The Batch Profiler running in [Apache 
Spark](https://spark.apache.org) allows yo
 
 The portion of a profile produced by the Batch Profiler should be 
indistinguishable from the portion created by the Streaming Profiler.  
Consumers of the profile should not care how the profile was generated.  Using 
the Streaming Profiler together with the Batch Profiler allows you to create a 
complete profile over a wide range of time.
 
-For an introduction to the Profiler and Profiler concepts, see the [Profiler 
README](../metron-profiler/README.md).
+For an introduction to the Profiler, see the [Profiler 
README](../metron-profiler-common/README.md).
 
 ## Getting Started
 
-
-
 1. Create a profile definition by editing 
`$METRON_HOME/config/zookeeper/profiler.json` as follows.  
 
     ```
@@ -95,8 +93,6 @@ The Batch Profiler package is installed automatically when 
installing Metron usi
 
 The Batch Profiler requires Spark version 2.3.0+.
 
-### Packages
-
 #### Build the RPM
 
 1. Build Metron.
@@ -115,7 +111,7 @@ The Batch Profiler requires Spark version 2.3.0+.
     find ./ -name "metron-profiler-spark*.rpm"
     ```
 
-### Build the DEB
+#### Build the DEB
 
 1. Build Metron.
     ```
@@ -151,7 +147,7 @@ You can store both settings for the Profiler along with 
settings for Spark in th
 
 ### `profiler.batch.input.path`
 
-*Default*: hdfs://localhost:9000/apps/metron/indexing/indexed/*/*
+*Default*: "hdfs://localhost:9000/apps/metron/indexing/indexed/*/*"
 
 The path to the input data read by the Batch Profiler.
 

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/.gitignore
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/.gitignore 
b/metron-analytics/metron-profiler-storm/.gitignore
new file mode 100644
index 0000000..df1a13b
--- /dev/null
+++ b/metron-analytics/metron-profiler-storm/.gitignore
@@ -0,0 +1 @@
+/logs
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/README.md 
b/metron-analytics/metron-profiler-storm/README.md
new file mode 100644
index 0000000..c952cb7
--- /dev/null
+++ b/metron-analytics/metron-profiler-storm/README.md
@@ -0,0 +1,400 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+# Metron Profiler for Storm
+
+This project allows profiles to be executed using [Apache 
Storm](https://storm.apache.org). This is a port of the Profiler to Storm that 
builds low-latency profiles over streaming data sets.
+
+* [Introduction](#introduction)
+* [Getting Started](#getting-started)
+* [Installation](#installation)
+* [Configuring the Profiler](#configuring-the-profiler)
+* [Implementation](#implementation)
+
+## Introduction
+
+The Profiler is a feature extraction mechanism that can generate a profile 
describing the behavior of an entity.  An entity might be a server, user, 
subnet or application. Once a profile has been generated defining what normal 
behavior looks-like, models can be built that identify anomalous behavior.
+
+This is achieved by summarizing the streaming telemetry data consumed by 
Metron over sliding windows. A summary statistic is applied to the data 
received within a given window.  Collecting this summary across many windows 
results in a time series that is useful for analysis.
+
+Any field contained within a message can be used to generate a profile.  A 
profile can even be produced by combining fields that originate in different 
data sources.  A user has considerable power to transform the data used in a 
profile by leveraging the Stellar language. A user only need configure the 
desired profiles and ensure that the Profiler topology is running.
+
+For an introduction to the Profiler, see the [Profiler 
README](../metron-profiler-common/README.md).
+
+## Getting Started
+
+This section will describe the steps required to get your first "Hello, 
World!" profile running.  This assumes that you have a successful Profiler 
[Installation](#installation) and have it running.  You can deploy profiles in 
two different ways.
+
+* [Deploying Profiles with the Stellar 
Shell](#deploying-profiles-with-the-stellar-shell)
+* [Deploying Profiles from the Command 
Line](#deploying-profiles-from-the-command-line)
+
+### Deploying Profiles with the Stellar Shell
+
+Continuing the previous running example, at this point, you have seen how your 
profile behaves against real, live telemetry in a controlled execution 
environment.  The next step is to deploy your profile to the live, actively 
running Profiler topology.
+
+1.  Start the Stellar Shell with the `-z` command line argument so that a 
connection to Zookeeper is established.  This is required when deploying a new 
profile definition as shown in the steps below.
+    ```
+    [root@node1 ~]# source /etc/default/metron
+    [root@node1 ~]# $METRON_HOME/bin/stellar -z $ZOOKEEPER
+    Stellar, Go!
+    [Stellar]>>>
+    [Stellar]>>> %functions CONFIG CONFIG_GET, CONFIG_PUT
+    ```
+
+1. If you haven't already, define your profile.
+       ```
+       [Stellar]>>> conf := SHELL_EDIT()
+       [Stellar]>>> conf
+       {
+         "profiles": [
+           {
+             "profile": "hello-world",
+             "onlyif":  "exists(ip_src_addr)",
+             "foreach": "ip_src_addr",
+             "init":    { "count": "0" },
+             "update":  { "count": "count + 1" },
+             "result":  "count"
+           }
+         ]
+       }
+       ```
+
+1. Check what is already deployed.  
+
+       Pushing a new profile configuration is destructive.  It will overwrite 
any existing configuration.  Check what you have out there.  Manually merge the 
existing configuration with your new profile definition.
+
+       ```
+       [Stellar]>>> existing := CONFIG_GET("PROFILER")
+       ```
+
+1. Deploy your profile.  This will push the configuration to the live, 
actively running Profiler topology.  This will overwrite any existing profile 
definitions.
+       ```
+       [Stellar]>>> CONFIG_PUT("PROFILER", conf)
+       ```
+
+### Deploying Profiles from the Command Line
+
+1. Create the profile definition in a file located at 
`$METRON_HOME/config/zookeeper/profiler.json`.  This file will likely not 
exist, if you have never created Profiles before.
+
+    The following example will create a profile that simply counts the number 
of messages per `ip_src_addr`.
+    ```
+    {
+      "profiles": [
+        {
+          "profile": "hello-world",
+          "onlyif":  "exists(ip_src_addr)",
+          "foreach": "ip_src_addr",
+          "init":    { "count": "0" },
+          "update":  { "count": "count + 1" },
+          "result":  "count"
+        }
+      ]
+    }
+    ```
+
+1. Upload the profile definition to Zookeeper.
+
+    ```
+    $ source /etc/default/metron
+    $ cd $METRON_HOME
+    $ bin/zk_load_configs.sh -m PUSH -i config/zookeeper/ -z $ZOOKEEPER
+    ```
+
+    You can validate this by reading back the Metron configuration from 
Zookeeper using the same script. The result should look-like the following.
+    ```
+    $ bin/zk_load_configs.sh -m DUMP -z $ZOOKEEPER
+    ...
+    PROFILER Config: profiler
+    {
+      "profiles": [
+        {
+          "profile": "hello-world",
+          "onlyif":  "exists(ip_src_addr)",
+          "foreach": "ip_src_addr",
+          "init":    { "count": "0" },
+          "update":  { "count": "count + 1" },
+          "result":  "count"
+        }
+      ]
+    }
+    ```
+
+1. Ensure that test messages are being sent to the Profiler's input topic in 
Kafka.  The Profiler will consume messages from the input topic defined in the 
Profiler's configuration (see [Configuring the 
Profiler](#configuring-the-profiler)).  By default this is the `indexing` topic.
+
+1. Check the HBase table to validate that the Profiler is writing the profile. 
 Remember that the Profiler is flushing the profile every 15 minutes.  You will 
need to wait at least this long to start seeing profile data in HBase.
+    ```
+    $ /usr/hdp/current/hbase-client/bin/hbase shell
+    hbase(main):001:0> count 'profiler'
+    ```
+
+1. Use the [Profiler Client](../metron-profiler-client) to read the profile 
data.  The following `PROFILE_GET` command will read the data written by the 
`hello-world` profile. This assumes that `10.0.0.1` is one of the values for 
`ip_src_addr` contained within the telemetry consumed by the Profiler.
+
+    ```
+    $ source /etc/default/metron
+    $ bin/stellar -z $ZOOKEEPER
+    [Stellar]>>> PROFILE_GET( "hello-world", "10.0.0.1", PROFILE_FIXED(30, 
"MINUTES"))
+    [451, 448]
+    ```
+
+    This result indicates that over the past 30 minutes, the Profiler stored 
two values related to the source IP address "10.0.0.1".  In the first 15 minute 
period, the IP `10.0.0.1` was seen in 451 telemetry messages.  In the second 15 
minute period, the same IP was seen in 448 telemetry messages.
+
+    It is assumed that the `PROFILE_GET` client is correctly configured to 
match the Profile configuration before using it to read that Profile.  More 
information on configuring and using the Profiler client can be found 
[here](../metron-profiler-client).  
+
+## Installation
+
+The Profiler can be installed with either of these two methods.
+
+ * [Ambari Installation](#ambari-installation)
+ * [Manual Installation](#manual-installation)
+
+### Ambari Installation
+
+The Metron Profiler is installed automatically when installing Metron using 
the Ambari MPack.  You can skip the [Installation](#installation) section and 
move ahead to [Getting Started](#getting-started) should this be the case.
+
+### Manual Installation
+
+This section will describe the steps necessary to manually install the 
Profiler on an RPM-based Linux distribution.  This assumes that core Metron has 
already been installed and validated.  If you installed Metron using the 
[Ambari MPack](#ambari-installation), then the Profiler has already been 
installed and you can skip this section.
+
+1. Build the Metron RPMs (see Building the 
[RPMs](../../metron-deployment#rpms)).  
+
+    You may have already built the Metron RPMs when core Metron was installed.
+
+    ```
+    $ find metron-deployment/ -name "metron-profiler*.rpm"
+    
metron-deployment//packaging/docker/rpm-docker/RPMS/noarch/metron-profiler-0.4.1-201707131420.noarch.rpm
+    ```
+
+1. Copy the Profiler RPM to the installation host.  
+
+    The installation host must be the same host on which core Metron was 
installed.  Depending on how you installed Metron, the Profiler RPM might have 
already been copied to this host with the other Metron RPMs.
+
+    ```
+    [root@node1 ~]# find /localrepo/  -name "metron-profiler*.rpm"
+    /localrepo/metron-profiler-0.4.1-201707112313.noarch.rpm
+    ```
+
+1. Install the RPM.
+
+    ```
+    [root@node1 ~]# rpm -ivh metron-profiler-*.noarch.rpm
+    Preparing...                ########################################### 
[100%]
+       1:metron-profiler        ########################################### 
[100%]
+    ```
+
+    ```
+    [root@node1 ~]# rpm -ql metron-profiler
+    /usr/metron
+    /usr/metron/0.4.2
+    /usr/metron/0.4.2/bin
+    /usr/metron/0.4.2/bin/start_profiler_topology.sh
+    /usr/metron/0.4.2/config
+    /usr/metron/0.4.2/config/profiler.properties
+    /usr/metron/0.4.2/flux
+    /usr/metron/0.4.2/flux/profiler
+    /usr/metron/0.4.2/flux/profiler/remote.yaml
+    /usr/metron/0.4.2/lib
+    /usr/metron/0.4.2/lib/metron-profiler-0.4.2-uber.jar
+    ```
+
+1. Edit the configuration file located at 
`$METRON_HOME/config/profiler.properties`.  
+    ```
+    kafka.zk=node1:2181
+    kafka.broker=node1:6667
+    ```
+    * Change `kafka.zk` to refer to Zookeeper in your environment.  
+    * Change `kafka.broker` to refer to a Kafka Broker in your environment.
+
+1. Create a table within HBase that will store the profile data. By default, 
the table is named `profiler` with a column family `P`.  The table name and 
column family must match the Profiler's configuration (see [Configuring the 
Profiler](#configuring-the-profiler)).  
+
+    ```
+    $ /usr/hdp/current/hbase-client/bin/hbase shell
+    hbase(main):001:0> create 'profiler', 'P'
+    ```
+
+1. Start the Profiler topology.
+    ```
+    $ cd $METRON_HOME
+    $ bin/start_profiler_topology.sh
+    ```
+
+At this point the Profiler is running and consuming telemetry messages.  We 
have not defined any profiles yet, so it is not doing anything very useful.  
The next section walks you through the steps to create your very first "Hello, 
World!" profile.
+
+## Configuring the Profiler
+
+The Profiler runs as an independent Storm topology.  The configuration for the 
Profiler topology is stored in local filesystem at 
`$METRON_HOME/config/profiler.properties`. After changing these values, the 
Profiler topology must be restarted for the changes to take effect.
+
+| Setting                                                                      
 | Description
+|---                                                                           
 |---
+| [`profiler.input.topic`](#profilerinputtopic)                                
 | The name of the input Kafka topic.
+| [`profiler.output.topic`](#profileroutputtopic)                              
 | The name of the output Kafka topic.
+| [`profiler.period.duration`](#profilerperiodduration)                        
 | The duration of each profile period.  
+| [`profiler.period.duration.units`](#profilerperioddurationunits)             
 | The units used to specify the 
[`profiler.period.duration`](#profilerperiodduration).
+| [`profiler.window.duration`](#profilerwindowduration)                        
 | The duration of each profile window.
+| [`profiler.window.duration.units`](#profilerwindowdurationunits)            
 | The units used to specify the 
[`profiler.window.duration`](#profilerwindowduration).
+| [`profiler.window.lag`](#profilerwindowlag)                                  
 | The maximum time lag for timestamps.
+| [`profiler.window.lag.units`](#profilerwindowlagunits)                      
 | The units used to specify the [`profiler.window.lag`](#profilerwindowlag).
+| [`profiler.workers`](#profilerworkers)                                       
 | The number of worker processes for the topology.
+| [`profiler.executors`](#profilerexecutors)                                   
 | The number of executors to spawn per component.
+| [`profiler.ttl`](#profilerttl)                                               
 | If a message has not been applied to a Profile in this period of time, the 
Profile will be forgotten and its resources will be cleaned up.
+| [`profiler.ttl.units`](#profilerttlunits)                                    
 | The units used to specify the `profiler.ttl`.
+| [`profiler.hbase.salt.divisor`](#profilerhbasesaltdivisor)                   
 | A salt is prepended to the row key to help prevent hot-spotting.
+| [`profiler.hbase.table`](#profilerhbasetable)                                
 | The name of the HBase table that profiles are written to.
+| [`profiler.hbase.column.family`](#profilerhbasecolumnfamily)                 
 | The column family used to store profiles.
+| [`profiler.hbase.batch`](#profilerhbasebatch)                                
 | The number of puts that are written to HBase in a single batch.
+| 
[`profiler.hbase.flush.interval.seconds`](#profilerhbaseflushintervalseconds) | 
The maximum number of seconds between batch writes to HBase.
+| [`topology.kryo.register`](#topologykryoregister)                            
 | Storm will use Kryo serialization for these classes.
+| [`profiler.writer.batchSize`](#profilerwriterbatchsize)                      
 | The number of records to batch when writing to Kafka.
+| [`profiler.writer.batchTimeout`](#profilerwriterbatchtimeout)                
 | The timeout in ms for batching when writing to Kafka.
+
+
+### `profiler.input.topic`
+
+*Default*: indexing
+
+The name of the Kafka topic from which to consume data.  By default, the 
Profiler consumes data from the `indexing` topic so that it has access to fully 
enriched telemetry.
+
+### `profiler.output.topic`
+
+*Default*: enrichments
+
+The name of the Kafka topic to which profile data is written.  This property 
is only applicable to profiles that define  the [`result` `triage` 
field](#result).  This allows Profile data to be selectively triaged like any 
other source of telemetry in Metron.
+
+### `profiler.period.duration`
+
+*Default*: 15
+
+The duration of each profile period.  This value should be defined along with 
[`profiler.period.duration.units`](#profilerperioddurationunits).
+
+*Important*: To read a profile using the [Profiler 
Client](../metron-profiler-client), the Profiler Client's 
`profiler.client.period.duration` property must match this value.  Otherwise, 
the Profiler Client will be unable to read the profile data.  
+
+### `profiler.period.duration.units`
+
+*Default*: MINUTES
+
+The units used to specify the `profiler.period.duration`.  This value should 
be defined along with [`profiler.period.duration`](#profilerperiodduration).
+
+*Important*: To read a profile using the Profiler Client, the Profiler 
Client's `profiler.client.period.duration.units` property must match this 
value.  Otherwise, the [Profiler 
Client](../metron-profiler-client) will be unable to read the 
profile data.
+
+### `profiler.window.duration`
+
+*Default*: 30
+
+The duration of each profile window.  Telemetry that arrives within a slice of 
time is processed within a single window.  
+
+Many windows of telemetry will be processed during a single profile period.  
This does not change the output of the Profiler, it only changes how the 
Profiler processes data. The window defines how much data the Profiler 
processes in a single pass.
+
+This value should be defined along with 
[`profiler.window.duration.units`](#profilerwindowdurationunits).
+
+This value must be less than the period duration as defined by 
[`profiler.period.duration`](#profilerperiodduration) and 
[`profiler.period.duration.units`](#profilerperioddurationunits).
+
+### `profiler.window.duration.units`
+
+*Default*: SECONDS
+
+The units used to specify the `profiler.window.duration`.  This value should 
be defined along with [`profiler.window.duration`](#profilerwindowduration).
+
+### `profiler.window.lag`
+
+*Default*: 1
+
+The maximum time lag for timestamps. Timestamps cannot arrive out-of-order by 
more than this amount. This value should be defined along with 
[`profiler.window.lag.units`](#profilerwindowlagunits).
+
+### `profiler.window.lag.units`
+
+*Default*: SECONDS
+
+The units used to specify the `profiler.window.lag`.  This value should be 
defined along with [`profiler.window.lag`](#profilerwindowlag).
+
+### `profiler.workers`
+
+*Default*: 1
+
+The number of worker processes to create for the Profiler topology.  This 
property is useful for performance tuning the Profiler.
+
+### `profiler.executors`
+
+*Default*: 0
+
+The number of executors to spawn per component for the Profiler topology.  
This property is useful for performance tuning the Profiler.
+
+### `profiler.ttl`
+
+*Default*: 30
+
+ If a message has not been applied to a Profile in this period of time, the 
Profile will be terminated and its resources will be cleaned up. This value 
should be defined along with [`profiler.ttl.units`](#profilerttlunits).
+
+ This time-to-live does not affect the persisted Profile data in HBase.  It 
only affects the state stored in memory during the execution of the latest 
profile period.  This state will be deleted if the time-to-live is exceeded.
+
+### `profiler.ttl.units`
+
+*Default*: MINUTES
+
+The units used to specify the [`profiler.ttl`](#profilerttl).
+
+### `profiler.hbase.salt.divisor`
+
+*Default*: 1000
+
+A salt is prepended to the row key to help prevent hotspotting.  This constant 
is used to generate the salt.  This constant should be roughly equal to the 
number of nodes in the HBase cluster to ensure even distribution of data.
+
+### `profiler.hbase.table`
+
+*Default*: profiler
+
+The name of the HBase table that profile data is written to.  The Profiler 
expects that the table exists and is writable.  It will not create the table.
+
+### `profiler.hbase.column.family`
+
+*Default*: P
+
+The column family used to store profile data in HBase.
+
+### `profiler.hbase.batch`
+
+*Default*: 10
+
+The number of puts that are written to HBase in a single batch.
+
+### `profiler.hbase.flush.interval.seconds`
+
+*Default*: 30
+
+The maximum number of seconds between batch writes to HBase.
+
+### `topology.kryo.register`
+
+*Default*:
+```
+[ org.apache.metron.profiler.ProfileMeasurement, \
+  org.apache.metron.profiler.ProfilePeriod, \
+  org.apache.metron.common.configuration.profiler.ProfileResult, \
+  org.apache.metron.common.configuration.profiler.ProfileResultExpressions, \
+  org.apache.metron.common.configuration.profiler.ProfileTriageExpressions, \
+  org.apache.metron.common.configuration.profiler.ProfilerConfig, \
+  org.apache.metron.common.configuration.profiler.ProfileConfig, \
+  org.json.simple.JSONObject, \
+  java.util.LinkedHashMap, \
+  org.apache.metron.statistics.OnlineStatisticsProvider ]
+```               
+
+Storm will use Kryo serialization for these classes. Kryo serialization is 
more performant than Java serialization, in most cases.  
+
+For these classes, Storm will use Kryo's `FieldSerializer` as defined in the 
[Storm Serialization 
docs](http://storm.apache.org/releases/1.1.2/Serialization.html).  For all 
other classes not in this list, Storm defaults to using Java serialization 
which is slower and not recommended for a production topology.
+
+This value should only need to be altered if you have defined a profile that 
results in a non-primitive, user-defined type that is not in this list.  If the 
class is not defined in this list, Java serialization will be used and the 
class must adhere to Java's serialization requirements.  
+
+The performance of the entire Profiler topology can be negatively impacted if 
any profile produces results that undergo Java serialization.

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/pom.xml 
b/metron-analytics/metron-profiler-storm/pom.xml
new file mode 100644
index 0000000..22c6255
--- /dev/null
+++ b/metron-analytics/metron-profiler-storm/pom.xml
@@ -0,0 +1,407 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software
+       Foundation (ASF) under one or more contributor license agreements. See 
the
+       NOTICE file distributed with this work for additional information 
regarding
+       copyright ownership. The ASF licenses this file to You under the Apache 
License,
+       Version 2.0 (the "License"); you may not use this file except in 
compliance
+       with the License. You may obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0
+       Unless required by applicable law or agreed to in writing, software 
distributed
+       under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+       OR CONDITIONS OF ANY KIND, either express or implied. See the License 
for
+  the specific language governing permissions and limitations under the 
License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-analytics</artifactId>
+        <version>0.5.1</version>
+    </parent>
+    <artifactId>metron-profiler-storm</artifactId>
+    <url>https://metron.apache.org/</url>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-auth</artifactId>
+            <version>${global_hadoop_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${global_hadoop_version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>commons-httpclient</groupId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${global_hadoop_version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-writer</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-storm-kafka</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-statistics</artifactId>
+            <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>kryo</artifactId>
+                    <groupId>com.esotericsoftware</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-profiler-common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-profiler-client</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common</artifactId>
+            <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-hbase</artifactId>
+            <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-hbase</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>kryo-shaded</artifactId>
+            <version>${global_kryo_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${global_hadoop_version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>commons-httpclient</groupId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${global_hbase_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${global_storm_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                    <groupId>org.apache.logging.log4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>kryo</artifactId>
+                    <groupId>com.esotericsoftware</groupId>
+                </exclusion>
+            </exclusions>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${global_log4j_core_version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${global_log4j_core_version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>${global_kafka_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${global_kafka_version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${global_mockito_version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.tempus-fugit</groupId>
+            <artifactId>tempus-fugit</artifactId>
+            <version>1.2-20140129.191141-5</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-integration-test</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-test-utilities</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${global_shade_version}</version>
+                <configuration>
+                    
<createDependencyReducedPom>true</createDependencyReducedPom>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            
<shadedArtifactAttached>true</shadedArtifactAttached>
+                            <shadedClassifierName>uber</shadedClassifierName>
+                            <filters>
+                              <filter>
+                                <artifact>*:*</artifact>
+                                <excludes>
+                                  <exclude>META-INF/*.SF</exclude>
+                                  <exclude>META-INF/*.DSA</exclude>
+                                  <exclude>META-INF/*.RSA</exclude>
+                                </excludes>
+                              </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.google.common</pattern>
+                                    
<shadedPattern>org.apache.metron.guava.metron-profiler</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml.jackson</pattern>
+                                    
<shadedPattern>org.apache.metron.jackson</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>storm:storm-core:*</exclude>
+                                    <exclude>storm:storm-lib:*</exclude>
+                                    <exclude>org.slf4j.impl*</exclude>
+                                    <exclude>org.slf4j:slf4j-log4j*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <transformers>
+                                <transformer 
implementation="org.atteo.classindex.ClassIndexTransformer"/>
+                                <transformer
+                                  
implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                                     <resources>
+                                        <resource>.yaml</resource>
+                                        <resource>LICENSE.txt</resource>
+                                        <resource>ASL2.0</resource>
+                                        <resource>NOTICE.txt</resource>
+                                      </resources>
+                                </transformer>
+                                <!-- UNCOMMENT THIS IF YOU NEED TO REGENERATE 
THE BEST GUESS NOTICES FILE WHICH REQUIRES PRUNING EVERY RELEASE -->
+                                <!--transformer 
implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
+                                    <addHeader>false</addHeader>
+                                    <projectName>${project.name}</projectName>
+                                </transformer-->
+                                <transformer
+                                        
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass></mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.atteo.classindex</groupId>
+                        <artifactId>classindex-transformer</artifactId>
+                        <version>${global_classindex_version}</version>
+                    </dependency>
+                </dependencies>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptor>src/main/assembly/assembly.xml</descriptor>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for 
inheritance merges -->
+                        <phase>package</phase> <!-- bind to the packaging 
phase -->
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/main/assembly/assembly.xml 
b/metron-analytics/metron-profiler-storm/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..9c43074
--- /dev/null
+++ b/metron-analytics/metron-profiler-storm/src/main/assembly/assembly.xml
@@ -0,0 +1,72 @@
+<!--
+  ~
+  ~  Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  ~
+  -->
+
+<assembly>
+    <id>archive</id>
+    <formats>
+        <format>tar.gz</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>${project.basedir}/src/main/config</directory>
+            <outputDirectory>config</outputDirectory>
+            <useDefaultExcludes>true</useDefaultExcludes>
+            <excludes>
+                <exclude>**/*.formatted</exclude>
+                <exclude>**/*.filtered</exclude>
+            </excludes>
+            <fileMode>0644</fileMode>
+            <lineEnding>unix</lineEnding>
+            <filtered>true</filtered>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/src/main/scripts</directory>
+            <outputDirectory>bin</outputDirectory>
+            <useDefaultExcludes>true</useDefaultExcludes>
+            <excludes>
+                <exclude>**/*.formatted</exclude>
+                <exclude>**/*.filtered</exclude>
+            </excludes>
+            <fileMode>0755</fileMode>
+            <lineEnding>unix</lineEnding>
+            <filtered>true</filtered>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/src/main/flux</directory>
+            <outputDirectory>flux</outputDirectory>
+            <useDefaultExcludes>true</useDefaultExcludes>
+            <excludes>
+                <exclude>**/*.formatted</exclude>
+                <exclude>**/*.filtered</exclude>
+            </excludes>
+            <fileMode>0644</fileMode>
+            <lineEnding>unix</lineEnding>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/target</directory>
+            <includes>
+                
<include>${project.artifactId}-${project.version}-uber.jar</include>
+            </includes>
+            <outputDirectory>lib</outputDirectory>
+            <useDefaultExcludes>true</useDefaultExcludes>
+        </fileSet>
+    </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/main/config/profiler.properties
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/main/config/profiler.properties 
b/metron-analytics/metron-profiler-storm/src/main/config/profiler.properties
new file mode 100644
index 0000000..dc30838
--- /dev/null
+++ b/metron-analytics/metron-profiler-storm/src/main/config/profiler.properties
@@ -0,0 +1,71 @@
+#
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+#
+
+##### Storm #####
+
+topology.worker.childopts=
+topology.auto-credentials=
+profiler.workers=1
+profiler.executors=0
+topology.message.timeout.secs=30
+topology.max.spout.pending=100000
+topology.fall.back.on.java.serialization=true
+topology.testing.always.try.serialize=false
+topology.kryo.register=[ org.apache.metron.profiler.ProfileMeasurement, \
+    org.apache.metron.profiler.ProfilePeriod, \
+    org.apache.metron.common.configuration.profiler.ProfileResult, \
+    org.apache.metron.common.configuration.profiler.ProfileResultExpressions, \
+    org.apache.metron.common.configuration.profiler.ProfileTriageExpressions, \
+    org.apache.metron.common.configuration.profiler.ProfilerConfig, \
+    org.apache.metron.common.configuration.profiler.ProfileConfig, \
+    org.json.simple.JSONObject, \
+    org.json.simple.JSONArray, \
+    java.util.LinkedHashMap, \
+    org.apache.metron.statistics.OnlineStatisticsProvider ]
+
+##### Profiler #####
+
+profiler.input.topic=indexing
+profiler.output.topic=enrichments
+profiler.period.duration=15
+profiler.period.duration.units=MINUTES
+profiler.window.duration=30
+profiler.window.duration.units=SECONDS
+profiler.ttl=30
+profiler.ttl.units=MINUTES
+profiler.window.lag=1
+profiler.window.lag.units=MINUTES
+profiler.max.routes.per.bolt=10000
+
+##### HBase #####
+
+profiler.hbase.salt.divisor=1000
+profiler.hbase.table=profiler
+profiler.hbase.column.family=P
+profiler.hbase.batch=10
+profiler.hbase.flush.interval.seconds=30
+
+##### Kafka #####
+
+kafka.zk=node1:2181
+kafka.broker=node1:6667
+# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+kafka.start=UNCOMMITTED_EARLIEST
+kafka.security.protocol=PLAINTEXT

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml 
b/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
new file mode 100644
index 0000000..da71b27
--- /dev/null
+++ b/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
@@ -0,0 +1,218 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: "profiler"
+
+config:
+    topology.workers: ${profiler.workers}
+    topology.acker.executors: ${profiler.executors}
+    topology.worker.childopts: ${topology.worker.childopts}
+    topology.auto-credentials: ${topology.auto-credentials}
+    topology.message.timeout.secs: ${topology.message.timeout.secs}
+    topology.max.spout.pending: ${topology.max.spout.pending}
+    topology.testing.always.try.serialize: 
${topology.testing.always.try.serialize}
+    topology.fall.back.on.java.serialization: 
${topology.fall.back.on.java.serialization}
+    topology.kryo.register: ${topology.kryo.register}
+
+components:
+
+    -   id: "rowKeyBuilder"
+        className: "org.apache.metron.profiler.hbase.SaltyRowKeyBuilder"
+        properties:
+            - name: "saltDivisor"
+              value: ${profiler.hbase.salt.divisor}
+        configMethods:
+            - name: "withPeriodDuration"
+              args: [${profiler.period.duration}, 
"${profiler.period.duration.units}"]
+
+    -   id: "columnBuilder"
+        className: "org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder"
+        constructorArgs:
+            - "${profiler.hbase.column.family}"
+
+    -   id: "hbaseMapper"
+        className: "org.apache.metron.profiler.storm.ProfileHBaseMapper"
+        properties:
+            - name: "rowKeyBuilder"
+              ref: "rowKeyBuilder"
+            - name: "columnBuilder"
+              ref: "columnBuilder"
+
+    # Any kafka props for the producer go here.
+    -   id: "kafkaProps"
+        className: "java.util.HashMap"
+        configMethods:
+            -   name: "put"
+                args:
+                    - "value.deserializer"
+                    - 
"org.apache.kafka.common.serialization.ByteArrayDeserializer"
+            -   name: "put"
+                args:
+                    - "key.deserializer"
+                    - 
"org.apache.kafka.common.serialization.ByteArrayDeserializer"
+            -   name: "put"
+                args:
+                    - "group.id"
+                    - "profiler"
+            -   name: "put"
+                args:
+                    - "security.protocol"
+                    - "${kafka.security.protocol}"
+
+    # The fields to pull out of the kafka messages
+    -   id: "fields"
+        className: "java.util.ArrayList"
+        configMethods:
+            -   name: "add"
+                args:
+                    - "value"
+
+    -   id: "kafkaConfig"
+        className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
+        constructorArgs:
+            # kafka consumer properties
+            - ref: "kafkaProps"
+            # topic name
+            - "${profiler.input.topic}"
+            # zookeeper hosts
+            - "${kafka.zk}"
+            - ref: "fields"
+        configMethods:
+            -   name: "setFirstPollOffsetStrategy"
+                args:
+                    - "${kafka.start}"
+
+    -   id: "kafkaWriterProps"
+        className: "java.util.HashMap"
+        configMethods:
+            -   name: "put"
+                args:
+                    - "security.protocol"
+                    - "${kafka.security.protocol}"
+
+    -   id: "kafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
+        configMethods:
+            -   name: "withTopic"
+                args: ["${profiler.output.topic}"]
+            -   name: "withZkQuorum"
+                args: ["${kafka.zk}"]
+            -   name: "withProducerConfigs"
+                args: [ref: "kafkaWriterProps"]
+
+    -   id: "kafkaEmitter"
+        className: "org.apache.metron.profiler.storm.KafkaEmitter"
+
+    -   id: "hbaseEmitter"
+        className: "org.apache.metron.profiler.storm.HBaseEmitter"
+
+    -   id: "windowDuration"
+        className: "org.apache.storm.topology.base.BaseWindowedBolt$Duration"
+        constructorArgs:
+            - ${profiler.window.duration}
+            - "${profiler.window.duration.units}"
+
+    -   id: "windowLag"
+        className: "org.apache.storm.topology.base.BaseWindowedBolt$Duration"
+        constructorArgs:
+            - ${profiler.window.lag}
+            - "${profiler.window.lag.units}"
+
+spouts:
+
+    -   id: "kafkaSpout"
+        className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
+        constructorArgs:
+            - ref: "kafkaConfig"
+
+bolts:
+
+    -   id: "splitterBolt"
+        className: "org.apache.metron.profiler.storm.ProfileSplitterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+
+    -   id: "builderBolt"
+        className: "org.apache.metron.profiler.storm.ProfileBuilderBolt"
+        configMethods:
+            - name: "withZookeeperUrl"
+              args: ["${kafka.zk}"]
+            - name: "withPeriodDuration"
+              args: [${profiler.period.duration}, 
"${profiler.period.duration.units}"]
+            - name: "withProfileTimeToLive"
+              args: [${profiler.ttl}, "${profiler.ttl.units}"]
+            - name: "withEmitter"
+              args: [ref: "kafkaEmitter"]
+            - name: "withEmitter"
+              args: [ref: "hbaseEmitter"]
+            - name: "withTumblingWindow"
+              args: [ref: "windowDuration"]
+            - name: "withLag"
+              args: [ref: "windowLag"]
+            - name: "withMaxNumberOfRoutes"
+              args: [${profiler.max.routes.per.bolt}]
+            - name: "withTimestampField"
+              args: ["timestamp"]
+
+    -   id: "hbaseBolt"
+        className: "org.apache.metron.hbase.bolt.HBaseBolt"
+        constructorArgs:
+            - "${profiler.hbase.table}"
+            - ref: "hbaseMapper"
+        configMethods:
+            - name: "withTableProvider"
+              args: ["${hbase.provider.impl}"]
+            - name: "withBatchSize"
+              args: [${profiler.hbase.batch}]
+            - name: "withFlushIntervalSecs"
+              args: [${profiler.hbase.flush.interval.seconds}]
+
+    -   id: "kafkaBolt"
+        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+            - "PROFILER"
+        configMethods:
+            -   name: "withBulkMessageWriter"
+                args: [ref: "kafkaWriter"]
+
+streams:
+
+    -   name: "spout -> splitter"
+        from: "kafkaSpout"
+        to: "splitterBolt"
+        grouping:
+            type: LOCAL_OR_SHUFFLE
+
+    -   name: "splitter -> builder"
+        from: "splitterBolt"
+        to: "builderBolt"
+        grouping:
+            type: FIELDS
+            args: ["entity", "profile"]
+
+    -   name: "builder -> hbase"
+        from: "builderBolt"
+        to: "hbaseBolt"
+        grouping:
+            streamId: "hbase"
+            type: LOCAL_OR_SHUFFLE
+
+    -   name: "builder -> kafka"
+        from: "builderBolt"
+        to: "kafkaBolt"
+        grouping:
+            streamId: "kafka"
+            type: LOCAL_OR_SHUFFLE

http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignal.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignal.java
 
b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignal.java
new file mode 100644
index 0000000..02503c2
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignal.java
@@ -0,0 +1,135 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.storm;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+
+/**
+ * Signals a flush on a fixed frequency; every X milliseconds.
+ */
+public class FixedFrequencyFlushSignal implements FlushSignal {
+
+  protected static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The latest known timestamp.
+   */
+  private long currentTime;
+
+  /**
+   * The time when the next flush should occur.
+   */
+  private long flushTime;
+
+  /**
+   * The amount of time between flushes in milliseconds.
+   */
+  private long flushFrequency;
+
+  public FixedFrequencyFlushSignal(long flushFrequencyMillis) {
+
+    if(flushFrequencyMillis < 0) {
+      throw new IllegalArgumentException("flush frequency must be >= 0");
+    }
+
+    this.flushFrequency = flushFrequencyMillis;
+    reset();
+  }
+
+  /**
+   * Resets the state used to keep track of time.
+   */
+  @Override
+  public void reset() {
+    flushTime = 0;
+    currentTime = 0;
+
+    LOG.debug("Flush counters reset");
+  }
+
+  /**
+   * Update the internal state which tracks time.
+   *
+   * @param timestamp The timestamp received within a tuple.
+   */
+  @Override
+  public void update(long timestamp) {
+
+    if(timestamp > currentTime) {
+
+      // need to update current time
+      LOG.debug("Updating current time; last={}, new={}", currentTime, 
timestamp);
+      currentTime = timestamp;
+
+    } else if ((currentTime - timestamp) > flushFrequency) {
+
+      // significantly out-of-order timestamps
+      LOG.warn("Timestamps out-of-order by '{}' ms. This may indicate a 
problem in the data. last={}, current={}",
+              (currentTime - timestamp),
+              timestamp,
+              currentTime);
+    }
+
+    if(flushTime == 0) {
+
+      // set the next time to flush
+      flushTime = currentTime + flushFrequency;
+      LOG.debug("Setting flush time; '{}' ms until flush; flushTime={}, 
currentTime={}, flushFreq={}",
+              timeToNextFlush(),
+              flushTime,
+              currentTime,
+              flushFrequency);
+    }
+  }
+
+  /**
+   * Returns true, if it is time to flush.
+   *
+   * @return True if time to flush.  Otherwise, false.
+   */
+  @Override
+  public boolean isTimeToFlush() {
+
+    boolean flush = currentTime > flushTime;
+    LOG.debug("Flush={}, '{}' ms until flush; currentTime={}, flushTime={}",
+            flush,
+            timeToNextFlush(),
+            currentTime,
+            flushTime);
+
+    return flush;
+  }
+
+  @Override
+  public long currentTimeMillis() {
+    return currentTime;
+  }
+
+  /**
+   * Returns the number of milliseconds to the next flush.
+   * @return The time left until the next flush.
+   */
+  private long timeToNextFlush() {
+    return Math.max(0, flushTime - currentTime);
+  }
+}

Reply via email to