METRON-739 Create Local Profile Runner (nickwallen) closes apache/metron#693


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/073d6b50
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/073d6b50
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/073d6b50

Branch: refs/heads/master
Commit: 073d6b50def2ebd21e0f9c87b4d039e6dabf7616
Parents: 1f9a791
Author: nickwallen <[email protected]>
Authored: Tue Aug 15 17:33:16 2017 -0400
Committer: nickallen <[email protected]>
Committed: Tue Aug 15 17:33:16 2017 -0400

----------------------------------------------------------------------
 .../metron-profiler-client/README.md            |  67 +++
 .../profiler/client/stellar/FixedLookback.java  |   4 +-
 .../profiler/client/stellar/GetProfile.java     |  18 +-
 .../client/stellar/ProfilerClientConfig.java    | 104 ++++
 .../profiler/client/stellar/ProfilerConfig.java | 104 ----
 .../client/stellar/ProfilerFunctions.java       | 214 +++++++++
 .../metron/profiler/client/stellar/Util.java    |   2 +-
 .../profiler/client/stellar/WindowLookback.java |   4 +-
 .../metron/profiler/client/GetProfileTest.java  | 420 -----------------
 .../profiler/client/stellar/GetProfileTest.java | 419 ++++++++++++++++
 .../client/stellar/ProfilerFunctionsTest.java   | 209 ++++++++
 .../client/stellar/WindowLookbackTest.java      |   6 +-
 .../profiler/DefaultMessageDistributor.java     | 147 ++++++
 .../metron/profiler/DefaultMessageRouter.java   |  82 ++++
 .../metron/profiler/DefaultProfileBuilder.java  | 332 +++++++++++++
 .../metron/profiler/MessageDistributor.java     |  56 +++
 .../apache/metron/profiler/MessageRoute.java    |  65 +++
 .../apache/metron/profiler/MessageRouter.java   |  46 ++
 .../apache/metron/profiler/ProfileBuilder.java  | 301 +-----------
 .../metron/profiler/StandAloneProfiler.java     |  88 ++++
 .../profiler/DefaultMessageDistributorTest.java | 157 ++++++
 .../profiler/DefaultMessageRouterTest.java      | 183 +++++++
 .../profiler/DefaultProfileBuilderTest.java     | 472 +++++++++++++++++++
 .../metron/profiler/ProfileBuilderTest.java     | 460 ------------------
 .../profiler/bolt/ProfileBuilderBolt.java       |  97 ++--
 .../profiler/bolt/ProfileSplitterBolt.java      |  70 +--
 .../profiler/bolt/ProfileBuilderBoltTest.java   |  68 +--
 .../profiler/bolt/ProfileSplitterBoltTest.java  |   3 +-
 28 files changed, 2767 insertions(+), 1431 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/README.md 
b/metron-analytics/metron-profiler-client/README.md
index dcf30f6..a8b5a55 100644
--- a/metron-analytics/metron-profiler-client/README.md
+++ b/metron-analytics/metron-profiler-client/README.md
@@ -389,3 +389,70 @@ Returns: The selected profile measurements.
 ```
 
 The client API call above has retrieved the past hour of the 'test' profile 
for the entity '192.168.138.158'.
+
+## Developing Profiles
+
+Troubleshooting issues when programming against a live stream of data can be 
difficult.  The Stellar REPL is a powerful tool to help work out the kinds of 
enrichments and transformations that are needed.  The Stellar REPL can also be 
used to help when developing profiles for the Profiler.
+
+Follow these steps in the Stellar REPL to see how it can be used to help 
create profiles.
+
+1.  Take a first pass at defining your profile.  As an example, in the editor 
copy/paste the basic "Hello, World" profile below.
+    ```
+    [Stellar]>>> conf := SHELL_EDIT()
+    [Stellar]>>> conf
+    {
+      "profiles": [
+        {
+          "profile": "hello-world",
+          "onlyif":  "exists(ip_src_addr)",
+          "foreach": "ip_src_addr",
+          "init":    { "count": "0" },
+          "update":  { "count": "count + 1" },
+          "result":  "count"
+        }
+      ]
+    }
+    ```
+
+1.  Initialize the Profiler.
+    ```
+    [Stellar]>>> profiler := PROFILER_INIT(conf)
+    [Stellar]>>> profiler
+    org.apache.metron.profiler.StandAloneProfiler@4f8ef473
+    ```
+
+1. Create a message to simulate the type of telemetry that you expect to be 
profiled.   As an example, in the editor copy/paste the JSON below.
+    ```
+    [Stellar]>>> message := SHELL_EDIT()
+    [Stellar]>>> message
+    {
+      "ip_src_addr": "10.0.0.1",
+      "protocol": "HTTPS",
+      "length": "10",
+      "bytes_in": "234"
+    }
+    ```
+
+1. Apply some telemetry messages to your profiles.  The following applies the 
same message 3 times.
+    ```
+    [Stellar]>>> PROFILER_APPLY(message, profiler)
+    org.apache.metron.profiler.StandAloneProfiler@4f8ef473
+
+    [Stellar]>>> PROFILER_APPLY(message, profiler)
+    org.apache.metron.profiler.StandAloneProfiler@4f8ef473
+
+    [Stellar]>>> PROFILER_APPLY(message, profiler)
+    org.apache.metron.profiler.StandAloneProfiler@4f8ef473
+    ```
+
+1. Flush the Profiler to see what has been calculated.  A flush is what occurs 
at the end of each 15 minute period in the Profiler.  The result is a list of 
profile measurements.  Each measurement is a map containing detailed 
information about the profile data that has been generated.
+    ```
+    [Stellar]>>> values := PROFILER_FLUSH(profiler)
+    [Stellar]>>> values
+    [{period={duration=900000, period=1669628, start=1502665200000, 
end=1502666100000}, 
+       profile=hello-world, groups=[], value=3, entity=10.0.0.1}]
+    ```
+    
+    This profile simply counts the number of messages by IP source address.  
Notice that the value is '3' for the entity '10.0.0.1' as we applied 3 messages 
with an 'ip_src_addr' of '10.0.0.1'.  There will always be one measurement for 
each [profile, entity] pair.
+    
+1. If you are unhappy with the data that has been generated, then 'wash, rinse 
and repeat' this process.  After you are satisfied with the data being 
generated by the profile, then follow the [Getting 
Started](../metron-profiler#getting-started) guide to use the profile against 
your live, streaming data in a Metron cluster.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/FixedLookback.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/FixedLookback.java
 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/FixedLookback.java
index 02b4a4c..f68f653 100644
--- 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/FixedLookback.java
+++ 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/FixedLookback.java
@@ -55,8 +55,8 @@ public class FixedLookback implements StellarFunction {
       configOverridesMap = rawMap == null || rawMap.isEmpty() ? 
Optional.empty() : Optional.of(rawMap);
     }
     Map<String, Object> effectiveConfigs = Util.getEffectiveConfig(context, 
configOverridesMap.orElse(null));
-    Long tickDuration = ProfilerConfig.PROFILER_PERIOD.get(effectiveConfigs, 
Long.class);
-    TimeUnit tickUnit = 
TimeUnit.valueOf(ProfilerConfig.PROFILER_PERIOD_UNITS.get(effectiveConfigs, 
String.class));
+    Long tickDuration = 
ProfilerClientConfig.PROFILER_PERIOD.get(effectiveConfigs, Long.class);
+    TimeUnit tickUnit = 
TimeUnit.valueOf(ProfilerClientConfig.PROFILER_PERIOD_UNITS.get(effectiveConfigs,
 String.class));
     long end = System.currentTimeMillis();
     long start = end - units.toMillis(durationAgo);
     return ProfilePeriod.visitPeriods(start, end, tickDuration, tickUnit, 
Optional.empty(), period -> period);

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
index 03ac0a6..802c552 100644
--- 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
+++ 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
@@ -20,12 +20,12 @@
 
 package org.apache.metron.profiler.client.stellar;
 
-import static 
org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_COLUMN_FAMILY;
-import static 
org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_HBASE_TABLE;
-import static 
org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_HBASE_TABLE_PROVIDER;
-import static 
org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_PERIOD;
-import static 
org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_PERIOD_UNITS;
-import static 
org.apache.metron.profiler.client.stellar.ProfilerConfig.PROFILER_SALT_DIVISOR;
+import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
+import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
+import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
+import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
+import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
+import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
 import static org.apache.metron.profiler.client.stellar.Util.getArg;
 import static 
org.apache.metron.profiler.client.stellar.Util.getEffectiveConfig;
 
@@ -101,8 +101,6 @@ import org.slf4j.LoggerFactory;
 )
 public class GetProfile implements StellarFunction {
 
-
-
   /**
    * Cached client that can retrieve profile values.
    */
@@ -202,10 +200,6 @@ public class GetProfile implements StellarFunction {
     return groups;
   }
 
-
-
-
-
   /**
    * Creates the ColumnBuilder to use in accessing the profile data.
    * @param global The global configuration.

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
new file mode 100644
index 0000000..351b807
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
@@ -0,0 +1,104 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client.stellar;
+
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.hbase.HTableProvider;
+
+import java.util.Map;
+
+public enum ProfilerClientConfig {
+  /**
+   * A global property that defines the name of the HBase table used to store 
profile data.
+   */
+  PROFILER_HBASE_TABLE("profiler.client.hbase.table", "profiler", 
String.class),
+
+  /**
+   * A global property that defines the name of the column family used to 
store profile data.
+   */
+  PROFILER_COLUMN_FAMILY("profiler.client.hbase.column.family", "P", 
String.class),
+
+  /**
+   * A global property that defines the name of the HBaseTableProvider 
implementation class.
+   */
+  PROFILER_HBASE_TABLE_PROVIDER("hbase.provider.impl", 
HTableProvider.class.getName(), String.class),
+
+  /**
+   * A global property that defines the duration of each profile period.  This 
value
+   * should be defined along with 'profiler.client.period.duration.units'.
+   */
+  PROFILER_PERIOD("profiler.client.period.duration", 15L, Long.class),
+
+  /**
+   * A global property that defines the units of the profile period duration.  
This value
+   * should be defined along with 'profiler.client.period.duration'.
+   */
+  PROFILER_PERIOD_UNITS("profiler.client.period.duration.units", "MINUTES", 
String.class),
+
+  /**
+   * A global property that defines the salt divisor used to store profile 
data.
+   */
+  PROFILER_SALT_DIVISOR("profiler.client.salt.divisor", 1000L, Long.class);
+
+  String key;
+  Object defaultValue;
+  Class<?> valueType;
+
+  ProfilerClientConfig(String key, Object defaultValue, Class<?> valueType) {
+    this.key = key;
+    this.defaultValue = defaultValue;
+    this.valueType = valueType;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public Object getDefault() {
+    return getDefault(valueType);
+  }
+
+  public <T> T getDefault(Class<T> clazz) {
+    return defaultValue == null?null:ConversionUtils.convert(defaultValue, 
clazz);
+  }
+
+  public Object get(Map<String, Object> profilerConfig) {
+    return getOrDefault(profilerConfig, defaultValue);
+  }
+
+  public Object getOrDefault(Map<String, Object> profilerConfig, Object 
defaultValue) {
+    return getOrDefault(profilerConfig, defaultValue, valueType);
+  }
+
+  public <T> T get(Map<String, Object> profilerConfig, Class<T> clazz) {
+    return getOrDefault(profilerConfig, defaultValue, clazz);
+  }
+
+  public <T> T getOrDefault(Map<String, Object> profilerConfig, Object 
defaultValue, Class<T> clazz) {
+    Object o = profilerConfig.getOrDefault(key, defaultValue);
+    return o == null?null:ConversionUtils.convert(o, clazz);
+  }
+
+  @Override
+  public String toString() {
+    return key;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerConfig.java
 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerConfig.java
deleted file mode 100644
index e2ec275..0000000
--- 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerConfig.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.client.stellar;
-
-import org.apache.metron.stellar.common.utils.ConversionUtils;
-import org.apache.metron.hbase.HTableProvider;
-
-import java.util.Map;
-
-public enum ProfilerConfig {
-  /**
-   * A global property that defines the name of the HBase table used to store 
profile data.
-   */
-  PROFILER_HBASE_TABLE("profiler.client.hbase.table", "profiler", 
String.class),
-
-  /**
-   * A global property that defines the name of the column family used to 
store profile data.
-   */
-  PROFILER_COLUMN_FAMILY("profiler.client.hbase.column.family", "P", 
String.class),
-
-  /**
-   * A global property that defines the name of the HBaseTableProvider 
implementation class.
-   */
-  PROFILER_HBASE_TABLE_PROVIDER("hbase.provider.impl", 
HTableProvider.class.getName(), String.class),
-
-  /**
-   * A global property that defines the duration of each profile period.  This 
value
-   * should be defined along with 'profiler.client.period.duration.units'.
-   */
-  PROFILER_PERIOD("profiler.client.period.duration", 15L, Long.class),
-
-  /**
-   * A global property that defines the units of the profile period duration.  
This value
-   * should be defined along with 'profiler.client.period.duration'.
-   */
-  PROFILER_PERIOD_UNITS("profiler.client.period.duration.units", "MINUTES", 
String.class),
-
-  /**
-   * A global property that defines the salt divisor used to store profile 
data.
-   */
-  PROFILER_SALT_DIVISOR("profiler.client.salt.divisor", 1000L, Long.class);
-
-  String key;
-  Object defaultValue;
-  Class<?> valueType;
-  ProfilerConfig(String key, Object defaultValue, Class<?> valueType) {
-    this.key = key;
-    this.defaultValue = defaultValue;
-    this.valueType = valueType;
-  }
-
-  public String getKey() {
-    return key;
-  }
-
-  public Object getDefault() {
-    return getDefault(valueType);
-  }
-
-  public <T> T getDefault(Class<T> clazz) {
-    return defaultValue == null?null:ConversionUtils.convert(defaultValue, 
clazz);
-  }
-
-  public Object get(Map<String, Object> profilerConfig) {
-    return getOrDefault(profilerConfig, defaultValue);
-  }
-
-  public Object getOrDefault(Map<String, Object> profilerConfig, Object 
defaultValue) {
-    return getOrDefault(profilerConfig, defaultValue, valueType);
-  }
-
-  public <T> T get(Map<String, Object> profilerConfig, Class<T> clazz) {
-    return getOrDefault(profilerConfig, defaultValue, clazz);
-  }
-
-  public <T> T getOrDefault(Map<String, Object> profilerConfig, Object 
defaultValue, Class<T> clazz) {
-    Object o = profilerConfig.getOrDefault(key, defaultValue);
-    return o == null?null:ConversionUtils.convert(o, clazz);
-  }
-
-  @Override
-  public String toString() {
-    return key;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java
 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java
new file mode 100644
index 0000000..827e1c4
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java
@@ -0,0 +1,214 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client.stellar;
+
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.StandAloneProfiler;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.Stellar;
+import org.apache.metron.stellar.dsl.StellarFunction;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
+import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
+import static org.apache.metron.profiler.client.stellar.Util.getArg;
+import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
+
+/**
+ * Stellar functions that allow interaction with the core Profiler components
+ * through the Stellar REPL.
+ */
+public class ProfilerFunctions {
+
+  private static final org.slf4j.Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @Stellar(
+          namespace="PROFILER",
+          name="INIT",
+          description="Creates a local profile runner that can execute 
profiles.",
+          params={
+                  "config", "The profiler configuration as a string."
+          },
+          returns="A local profile runner."
+  )
+  public static class ProfilerInit implements StellarFunction {
+
+    @Override
+    public void initialize(Context context) {
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+
+    @Override
+    public Object apply(List<Object> args, Context context) {
+      @SuppressWarnings("unchecked")
+      Map<String, Object> global = (Map<String, Object>) 
context.getCapability(GLOBAL_CONFIG, false)
+              .orElse(Collections.emptyMap());
+
+      // how long is the profile period?
+      long duration = PROFILER_PERIOD.getOrDefault(global, 
PROFILER_PERIOD.getDefault(), Long.class);
+      String configuredUnits = PROFILER_PERIOD_UNITS.getOrDefault(global, 
PROFILER_PERIOD_UNITS.getDefault(), String.class);
+      long periodDurationMillis = 
TimeUnit.valueOf(configuredUnits).toMillis(duration);
+
+      // user must provide the configuration for the profiler
+      String arg0 = getArg(0, String.class, args);
+      ProfilerConfig profilerConfig;
+      try {
+        profilerConfig = JSONUtils.INSTANCE.load(arg0, ProfilerConfig.class);
+
+      } catch(IOException e) {
+        throw new IllegalArgumentException("Invalid profiler configuration", 
e);
+      }
+
+      return new StandAloneProfiler(profilerConfig, periodDurationMillis, 
context);
+    }
+  }
+
+  @Stellar(
+          namespace="PROFILER",
+          name="APPLY",
+          description="Apply a message to a local profile runner.",
+          params={
+                  "message", "The message to apply.",
+                  "profiler", "A local profile runner returned by 
PROFILER_INIT."
+          },
+          returns="The local profile runner."
+  )
+  public static class ProfilerApply implements StellarFunction {
+
+    private JSONParser parser;
+
+    @Override
+    public void initialize(Context context) {
+      parser = new JSONParser();
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return parser != null;
+    }
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws 
ParseException {
+
+      // user must provide the json telemetry message
+      String arg0 = Util.getArg(0, String.class, args);
+      if(arg0 == null) {
+        throw new IllegalArgumentException(format("expected string, found 
null"));
+      }
+
+      // parse the message
+      JSONObject message;
+      try {
+        message = (JSONObject) parser.parse(arg0);
+
+      } catch(org.json.simple.parser.ParseException e) {
+        throw new IllegalArgumentException("invalid message", e);
+      }
+
+      // user must provide the stand alone profiler
+      StandAloneProfiler profiler = Util.getArg(1, StandAloneProfiler.class, 
args);
+      try {
+        profiler.apply(message);
+
+      } catch(ExecutionException e) {
+        throw new IllegalArgumentException(e);
+      }
+
+      return profiler;
+    }
+  }
+
+  @Stellar(
+          namespace="PROFILER",
+          name="FLUSH",
+          description="Flush a local profile runner.",
+          params={
+                  "profiler", "A local profile runner returned by 
PROFILER_INIT."
+          },
+          returns="A list of the profile values."
+  )
+  public static class ProfilerFlush implements StellarFunction {
+
+    @Override
+    public void initialize(Context context) {
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws 
ParseException {
+
+      // user must provide the stand-alone profiler
+      StandAloneProfiler profiler = Util.getArg(0, StandAloneProfiler.class, 
args);
+      if(profiler == null) {
+        throw new IllegalArgumentException(format("expected the profiler 
returned by PROFILER_INIT, found null"));
+      }
+
+      // transform the profile measurements into maps to simplify manipulation 
in stellar
+      List<Map<String, Object>> measurements = new ArrayList<>();
+      for(ProfileMeasurement m : profiler.flush()) {
+
+        // create a map for the profile period
+        Map<String, Object> period = new HashMap<>();
+        period.put("period", m.getPeriod().getPeriod());
+        period.put("start", m.getPeriod().getStartTimeMillis());
+        period.put("duration", m.getPeriod().getDurationMillis());
+        period.put("end", m.getPeriod().getEndTimeMillis());
+
+        // create a map for the measurement
+        Map<String, Object> measurement = new HashMap<>();
+        measurement.put("profile", m.getProfileName());
+        measurement.put("entity", m.getEntity());
+        measurement.put("value", m.getProfileValue());
+        measurement.put("groups", m.getGroups());
+        measurement.put("period", period);
+
+        measurements.add(measurement);
+      }
+
+      return measurements;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
index 279a60c..82c7fba 100644
--- 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
+++ 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
@@ -82,7 +82,7 @@ public class Util {
     Map<String, Object> result = new HashMap<>(6);
 
     // extract the relevant parameters from global, the overrides and the 
defaults
-    for (ProfilerConfig k : ProfilerConfig.values()) {
+    for (ProfilerClientConfig k : ProfilerClientConfig.values()) {
       Object globalValue = 
global.containsKey(k.key)?ConversionUtils.convert(global.get(k.key), 
k.valueType):null;
       Object overrideValue = configOverridesMap == 
null?null:k.getOrDefault(configOverridesMap, null);
       Object defaultValue = k.defaultValue;

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/WindowLookback.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/WindowLookback.java
 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/WindowLookback.java
index 9e420e5..273b244 100644
--- 
a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/WindowLookback.java
+++ 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/WindowLookback.java
@@ -75,8 +75,8 @@ public class WindowLookback implements StellarFunction {
 
     }
     Map<String, Object> effectiveConfigs = Util.getEffectiveConfig(context, 
configOverridesMap.orElse(null));
-    Long tickDuration = ProfilerConfig.PROFILER_PERIOD.get(effectiveConfigs, 
Long.class);
-    TimeUnit tickUnit = 
TimeUnit.valueOf(ProfilerConfig.PROFILER_PERIOD_UNITS.get(effectiveConfigs, 
String.class));
+    Long tickDuration = 
ProfilerClientConfig.PROFILER_PERIOD.get(effectiveConfigs, Long.class);
+    TimeUnit tickUnit = 
TimeUnit.valueOf(ProfilerClientConfig.PROFILER_PERIOD_UNITS.get(effectiveConfigs,
 String.class));
     Window w = null;
     try {
       w = windowCache.get(windowSelector, () -> 
WindowProcessor.process(windowSelector));

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
 
b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
deleted file mode 100644
index 00d842c..0000000
--- 
a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
+++ /dev/null
@@ -1,420 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.client;
-
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
-import 
org.apache.metron.stellar.dsl.functions.resolver.SingletonFunctionResolver;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.client.stellar.FixedLookback;
-import org.apache.metron.profiler.client.stellar.GetProfile;
-import org.apache.metron.profiler.hbase.ColumnBuilder;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
-import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
-import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
-import org.apache.metron.stellar.common.StellarStatefulExecutor;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.metron.profiler.client.stellar.ProfilerConfig.*;
-
-/**
- * Tests the GetProfile class.
- */
-public class GetProfileTest {
-
-  private static final long periodDuration = 15;
-  private static final TimeUnit periodUnits = TimeUnit.MINUTES;
-  private static final int saltDivisor = 1000;
-  private static final String tableName = "profiler";
-  private static final String columnFamily = "P";
-  private StellarStatefulExecutor executor;
-  private Map<String, Object> state;
-  private ProfileWriter profileWriter;
-  // different values of period and salt divisor, used to test 
config_overrides feature
-  private static final long periodDuration2 = 1;
-  private static final TimeUnit periodUnits2 = TimeUnit.HOURS;
-  private static final int saltDivisor2 = 2050;
-
-
-
-  private <T> T run(String expression, Class<T> clazz) {
-    return executor.execute(expression, state, clazz);
-  }
-
-  /**
-   * This method sets up the configuration context for both writing profile 
data
-   * (using profileWriter to mock the complex process of what the Profiler 
topology
-   * actually does), and then reading that profile data (thereby testing the 
PROFILE_GET
-   * Stellar client implemented in GetProfile).
-   *
-   * It runs at @Before time, and sets testclass global variables used by the 
writers and readers.
-   * The various writers and readers are in each test case, not here.
-   *
-   * @return void
-   */
-  @Before
-  public void setup() {
-    state = new HashMap<>();
-    final HTableInterface table = MockHBaseTableProvider.addToCache(tableName, 
columnFamily);
-
-    // used to write values to be read during testing
-    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
-    ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
-    profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table);
-
-    // global properties
-    Map<String, Object> global = new HashMap<String, Object>() {{
-      put(PROFILER_HBASE_TABLE.getKey(), tableName);
-      put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
-      put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), 
MockHBaseTableProvider.class.getName());
-      put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration));
-      put(PROFILER_PERIOD_UNITS.getKey(), periodUnits.toString());
-      put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor));
-    }};
-
-    // create the stellar execution environment
-    executor = new DefaultStellarStatefulExecutor(
-            new SimpleFunctionResolver()
-                    .withClass(GetProfile.class)
-                    .withClass(FixedLookback.class),
-            new Context.Builder()
-                    .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
-                    .build());
-  }
-
-  /**
-   * This method is similar to setup(), in that it sets up profiler 
configuration context,
-   * but only for the client.  Additionally, it uses periodDuration2, 
periodUnits2
-   * and saltDivisor2, instead of periodDuration, periodUnits and saltDivisor 
respectively.
-   *
-   * This is used in the unit tests that test the config_overrides feature of 
PROFILE_GET.
-   * In these tests, the context from @Before setup() is used to write the 
data, then the global
-   * context is changed to context2 (from this method).  Each test validates 
that a default read
-   * using global context2 then gets no valid results (as expected), and that 
a read using
-   * original context values in the PROFILE_GET config_overrides argument gets 
all expected results.
-   *
-   * @return context2 - The profiler client configuration context created by 
this method.
-   *    The context2 values are also set in the configuration of the 
StellarStatefulExecutor
-   *    stored in the global variable 'executor'.  However, there is no API 
for querying the
-   *    context values from a StellarStatefulExecutor, so we output the 
context2 Context object itself,
-   *    for validation purposes (so that its values can be validated as being 
significantly
-   *    different from the setup() settings).
-   */
-  private Context setup2() {
-    state = new HashMap<>();
-
-    // global properties
-    Map<String, Object> global = new HashMap<String, Object>() {{
-      put(PROFILER_HBASE_TABLE.getKey(), tableName);
-      put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
-      put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), 
MockHBaseTableProvider.class.getName());
-      put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration2));
-      put(PROFILER_PERIOD_UNITS.getKey(), periodUnits2.toString());
-      put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor2));
-    }};
-
-    // create the modified context
-    Context context2 = new Context.Builder()
-            .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
-            .build();
-
-    // create the stellar execution environment
-    executor = new DefaultStellarStatefulExecutor(
-            new SimpleFunctionResolver()
-                    .withClass(GetProfile.class)
-                    .withClass(FixedLookback.class),
-            context2);
-
-    return context2; //because there is no executor.getContext() method
-  }
-
-  /**
-   * Values should be retrievable that have NOT been stored within a group.
-   */
-  @Test
-  public void testWithNoGroups() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Collections.emptyList();
-
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // execute - read the profile values - no groups
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 
'HOURS'))";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-  }
-
-  /**
-   * Values should be retrievable that have been stored within a 'group'.
-   */
-  @Test
-  public void testWithOneGroup() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Arrays.asList("weekends");
-
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // create a variable that contains the groups to use
-    state.put("groups", group);
-
-    // execute - read the profile values
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 
'HOURS'), ['weekends'])";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-
-    // test the deprecated but allowed "varargs" form of groups specification
-    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), 
'weekends')";
-    result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-  }
-
-  /**
-   * Values should be retrievable that have been stored within a 'group'.
-   */
-  @Test
-  public void testWithTwoGroups() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Arrays.asList("weekdays", "tuesday");
-
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // create a variable that contains the groups to use
-    state.put("groups", group);
-
-    // execute - read the profile values
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 
'HOURS'), ['weekdays', 'tuesday'])";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-
-    // test the deprecated but allowed "varargs" form of groups specification
-    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), 
'weekdays', 'tuesday')";
-    result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-  }
-
-  /**
-   * Initialization should fail if the required context values are missing.
-   */
-  @Test(expected = IllegalStateException.class)
-  public void testMissingContext() {
-    Context empty = Context.EMPTY_CONTEXT();
-
-    // 'unset' the context that was created during setup()
-    executor.setContext(empty);
-
-    // force re-initialization with no context
-    SingletonFunctionResolver.getInstance().initialize(empty);
-
-    // validate - function should be unable to initialize
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(1000, 
'SECONDS'), groups)";
-    run(expr, List.class);
-  }
-
-  /**
-   * If the time horizon specified does not include any profile measurements, 
then
-   * none should be returned.
-   */
-  @Test
-  public void testOutsideTimeHorizon() {
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Collections.emptyList();
-
-    // setup - write a single value from 2 hours ago
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, 1, group, val -> expectedValue);
-
-    // create a variable that contains the groups to use
-    state.put("groups", group);
-
-    // execute - read the profile values
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 
'SECONDS'))";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - there should be no values from only 4 seconds ago
-    Assert.assertEquals(0, result.size());
-  }
-
-  /**
-   * Values should be retrievable that were written with configuration 
different than current global config.
-   */
-  @Test
-  public void testWithConfigOverride() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Collections.emptyList();
-
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // now change the executor configuration
-    Context context2 = setup2();
-    // validate it is changed in significant way
-    @SuppressWarnings("unchecked")
-    Map<String, Object> global = (Map<String, Object>) 
context2.getCapability(Context.Capabilities.GLOBAL_CONFIG).get();
-    Assert.assertEquals(PROFILER_PERIOD.get(global), periodDuration2);
-    Assert.assertNotEquals(periodDuration, periodDuration2);
-
-    // execute - read the profile values - with (wrong) default global config 
values.
-    // No error message at this time, but returns empty results list, because
-    // row keys are not correctly calculated.
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 
'HOURS'))";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - expect to fail to read any values
-    Assert.assertEquals(0, result.size());
-
-    // execute - read the profile values - with config_override.
-    // first two override values are strings, third is deliberately a number.
-    String overrides = "{'profiler.client.period.duration' : '" + 
periodDuration + "', "
-            + "'profiler.client.period.duration.units' : '" + 
periodUnits.toString() + "', "
-            + "'profiler.client.salt.divisor' : " + saltDivisor + " }";
-    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS', " + 
overrides + "), [], " + overrides + ")"
-            ;
-    result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-  }
-
-  /**
-   * Values should be retrievable that have been stored within a 'group', with
-   * configuration different than current global config.
-   * This time put the config_override case before the non-override case.
-   */
-  @Test
-  public void testWithConfigAndOneGroup() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Arrays.asList("weekends");
-
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // create a variable that contains the groups to use
-    state.put("groups", group);
-
-    // now change the executor configuration
-    Context context2 = setup2();
-    // validate it is changed in significant way
-    @SuppressWarnings("unchecked")
-    Map<String, Object> global = (Map<String, Object>) 
context2.getCapability(Context.Capabilities.GLOBAL_CONFIG).get();
-    Assert.assertEquals(global.get(PROFILER_PERIOD.getKey()), 
Long.toString(periodDuration2));
-    Assert.assertNotEquals(periodDuration, periodDuration2);
-
-    // execute - read the profile values - with config_override.
-    // first two override values are strings, third is deliberately a number.
-    String overrides = "{'profiler.client.period.duration' : '" + 
periodDuration + "', "
-            + "'profiler.client.period.duration.units' : '" + 
periodUnits.toString() + "', "
-            + "'profiler.client.salt.divisor' : " + saltDivisor + " }";
-    String expr = "PROFILE_GET('profile1', 'entity1'" +
-            ", PROFILE_FIXED(4, 'HOURS', " + overrides + "), ['weekends'], " +
-            overrides + ")";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-
-    // execute - read the profile values - with (wrong) default global config 
values.
-    // No error message at this time, but returns empty results list, because
-    // row keys are not correctly calculated.
-    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), 
['weekends'])";
-    result = run(expr, List.class);
-
-    // validate - expect to fail to read any values
-    Assert.assertEquals(0, result.size());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
 
b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
new file mode 100644
index 0000000..307b548
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
@@ -0,0 +1,419 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client.stellar;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.profiler.client.ProfileWriter;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
+import 
org.apache.metron.stellar.dsl.functions.resolver.SingletonFunctionResolver;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.client.stellar.FixedLookback;
+import org.apache.metron.profiler.client.stellar.GetProfile;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
+import org.apache.metron.stellar.common.StellarStatefulExecutor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.*;
+
+/**
+ * Tests the GetProfile class.
+ */
+public class GetProfileTest {
+
+  private static final long periodDuration = 15;
+  private static final TimeUnit periodUnits = TimeUnit.MINUTES;
+  private static final int saltDivisor = 1000;
+  private static final String tableName = "profiler";
+  private static final String columnFamily = "P";
+  private StellarStatefulExecutor executor;
+  private Map<String, Object> state;
+  private ProfileWriter profileWriter;
+  // different values of period and salt divisor, used to test 
config_overrides feature
+  private static final long periodDuration2 = 1;
+  private static final TimeUnit periodUnits2 = TimeUnit.HOURS;
+  private static final int saltDivisor2 = 2050;
+
+  private <T> T run(String expression, Class<T> clazz) {
+    return executor.execute(expression, state, clazz);
+  }
+
+  /**
+   * This method sets up the configuration context for both writing profile 
data
+   * (using profileWriter to mock the complex process of what the Profiler 
topology
+   * actually does), and then reading that profile data (thereby testing the 
PROFILE_GET
+   * Stellar client implemented in GetProfile).
+   *
+   * It runs at @Before time, and sets testclass global variables used by the 
writers and readers.
+   * The various writers and readers are in each test case, not here.
+   *
+   * @return void
+   */
+  @Before
+  public void setup() {
+    state = new HashMap<>();
+    final HTableInterface table = MockHBaseTableProvider.addToCache(tableName, 
columnFamily);
+
+    // used to write values to be read during testing
+    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
+    ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+    profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table);
+
+    // global properties
+    Map<String, Object> global = new HashMap<String, Object>() {{
+      put(PROFILER_HBASE_TABLE.getKey(), tableName);
+      put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
+      put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), 
MockHBaseTableProvider.class.getName());
+      put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration));
+      put(PROFILER_PERIOD_UNITS.getKey(), periodUnits.toString());
+      put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor));
+    }};
+
+    // create the stellar execution environment
+    executor = new DefaultStellarStatefulExecutor(
+            new SimpleFunctionResolver()
+                    .withClass(GetProfile.class)
+                    .withClass(FixedLookback.class),
+            new Context.Builder()
+                    .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+                    .build());
+  }
+
+  /**
+   * This method is similar to setup(), in that it sets up profiler 
configuration context,
+   * but only for the client.  Additionally, it uses periodDuration2, 
periodUnits2
+   * and saltDivisor2, instead of periodDuration, periodUnits and saltDivisor 
respectively.
+   *
+   * This is used in the unit tests that test the config_overrides feature of 
PROFILE_GET.
+   * In these tests, the context from @Before setup() is used to write the 
data, then the global
+   * context is changed to context2 (from this method).  Each test validates 
that a default read
+   * using global context2 then gets no valid results (as expected), and that 
a read using
+   * original context values in the PROFILE_GET config_overrides argument gets 
all expected results.
+   *
+   * @return context2 - The profiler client configuration context created by 
this method.
+   *    The context2 values are also set in the configuration of the 
StellarStatefulExecutor
+   *    stored in the global variable 'executor'.  However, there is no API 
for querying the
+   *    context values from a StellarStatefulExecutor, so we output the 
context2 Context object itself,
+   *    for validation purposes (so that its values can be validated as being 
significantly
+   *    different from the setup() settings).
+   */
+  private Context setup2() {
+    state = new HashMap<>();
+
+    // global properties
+    Map<String, Object> global = new HashMap<String, Object>() {{
+      put(PROFILER_HBASE_TABLE.getKey(), tableName);
+      put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
+      put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), 
MockHBaseTableProvider.class.getName());
+      put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration2));
+      put(PROFILER_PERIOD_UNITS.getKey(), periodUnits2.toString());
+      put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor2));
+    }};
+
+    // create the modified context
+    Context context2 = new Context.Builder()
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+            .build();
+
+    // create the stellar execution environment
+    executor = new DefaultStellarStatefulExecutor(
+            new SimpleFunctionResolver()
+                    .withClass(GetProfile.class)
+                    .withClass(FixedLookback.class),
+            context2);
+
+    return context2; //because there is no executor.getContext() method
+  }
+
+  /**
+   * Values should be retrievable that have NOT been stored within a group.
+   */
+  @Test
+  public void testWithNoGroups() {
+    final int periodsPerHour = 4;
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final long startTime = System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(hours);
+    final List<Object> group = Collections.emptyList();
+
+    // setup - write some measurements to be read later
+    final int count = hours * periodsPerHour;
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
+
+    profileWriter.write(m, count, group, val -> expectedValue);
+
+    // execute - read the profile values - no groups
+    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 
'HOURS'))";
+    @SuppressWarnings("unchecked")
+    List<Integer> result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+  }
+
+  /**
+   * Values should be retrievable that have been stored within a 'group'.
+   */
+  @Test
+  public void testWithOneGroup() {
+    final int periodsPerHour = 4;
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final long startTime = System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(hours);
+    final List<Object> group = Arrays.asList("weekends");
+
+    // setup - write some measurements to be read later
+    final int count = hours * periodsPerHour;
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
+    profileWriter.write(m, count, group, val -> expectedValue);
+
+    // create a variable that contains the groups to use
+    state.put("groups", group);
+
+    // execute - read the profile values
+    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 
'HOURS'), ['weekends'])";
+    @SuppressWarnings("unchecked")
+    List<Integer> result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+
+    // test the deprecated but allowed "varargs" form of groups specification
+    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), 
'weekends')";
+    result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+  }
+
+  /**
+   * Values should be retrievable that have been stored within a 'group'.
+   */
+  @Test
+  public void testWithTwoGroups() {
+    final int periodsPerHour = 4;
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final long startTime = System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(hours);
+    final List<Object> group = Arrays.asList("weekdays", "tuesday");
+
+    // setup - write some measurements to be read later
+    final int count = hours * periodsPerHour;
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
+    profileWriter.write(m, count, group, val -> expectedValue);
+
+    // create a variable that contains the groups to use
+    state.put("groups", group);
+
+    // execute - read the profile values
+    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 
'HOURS'), ['weekdays', 'tuesday'])";
+    @SuppressWarnings("unchecked")
+    List<Integer> result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+
+    // test the deprecated but allowed "varargs" form of groups specification
+    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), 
'weekdays', 'tuesday')";
+    result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+  }
+
+  /**
+   * Initialization should fail if the required context values are missing.
+   */
+  @Test(expected = IllegalStateException.class)
+  public void testMissingContext() {
+    Context empty = Context.EMPTY_CONTEXT();
+
+    // 'unset' the context that was created during setup()
+    executor.setContext(empty);
+
+    // force re-initialization with no context
+    SingletonFunctionResolver.getInstance().initialize(empty);
+
+    // validate - function should be unable to initialize
+    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(1000, 
'SECONDS'), groups)";
+    run(expr, List.class);
+  }
+
+  /**
+   * If the time horizon specified does not include any profile measurements, 
then
+   * none should be returned.
+   */
+  @Test
+  public void testOutsideTimeHorizon() {
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final long startTime = System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(hours);
+    final List<Object> group = Collections.emptyList();
+
+    // setup - write a single value from 2 hours ago
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
+    profileWriter.write(m, 1, group, val -> expectedValue);
+
+    // create a variable that contains the groups to use
+    state.put("groups", group);
+
+    // execute - read the profile values
+    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 
'SECONDS'))";
+    @SuppressWarnings("unchecked")
+    List<Integer> result = run(expr, List.class);
+
+    // validate - there should be no values from only 4 seconds ago
+    Assert.assertEquals(0, result.size());
+  }
+
+  /**
+   * Values should be retrievable that were written with configuration 
different than current global config.
+   */
+  @Test
+  public void testWithConfigOverride() {
+    final int periodsPerHour = 4;
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final long startTime = System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(hours);
+    final List<Object> group = Collections.emptyList();
+
+    // setup - write some measurements to be read later
+    final int count = hours * periodsPerHour;
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
+    profileWriter.write(m, count, group, val -> expectedValue);
+
+    // now change the executor configuration
+    Context context2 = setup2();
+    // validate it is changed in significant way
+    @SuppressWarnings("unchecked")
+    Map<String, Object> global = (Map<String, Object>) 
context2.getCapability(Context.Capabilities.GLOBAL_CONFIG).get();
+    Assert.assertEquals(PROFILER_PERIOD.get(global), periodDuration2);
+    Assert.assertNotEquals(periodDuration, periodDuration2);
+
+    // execute - read the profile values - with (wrong) default global config 
values.
+    // No error message at this time, but returns empty results list, because
+    // row keys are not correctly calculated.
+    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 
'HOURS'))";
+    @SuppressWarnings("unchecked")
+    List<Integer> result = run(expr, List.class);
+
+    // validate - expect to fail to read any values
+    Assert.assertEquals(0, result.size());
+
+    // execute - read the profile values - with config_override.
+    // first two override values are strings, third is deliberately a number.
+    String overrides = "{'profiler.client.period.duration' : '" + 
periodDuration + "', "
+            + "'profiler.client.period.duration.units' : '" + 
periodUnits.toString() + "', "
+            + "'profiler.client.salt.divisor' : " + saltDivisor + " }";
+    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS', " + 
overrides + "), [], " + overrides + ")"
+            ;
+    result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+  }
+
+  /**
+   * Values should be retrievable that have been stored within a 'group', with
+   * configuration different than current global config.
+   * This time put the config_override case before the non-override case.
+   */
+  @Test
+  public void testWithConfigAndOneGroup() {
+    final int periodsPerHour = 4;
+    final int expectedValue = 2302;
+    final int hours = 2;
+    final long startTime = System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(hours);
+    final List<Object> group = Arrays.asList("weekends");
+
+    // setup - write some measurements to be read later
+    final int count = hours * periodsPerHour;
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
+    profileWriter.write(m, count, group, val -> expectedValue);
+
+    // create a variable that contains the groups to use
+    state.put("groups", group);
+
+    // now change the executor configuration
+    Context context2 = setup2();
+    // validate it is changed in significant way
+    @SuppressWarnings("unchecked")
+    Map<String, Object> global = (Map<String, Object>) 
context2.getCapability(Context.Capabilities.GLOBAL_CONFIG).get();
+    Assert.assertEquals(global.get(PROFILER_PERIOD.getKey()), 
Long.toString(periodDuration2));
+    Assert.assertNotEquals(periodDuration, periodDuration2);
+
+    // execute - read the profile values - with config_override.
+    // first two override values are strings, third is deliberately a number.
+    String overrides = "{'profiler.client.period.duration' : '" + 
periodDuration + "', "
+            + "'profiler.client.period.duration.units' : '" + 
periodUnits.toString() + "', "
+            + "'profiler.client.salt.divisor' : " + saltDivisor + " }";
+    String expr = "PROFILE_GET('profile1', 'entity1'" +
+            ", PROFILE_FIXED(4, 'HOURS', " + overrides + "), ['weekends'], " +
+            overrides + ")";
+    @SuppressWarnings("unchecked")
+    List<Integer> result = run(expr, List.class);
+
+    // validate - expect to read all values from the past 4 hours
+    Assert.assertEquals(count, result.size());
+
+    // execute - read the profile values - with (wrong) default global config 
values.
+    // No error message at this time, but returns empty results list, because
+    // row keys are not correctly calculated.
+    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), 
['weekends'])";
+    result = run(expr, List.class);
+
+    // validate - expect to fail to read any values
+    Assert.assertEquals(0, result.size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/ProfilerFunctionsTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/ProfilerFunctionsTest.java
 
b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/ProfilerFunctionsTest.java
new file mode 100644
index 0000000..9cc8046
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/ProfilerFunctionsTest.java
@@ -0,0 +1,209 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client.stellar;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.StandAloneProfiler;
+import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
+import org.apache.metron.stellar.common.StellarStatefulExecutor;
+import org.apache.metron.stellar.common.shell.StellarExecutor;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
+import static 
org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests the ProfilerFunctions class.
+ */
+public class ProfilerFunctionsTest {
+
+  /**
+   * {
+   *    "ip_src_addr": "10.0.0.1",
+   *    "ip_dst_addr": "10.0.0.2",
+   *    "source.type": "test",
+   * }
+   */
+  @Multiline
+  private String message;
+
+  /**
+   * {
+   *   "profiles": [
+   *        {
+   *          "profile":  "hello-world",
+   *          "foreach":  "ip_src_addr",
+   *          "init":     { "count": 0 },
+   *          "update":   { "count": "count + 1" },
+   *          "result":   "count"
+   *        }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String helloWorldProfilerDef;
+
+  private static final long periodDuration = 15;
+  private static final String periodUnits = "MINUTES";
+  private StellarStatefulExecutor executor;
+  private Map<String, Object> state;
+
+  private <T> T run(String expression, Class<T> clazz) {
+    return executor.execute(expression, state, clazz);
+  }
+
+  @Before
+  public void setup() {
+    state = new HashMap<>();
+
+    // global properties
+    Map<String, Object> global = new HashMap<String, Object>() {{
+      put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration));
+      put(PROFILER_PERIOD_UNITS.getKey(), periodUnits.toString());
+    }};
+
+    // create the stellar execution environment
+    executor = new DefaultStellarStatefulExecutor(
+            new SimpleFunctionResolver()
+                    .withClass(ProfilerFunctions.ProfilerInit.class)
+                    .withClass(ProfilerFunctions.ProfilerApply.class)
+                    .withClass(ProfilerFunctions.ProfilerFlush.class),
+            new Context.Builder()
+                    .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+                    .build());
+  }
+
+  @Test
+  public void testProfilerInitNoProfiles() {
+    state.put("config", "{ \"profiles\" : [] }");
+    StandAloneProfiler profiler = run("PROFILER_INIT(config)", 
StandAloneProfiler.class);
+    assertNotNull(profiler);
+    assertEquals(0, profiler.getConfig().getProfiles().size());
+  }
+
+  @Test
+  public void testProfilerInitWithProfiles() {
+    state.put("config", helloWorldProfilerDef);
+    StandAloneProfiler profiler = run("PROFILER_INIT(config)", 
StandAloneProfiler.class);
+    assertNotNull(profiler);
+    assertEquals(1, profiler.getConfig().getProfiles().size());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testProfilerInitNoArgs() {
+    run("PROFILER_INIT()", StandAloneProfiler.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testProfilerInitInvalidArg() {
+    run("PROFILER_INIT({ \"invalid\": 2 })", StandAloneProfiler.class);
+  }
+
+  @Test
+  public void testProfilerInitWithNoGlobalConfig() {
+    state.put("config", helloWorldProfilerDef);
+    String expression = "PROFILER_INIT(config)";
+
+    // use an executor with no GLOBAL_CONFIG defined in the context
+    StellarStatefulExecutor executor = new DefaultStellarStatefulExecutor(
+            new SimpleFunctionResolver()
+                    .withClass(ProfilerFunctions.ProfilerInit.class)
+                    .withClass(ProfilerFunctions.ProfilerApply.class)
+                    .withClass(ProfilerFunctions.ProfilerFlush.class),
+            Context.EMPTY_CONTEXT());
+    StandAloneProfiler profiler = executor.execute(expression, state, 
StandAloneProfiler.class);
+
+    assertNotNull(profiler);
+    assertEquals(1, profiler.getConfig().getProfiles().size());
+  }
+
+  @Test
+  public void testProfilerApply() {
+
+    // initialize the profiler
+    state.put("config", helloWorldProfilerDef);
+    StandAloneProfiler profiler = run("PROFILER_INIT(config)", 
StandAloneProfiler.class);
+    state.put("profiler", profiler);
+
+    // apply a message to the profiler
+    state.put("message", message);
+    StandAloneProfiler result = run("PROFILER_APPLY(message, profiler)", 
StandAloneProfiler.class);
+    assertSame(profiler, result);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testProfilerApplyNoArgs() {
+    run("PROFILER_APPLY()", StandAloneProfiler.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testProfilerApplyInvalidArg() {
+    run("PROFILER_APPLY(undefined)", StandAloneProfiler.class);
+  }
+
+  @Test
+  public void testProfilerFlush() {
+
+    // initialize the profiler
+    state.put("config", helloWorldProfilerDef);
+    StandAloneProfiler profiler = run("PROFILER_INIT(config)", 
StandAloneProfiler.class);
+    state.put("profiler", profiler);
+
+    // apply a message to the profiler
+    state.put("message", message);
+    run("PROFILER_APPLY(message, profiler)", StandAloneProfiler.class);
+
+    // flush the profiles
+    List<Map<String, Object>> measurements = run("PROFILER_FLUSH(profiler)", 
List.class);
+
+    // validate
+    assertNotNull(measurements);
+    assertEquals(1, measurements.size());
+
+    Map<String, Object> measurement = measurements.get(0);
+    assertEquals("hello-world", measurement.get("profile"));
+    assertEquals("10.0.0.1", measurement.get("entity"));
+    assertEquals(1, measurement.get("value"));
+    assertEquals(Collections.emptyList(), measurement.get("groups"));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testProfilerFlushNoArgs() {
+    run("PROFILER_FLUSH()", StandAloneProfiler.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testProfilerFlushInvalidArg() {
+    run("PROFILER_FLUSH(undefined)", StandAloneProfiler.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/WindowLookbackTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/WindowLookbackTest.java
 
b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/WindowLookbackTest.java
index fd6d122..9ef1805 100644
--- 
a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/WindowLookbackTest.java
+++ 
b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/WindowLookbackTest.java
@@ -60,7 +60,7 @@ public class WindowLookbackTest {
     long durationMs = 60000;
     State state = test("1 hour", new Date()
                       , Optional.of(
-                              ImmutableMap.of( 
ProfilerConfig.PROFILER_PERIOD.getKey(), 1 )
+                              ImmutableMap.of( 
ProfilerClientConfig.PROFILER_PERIOD.getKey(), 1 )
                                    )
                       ,Assertions.NOT_EMPTY,Assertions.CONTIGUOUS);
     Assert.assertEquals(TimeUnit.HOURS.toMillis(1) / durationMs, 
state.periods.size());
@@ -114,8 +114,8 @@ public class WindowLookbackTest {
   }
 
   long getDurationMs() {
-    int duration = ProfilerConfig.PROFILER_PERIOD.getDefault(Integer.class);
-    TimeUnit unit = 
TimeUnit.valueOf(ProfilerConfig.PROFILER_PERIOD_UNITS.getDefault(String.class));
+    int duration = 
ProfilerClientConfig.PROFILER_PERIOD.getDefault(Integer.class);
+    TimeUnit unit = 
TimeUnit.valueOf(ProfilerClientConfig.PROFILER_PERIOD_UNITS.getDefault(String.class));
     return unit.toMillis(duration);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
new file mode 100644
index 0000000..ba3c0d8
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
@@ -0,0 +1,147 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.profiler.clock.WallClock;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.String.format;
+
+/**
+ * Distributes a message along a MessageRoute.  A MessageRoute will lead to 
one or
+ * more ProfileBuilders.
+ *
+ * A ProfileBuilder is responsible for maintaining the state of a single 
profile,
+ * for a single entity.  There will be one ProfileBuilder for each (profile, 
entity) pair.
+ * This class ensures that each ProfileBuilder receives the telemetry messages 
that
+ * it needs.
+ */
+public class DefaultMessageDistributor implements MessageDistributor {
+
+  /**
+   * The duration of each profile period in milliseconds.
+   */
+  private long periodDurationMillis;
+
+  /**
+   * Maintains the state of a profile which is unique to a profile/entity pair.
+   */
+  private transient Cache<String, ProfileBuilder> profileCache;
+
+  /**
+   * Create a new message distributor.
+   * @param periodDurationMillis The period duration in milliseconds.
+   * @param profileTimeToLiveMillis The TTL of a profile in milliseconds.
+   */
+  public DefaultMessageDistributor(long periodDurationMillis, long 
profileTimeToLiveMillis) {
+    if(profileTimeToLiveMillis < periodDurationMillis) {
+      throw new IllegalStateException(format(
+              "invalid configuration: expect profile TTL (%d) to be greater 
than period duration (%d)",
+              profileTimeToLiveMillis,
+              periodDurationMillis));
+    }
+    this.periodDurationMillis = periodDurationMillis;
+    this.profileCache = CacheBuilder
+            .newBuilder()
+            .expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS)
+            .build();
+  }
+
+  /**
+   * Distribute a message along a MessageRoute.
+   *
+   * @param message The message that needs distributed.
+   * @param route The message route.
+   * @param context The Stellar execution context.
+   * @throws ExecutionException
+   */
+  @Override
+  public void distribute(JSONObject message, MessageRoute route, Context 
context) throws ExecutionException {
+    getBuilder(route, context).apply(message);
+  }
+
+  /**
+   * Flushes all profiles.  Flushes all ProfileBuilders that this distributor 
is responsible for.
+   *
+   * @return The profile measurements; one for each (profile, entity) pair.
+   */
+  @Override
+  public List<ProfileMeasurement> flush() {
+    List<ProfileMeasurement> measurements = new ArrayList<>();
+
+    profileCache.asMap().forEach((key, profileBuilder) -> {
+      if(profileBuilder.isInitialized()) {
+        ProfileMeasurement measurement = profileBuilder.flush();
+        measurements.add(measurement);
+      }
+    });
+
+    profileCache.cleanUp();
+    return measurements;
+  }
+
+  /**
+   * Retrieves the cached ProfileBuilder that is used to build and maintain 
the Profile.  If none exists,
+   * one will be created and returned.
+   * @param route The message route.
+   * @param context The Stellar execution context.
+   */
+  public ProfileBuilder getBuilder(MessageRoute route, Context context) throws 
ExecutionException {
+    ProfileConfig profile = route.getProfileDefinition();
+    String entity = route.getEntity();
+    return profileCache.get(
+            cacheKey(profile, entity),
+            () -> new DefaultProfileBuilder.Builder()
+                    .withDefinition(profile)
+                    .withEntity(entity)
+                    .withPeriodDurationMillis(periodDurationMillis)
+                    .withContext(context)
+                    .withClock(new WallClock())
+                    .build());
+  }
+
+  /**
+   * Builds the key that is used to lookup the ProfileState within the cache.
+   * @param profile The profile definition.
+   * @param entity The entity.
+   */
+  private String cacheKey(ProfileConfig profile, String entity) {
+    return format("%s:%s", profile, entity);
+  }
+
+  public DefaultMessageDistributor withPeriodDurationMillis(long 
periodDurationMillis) {
+    this.periodDurationMillis = periodDurationMillis;
+    return this;
+  }
+
+  public DefaultMessageDistributor withPeriodDuration(int duration, TimeUnit 
units) {
+    return withPeriodDurationMillis(units.toMillis(duration));
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/073d6b50/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java
 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java
new file mode 100644
index 0000000..5a32e69
--- /dev/null
+++ 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java
@@ -0,0 +1,82 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
+import org.apache.metron.stellar.common.StellarStatefulExecutor;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.json.simple.JSONObject;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Routes incoming telemetry messages.
+ *
+ * A single telemetry message may need to take multiple routes.  This is the 
case
+ * when a message is needed by more than one profile.
+ */
+public class DefaultMessageRouter implements MessageRouter {
+
+  /**
+   * Executes Stellar code.
+   */
+  private StellarStatefulExecutor executor;
+
+  public DefaultMessageRouter(Context context) {
+    this.executor = new DefaultStellarStatefulExecutor();
+    StellarFunctions.initialize(context);
+    executor.setContext(context);
+  }
+
+  /**
+   * Route a telemetry message.  Finds all routes for a given telemetry 
message.
+   *
+   * @param message The telemetry message that needs routed.
+   * @param config The configuration for the Profiler.
+   * @param context The Stellar execution context.
+   * @return A list of all the routes for the message.
+   */
+  @Override
+  public List<MessageRoute> route(JSONObject message, ProfilerConfig config, 
Context context) {
+    List<MessageRoute> routes = new ArrayList<>();
+    @SuppressWarnings("unchecked")
+    final Map<String, Object> state = (Map<String, Object>) message;
+
+    // attempt to route the message to each of the profiles
+    for (ProfileConfig profile: config.getProfiles()) {
+
+      // is this message needed by this profile?
+      if (executor.execute(profile.getOnlyif(), state, Boolean.class)) {
+
+        // what is the name of the entity in this message?
+        String entity = executor.execute(profile.getForeach(), state, 
String.class);
+        routes.add(new MessageRoute(profile, entity));
+      }
+    }
+
+    return routes;
+  }
+}

Reply via email to