http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
index 6c518b1..ecf8a1b 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
@@ -36,11 +36,19 @@ import org.slf4j.LoggerFactory;
 public class Configurations implements Serializable {
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private List<FieldValidator> validations = new ArrayList<>();
-  protected ConcurrentMap<String, Object> configurations = new 
ConcurrentHashMap<>();
+  protected Map<String, Object> configurations = new ConcurrentHashMap<>();
+
+  public Map<String, Object> getConfigurations() {
+    return configurations;
+  }
 
   @SuppressWarnings("unchecked")
   public Map<String, Object> getGlobalConfig() {
-    return (Map<String, Object>) 
configurations.getOrDefault(ConfigurationType.GLOBAL.getTypeName(), new 
HashMap());
+    return getGlobalConfig(true);
+  }
+
+  public Map<String, Object> getGlobalConfig(boolean emptyMapOnNonExistent) {
+    return (Map<String, Object>) 
getConfigurations().getOrDefault(ConfigurationType.GLOBAL.getTypeName(), 
emptyMapOnNonExistent?new HashMap():null);
   }
 
   public List<FieldValidator> getFieldValidations() {
@@ -59,10 +67,15 @@ public class Configurations implements Serializable {
   }
 
   public void updateGlobalConfig(Map<String, Object> globalConfig) {
-    configurations.put(ConfigurationType.GLOBAL.getTypeName(), globalConfig);
-    validations = FieldValidator.readValidations(getGlobalConfig());
+    if(globalConfig != null) {
+      getConfigurations().put(ConfigurationType.GLOBAL.getTypeName(), 
globalConfig);
+      validations = FieldValidator.readValidations(getGlobalConfig());
+    }
   }
 
+  public void deleteGlobalConfig() {
+    getConfigurations().remove(ConfigurationType.GLOBAL.getTypeName());
+  }
 
   @Override
   public boolean equals(Object o) {
@@ -72,14 +85,14 @@ public class Configurations implements Serializable {
     Configurations that = (Configurations) o;
 
     if (validations != null ? !validations.equals(that.validations) : 
that.validations != null) return false;
-    return configurations != null ? configurations.equals(that.configurations) 
: that.configurations == null;
+    return getConfigurations() != null ? 
getConfigurations().equals(that.getConfigurations()) : that.getConfigurations() 
== null;
 
   }
 
   @Override
   public int hashCode() {
     int result = validations != null ? validations.hashCode() : 0;
-    result = 31 * result + (configurations != null ? configurations.hashCode() 
: 0);
+    result = 31 * result + (getConfigurations() != null ? 
getConfigurations().hashCode() : 0);
     return result;
   }
 
@@ -87,7 +100,7 @@ public class Configurations implements Serializable {
   public String toString() {
     return "Configurations{" +
             "validations=" + validations +
-            ", configurations=" + configurations +
+            ", configurations=" + getConfigurations()+
             '}';
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
index fcc8050..ae21152 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
@@ -28,11 +28,16 @@ import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.lang.invoke.MethodHandles;
 import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
 import org.apache.commons.io.FilenameUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
@@ -44,8 +49,11 @@ import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ConfigurationsUtils {
+  protected static final Logger LOG =  
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   public static CuratorFramework getClient(String zookeeperUrl) {
     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
@@ -175,28 +183,56 @@ public class ConfigurationsUtils {
     
configurations.updateGlobalConfig(readGlobalConfigBytesFromZookeeper(client));
   }
 
-  public static void updateParserConfigsFromZookeeper(ParserConfigurations 
configurations, CuratorFramework client) throws Exception {
-    updateConfigsFromZookeeper(configurations, client);
-    List<String> sensorTypes = 
client.getChildren().forPath(PARSER.getZookeeperRoot());
+
+  private interface Callback {
+    void apply(String sensorType) throws Exception;
+  }
+
+  private static void updateConfigsFromZookeeper( Configurations configurations
+                                                , ConfigurationType type
+                                                , Callback callback
+                                                , CuratorFramework client
+                                                )  throws Exception
+  {
+    Exception globalUpdateException = null;
+    try {
+      updateConfigsFromZookeeper(configurations, client);
+    }
+    catch(Exception e) {
+      LOG.warn("Unable to update global config when updating indexing configs: 
" + e.getMessage(), e);
+      globalUpdateException = e;
+    }
+    List<String> sensorTypes = 
client.getChildren().forPath(type.getZookeeperRoot());
     for(String sensorType: sensorTypes) {
-      configurations.updateSensorParserConfig(sensorType, 
readSensorParserConfigBytesFromZookeeper(sensorType, client));
+      callback.apply(sensorType);
     }
+    if(globalUpdateException != null) {
+      throw globalUpdateException;
+    }
+  }
+
+  public static void updateParserConfigsFromZookeeper(ParserConfigurations 
configurations, CuratorFramework client) throws Exception {
+    updateConfigsFromZookeeper( configurations
+                              , PARSER
+                              , sensorType -> 
configurations.updateSensorParserConfig(sensorType, 
readSensorParserConfigBytesFromZookeeper(sensorType, client))
+                              , client
+                              );
   }
 
   public static void 
updateSensorIndexingConfigsFromZookeeper(IndexingConfigurations configurations, 
CuratorFramework client) throws Exception {
-    updateConfigsFromZookeeper(configurations, client);
-    List<String> sensorTypes = 
client.getChildren().forPath(INDEXING.getZookeeperRoot());
-    for(String sensorType: sensorTypes) {
-      configurations.updateSensorIndexingConfig(sensorType, 
readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client));
-    }
+    updateConfigsFromZookeeper( configurations
+                              , INDEXING
+                              , sensorType -> 
configurations.updateSensorIndexingConfig(sensorType, 
readSensorIndexingConfigBytesFromZookeeper(sensorType, client))
+                              , client
+                              );
   }
 
   public static void 
updateEnrichmentConfigsFromZookeeper(EnrichmentConfigurations configurations, 
CuratorFramework client) throws Exception {
-    updateConfigsFromZookeeper(configurations, client);
-    List<String> sensorTypes = 
client.getChildren().forPath(ENRICHMENT.getZookeeperRoot());
-    for(String sensorType: sensorTypes) {
-      configurations.updateSensorEnrichmentConfig(sensorType, 
readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client));
-    }
+    updateConfigsFromZookeeper( configurations
+                              , ENRICHMENT
+                              , sensorType -> 
configurations.updateSensorEnrichmentConfig(sensorType, 
readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client))
+                              , client
+                              );
   }
 
   public static SensorEnrichmentConfig 
readSensorEnrichmentConfigFromZookeeper(String sensorType, CuratorFramework 
client) throws Exception {

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
index a28b15c..dfd7a65 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
@@ -23,27 +23,43 @@ import org.apache.metron.common.utils.JSONUtils;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
 
 public class EnrichmentConfigurations extends Configurations {
 
-    public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) 
{
-        return (SensorEnrichmentConfig) configurations.get(getKey(sensorType));
-    }
+  public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) {
+    return (SensorEnrichmentConfig) 
getConfigurations().get(getKey(sensorType));
+  }
 
-    public void updateSensorEnrichmentConfig(String sensorType, byte[] data) 
throws IOException {
-        updateSensorEnrichmentConfig(sensorType, new 
ByteArrayInputStream(data));
-    }
+  public void updateSensorEnrichmentConfig(String sensorType, byte[] data) 
throws IOException {
+    updateSensorEnrichmentConfig(sensorType, new ByteArrayInputStream(data));
+  }
 
-    public void updateSensorEnrichmentConfig(String sensorType, InputStream 
io) throws IOException {
-        SensorEnrichmentConfig sensorEnrichmentConfig = 
JSONUtils.INSTANCE.load(io, SensorEnrichmentConfig.class);
-        updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfig);
-    }
+  public void updateSensorEnrichmentConfig(String sensorType, InputStream io) 
throws IOException {
+    SensorEnrichmentConfig sensorEnrichmentConfig = 
JSONUtils.INSTANCE.load(io, SensorEnrichmentConfig.class);
+    updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfig);
+  }
 
-    public void updateSensorEnrichmentConfig(String sensorType, 
SensorEnrichmentConfig sensorEnrichmentConfig) {
-        configurations.put(getKey(sensorType), sensorEnrichmentConfig);
-    }
+  public void updateSensorEnrichmentConfig(String sensorType, 
SensorEnrichmentConfig sensorEnrichmentConfig) {
+    getConfigurations().put(getKey(sensorType), sensorEnrichmentConfig);
+  }
+
+  public void delete(String sensorType) {
+    getConfigurations().remove(getKey(sensorType));
+  }
 
-    private String getKey(String sensorType) {
-        return ConfigurationType.ENRICHMENT.getTypeName() + "." + sensorType;
+  public List<String> getTypes() {
+    List<String> ret = new ArrayList<>();
+    for(String keyedSensor : getConfigurations().keySet()) {
+      if(!keyedSensor.isEmpty() && 
keyedSensor.startsWith(ConfigurationType.ENRICHMENT.getTypeName())) {
+        
ret.add(keyedSensor.substring(ConfigurationType.ENRICHMENT.getTypeName().length()
 + 1));
+      }
     }
+    return ret;
+  }
+
+  public static String getKey(String sensorType) {
+    return ConfigurationType.ENRICHMENT.getTypeName() + "." + sensorType;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
index 3bd7645..003b6df 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
@@ -33,8 +33,36 @@ public class IndexingConfigurations extends Configurations {
   public static final String INDEX_CONF = "index";
   public static final String OUTPUT_PATH_FUNCTION_CONF = "outputPathFunction";
 
+  public Map<String, Object> getSensorIndexingConfig(String sensorType, 
boolean emptyMapOnNonExistent) {
+    Map<String, Object> ret = (Map<String, Object>) 
getConfigurations().get(getKey(sensorType));
+    if(ret == null) {
+      return emptyMapOnNonExistent?new HashMap<>():null;
+    }
+    else {
+      return ret;
+    }
+  }
+
+  public Map<String, Object> getSensorIndexingConfig(String sensorType) {
+    return getSensorIndexingConfig(sensorType, true);
+  }
+
+  public List<String> getTypes() {
+    List<String> ret = new ArrayList<>();
+    for(String keyedSensor : getConfigurations().keySet()) {
+      if(!keyedSensor.isEmpty() && 
keyedSensor.startsWith(ConfigurationType.INDEXING.getTypeName())) {
+        
ret.add(keyedSensor.substring(ConfigurationType.INDEXING.getTypeName().length() 
+ 1));
+      }
+    }
+    return ret;
+  }
+
+  public void delete(String sensorType) {
+    getConfigurations().remove(getKey(sensorType));
+  }
+
   public Map<String, Object> getSensorIndexingConfig(String sensorType, String 
writerName) {
-    Map<String, Object> ret = (Map<String, Object>) 
configurations.get(getKey(sensorType));
+    Map<String, Object> ret = (Map<String, Object>) 
getConfigurations().get(getKey(sensorType));
     if(ret == null) {
       return new HashMap();
     }
@@ -55,15 +83,15 @@ public class IndexingConfigurations extends Configurations {
   }
 
   public void updateSensorIndexingConfig(String sensorType, Map<String, 
Object> sensorIndexingConfig) {
-    configurations.put(getKey(sensorType), sensorIndexingConfig);
+    getConfigurations().put(getKey(sensorType), sensorIndexingConfig);
   }
 
-  private String getKey(String sensorType) {
+  public static String getKey(String sensorType) {
     return ConfigurationType.INDEXING.getTypeName() + "." + sensorType;
   }
 
   public boolean isDefault(String sensorName, String writerName) {
-    Map<String, Object> ret = (Map<String, Object>) 
configurations.get(getKey(sensorName));
+    Map<String, Object> ret = (Map<String, Object>) 
getConfigurations().get(getKey(sensorName));
     if(ret == null) {
       return true;
     }
@@ -100,7 +128,7 @@ public class IndexingConfigurations extends Configurations {
     String keyPrefixString = getKey("");
     int prefixStringLength = keyPrefixString.length();
     List<Integer> configuredBatchTimeouts = new ArrayList<>();
-    for (String sensorKeyString : configurations.keySet()) {
+    for (String sensorKeyString : getConfigurations().keySet()) {
       if (sensorKeyString.startsWith(keyPrefixString)) {
         String configuredSensorName = 
sensorKeyString.substring(prefixStringLength);
         configuredBatchTimeouts.add(getBatchTimeout(configuredSensorName, 
writerName));

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
index 0ec0ed4..72af833 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
@@ -22,11 +22,13 @@ import org.apache.metron.common.utils.JSONUtils;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
 
 public class ParserConfigurations extends Configurations {
 
   public SensorParserConfig getSensorParserConfig(String sensorType) {
-    return (SensorParserConfig) configurations.get(getKey(sensorType));
+    return (SensorParserConfig) getConfigurations().get(getKey(sensorType));
   }
 
   public void updateSensorParserConfig(String sensorType, byte[] data) throws 
IOException {
@@ -40,10 +42,24 @@ public class ParserConfigurations extends Configurations {
 
   public void updateSensorParserConfig(String sensorType, SensorParserConfig 
sensorParserConfig) {
     sensorParserConfig.init();
-    configurations.put(getKey(sensorType), sensorParserConfig);
+    getConfigurations().put(getKey(sensorType), sensorParserConfig);
   }
 
-  private String getKey(String sensorType) {
+  public List<String> getTypes() {
+    List<String> ret = new ArrayList<>();
+    for(String keyedSensor : getConfigurations().keySet()) {
+      if(!keyedSensor.isEmpty() && 
keyedSensor.startsWith(ConfigurationType.PARSER.getTypeName())) {
+        
ret.add(keyedSensor.substring(ConfigurationType.PARSER.getTypeName().length() + 
1));
+      }
+    }
+    return ret;
+  }
+
+  public void delete(String sensorType) {
+    getConfigurations().remove(getKey(sensorType));
+  }
+
+  public static String getKey(String sensorType) {
     return ConfigurationType.PARSER.getTypeName() + "." + sensorType;
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java
index 9a42426..55642a9 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java
@@ -80,6 +80,14 @@ public class ProfileResult {
   }
 
   @Override
+  public String toString() {
+    return "ProfileResult{" +
+            "profileExpressions=" + profileExpressions +
+            ", triageExpressions=" + triageExpressions +
+            '}';
+  }
+
+  @Override
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
index 1bca716..82af223 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
@@ -43,6 +43,13 @@ public class ProfileResultExpressions {
   }
 
   @Override
+  public String toString() {
+    return "ProfileResultExpressions{" +
+            "expression='" + expression + '\'' +
+            '}';
+  }
+
+  @Override
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
index da74f64..fbe1706 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
@@ -64,4 +64,27 @@ public class ProfileTriageExpressions {
   public Map<String, String> getExpressions() {
     return expressions;
   }
+
+  @Override
+  public String toString() {
+    return "ProfileTriageExpressions{" +
+            "expressions=" + expressions +
+            '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    ProfileTriageExpressions that = (ProfileTriageExpressions) o;
+
+    return getExpressions() != null ? 
getExpressions().equals(that.getExpressions()) : that.getExpressions() == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    return getExpressions() != null ? getExpressions().hashCode() : 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
index cd651bd..e7c081a 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
@@ -40,6 +40,13 @@ public class ProfilerConfig implements Serializable {
   }
 
   @Override
+  public String toString() {
+    return "ProfilerConfig{" +
+            "profiles=" + profiles +
+            '}';
+  }
+
+  @Override
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
index e001d74..f50d770 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
@@ -31,7 +31,7 @@ import java.io.InputStream;
 public class ProfilerConfigurations extends Configurations {
 
   public ProfilerConfig getProfilerConfig() {
-    return (ProfilerConfig) configurations.get(getKey());
+    return (ProfilerConfig) getConfigurations().get(getKey());
   }
 
   public void updateProfilerConfig(byte[] data) throws IOException {
@@ -44,10 +44,15 @@ public class ProfilerConfigurations extends Configurations {
   }
 
   public void updateProfilerConfig(ProfilerConfig config) {
-    configurations.put(getKey(), config);
+    getConfigurations().put(getKey(), config);
   }
 
-  private String getKey() {
+  public static String getKey() {
     return ConfigurationType.PROFILER.getTypeName();
   }
+
+  public void delete() {
+    configurations.remove(getKey());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ConfigurationsCache.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ConfigurationsCache.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ConfigurationsCache.java
new file mode 100644
index 0000000..462d754
--- /dev/null
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ConfigurationsCache.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper;
+
+import org.apache.metron.common.configuration.Configurations;
+
+/**
+ * A cache for multiple Configurations object.  This cache is generally kept in
+ * sync with zookeeper changes.
+ */
+public interface ConfigurationsCache extends AutoCloseable{
+  /**
+   * Return the Configurations object given the specific type of 
Configurations object.
+   * @param configClass The Configurations class to return
+   * @param <T> The type of Configurations class.
+   * @return The Configurations object
+   */
+  <T extends Configurations> T get(Class<T> configClass);
+
+  /**
+   * Reset the cache and reload from zookeeper.
+   */
+  void reset();
+
+  /**
+   * Start the cache.
+   */
+  void start();
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ZKConfigurationsCache.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ZKConfigurationsCache.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ZKConfigurationsCache.java
new file mode 100644
index 0000000..42967b2
--- /dev/null
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ZKConfigurationsCache.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper;
+
+import com.google.common.collect.Iterables;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.*;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+import org.apache.metron.zookeeper.SimpleEventListener;
+import org.apache.metron.zookeeper.ZKCache;
+import org.apache.metron.common.zookeeper.configurations.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+public class ZKConfigurationsCache implements ConfigurationsCache {
+  private static final Logger LOG =  
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+
+
+  private interface UpdaterSupplier {
+    ConfigurationsUpdater<? extends Configurations> create(Map<Class<? extends 
Configurations>, Configurations> configs,Reloadable reloadCallback);
+  }
+
+  public enum ConfiguredTypes implements UpdaterSupplier {
+    ENRICHMENT((c,r) -> new EnrichmentUpdater( r, 
createSupplier(EnrichmentConfigurations.class, c)))
+    ,PARSER((c,r) -> new ParserUpdater( r, 
createSupplier(ParserConfigurations.class, c)))
+    ,INDEXING((c,r) -> new IndexingUpdater( r, 
createSupplier(IndexingConfigurations.class, c)))
+    ,PROFILER((c,r) -> new ProfilerUpdater( r, 
createSupplier(ProfilerConfigurations.class, c)))
+    ;
+    UpdaterSupplier updaterSupplier;
+    ConfiguredTypes(UpdaterSupplier supplier) {
+      this.updaterSupplier = supplier;
+    }
+
+    @Override
+    public ConfigurationsUpdater<? extends Configurations>
+    create(Map<Class<? extends Configurations>, Configurations> configs, 
Reloadable reloadCallback) {
+      return updaterSupplier.create(configs, reloadCallback);
+    }
+  }
+
+  private List<ConfigurationsUpdater< ? extends Configurations>> updaters;
+  private final Map<Class<? extends Configurations>, Configurations> configs;
+  private ZKCache cache;
+  private CuratorFramework client;
+  ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  public ZKConfigurationsCache(CuratorFramework client, Reloadable reloadable, 
ConfiguredTypes... types) {
+    updaters = new ArrayList<>();
+    configs = new HashMap<>();
+    this.client = client;
+    for(ConfiguredTypes t : types) {
+      ConfigurationsUpdater<? extends Configurations> updater = 
t.create(configs, reloadable);
+      configs.put(updater.getConfigurationClass(), 
updater.defaultConfigurations());
+      updaters.add(updater);
+    }
+  }
+
+  public ZKConfigurationsCache(CuratorFramework client, Reloadable reloadable) 
{
+    this(client, reloadable, ConfiguredTypes.values());
+  }
+
+  public ZKConfigurationsCache(CuratorFramework client, ConfiguredTypes... 
types) {
+    this(client, (name, type) -> {}, types);
+  }
+
+  public ZKConfigurationsCache(CuratorFramework client) {
+    this(client, ConfiguredTypes.values());
+  }
+
+  private static <T extends Configurations> Supplier<T> 
createSupplier(Class<T> clazz, Map<Class<? extends Configurations>, 
Configurations> configs) {
+    return () -> clazz.cast(configs.get(clazz));
+  }
+
+  @Override
+  public void start() {
+    initializeCache(client);
+  }
+
+  @Override
+  public void close() {
+    Lock writeLock = lock.writeLock();
+    try {
+      writeLock.lock();
+      if (cache != null) {
+        cache.close();
+      }
+    }
+    finally{
+      writeLock.unlock();
+    }
+  }
+
+  public void reset() {
+    Lock writeLock = lock.writeLock();
+    try {
+      writeLock.lock();
+      close();
+      initializeCache(client);
+    }
+    finally{
+      writeLock.unlock();
+    }
+  }
+
+  private void initializeCache(CuratorFramework client) {
+    Lock writeLock = lock.writeLock();
+    try {
+      writeLock.lock();
+      SimpleEventListener listener = new SimpleEventListener.Builder()
+              .with(Iterables.transform(updaters, u -> u::update)
+                      , TreeCacheEvent.Type.NODE_ADDED
+                      , TreeCacheEvent.Type.NODE_UPDATED
+              )
+              .with(Iterables.transform(updaters, u -> u::delete)
+                      , TreeCacheEvent.Type.NODE_REMOVED
+              )
+              .build();
+      cache = new ZKCache.Builder()
+              .withClient(client)
+              .withListener(listener)
+              .withRoot(Constants.ZOOKEEPER_TOPOLOGY_ROOT)
+              .build();
+
+      for (ConfigurationsUpdater<? extends Configurations> updater : updaters) 
{
+        updater.forceUpdate(client);
+      }
+      cache.start();
+    } catch (Exception e) {
+      LOG.error("Unable to initialize zookeeper cache: " + e.getMessage(), e);
+      throw new IllegalStateException("Unable to initialize zookeeper cache: " 
+ e.getMessage(), e);
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+
+  public <T extends Configurations> T get(Class<T> configClass){
+    Lock readLock = lock.readLock();
+    try {
+      readLock.lock();
+      return configClass.cast(configs.get(configClass));
+    }
+    finally{
+      readLock.unlock();
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ConfigurationsUpdater.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ConfigurationsUpdater.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ConfigurationsUpdater.java
new file mode 100644
index 0000000..9ff2bee
--- /dev/null
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ConfigurationsUpdater.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper.configurations;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.Configurations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.function.Supplier;
+
+/**
+ * Handles update for an underlying Configurations object.  This is the base 
abstract implementation.
+ * You will find system-specific implementations (e.g. IndexingUpdater, 
ParserUpdater, etc.) which
+ * correspond to the various components of our system which accept 
configuration from zookeeper.
+ *
+ * @param <T> the Type of Configuration
+ */
+public abstract class ConfigurationsUpdater<T extends Configurations> 
implements Serializable {
+  protected static final Logger LOG =  
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private Reloadable reloadable;
+  private Supplier<T> configSupplier;
+
+  /**
+   * Construct a ConfigurationsUpdater
+   * @param reloadable A callback which gets called whenever a reload happens
+   * @param configSupplier A Supplier which creates the Configurations object.
+   */
+  public ConfigurationsUpdater(Reloadable reloadable
+                              , Supplier<T> configSupplier
+  )
+  {
+    this.reloadable = reloadable;
+    this.configSupplier = configSupplier;
+  }
+
+  /**
+   * Update callback, this is called whenever a path is updated in zookeeper 
which we are monitoring.
+   *
+   * @param client The CuratorFramework
+   * @param path The zookeeper path
+   * @param data The change.
+   * @throws IOException When update is impossible.
+   */
+  public void update(CuratorFramework client, String path, byte[] data) throws 
IOException {
+    if (data.length != 0) {
+      String name = path.substring(path.lastIndexOf("/") + 1);
+      if (path.startsWith(getType().getZookeeperRoot())) {
+        LOG.debug("Updating the {} config: {} -> {}", getType().name(), name, 
new String(data == null?"".getBytes():data));
+        update(name, data);
+        reloadCallback(name, getType());
+      } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
+        LOG.debug("Updating the global config: {}", new String(data == 
null?"".getBytes():data));
+        getConfigurations().updateGlobalConfig(data);
+        reloadCallback(name, ConfigurationType.GLOBAL);
+      }
+    }
+  }
+
+  /**
+   * Delete callback, this is called whenever a path is deleted in zookeeper 
which we are monitoring.
+   *
+   * @param client The CuratorFramework
+   * @param path The zookeeper path
+   * @param data The change.
+   * @throws IOException When update is impossible.
+   */
+  public void delete(CuratorFramework client, String path, byte[] data) throws 
IOException {
+    String name = path.substring(path.lastIndexOf("/") + 1);
+    if (path.startsWith(getType().getZookeeperRoot())) {
+      LOG.debug("Deleting {} {} config from internal cache", getType().name(), 
name);
+      delete(name);
+      reloadCallback(name, getType());
+    } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
+      LOG.debug("Deleting global config from internal cache");
+      getConfigurations().deleteGlobalConfig();
+      reloadCallback(name, ConfigurationType.GLOBAL);
+    }
+  }
+
+  /**
+   * The ConfigurationsType that we're monitoring.
+   * @return The ConfigurationsType enum
+   */
+  public abstract ConfigurationType getType();
+
+  /**
+   * The simple update.  This differs from the full update elsewhere in that
+   * this is ONLY called on updates to path to the zookeeper nodes which 
correspond
+   * to your configurations type (rather than all configurations type).
+   * @param name The path
+   * @param data The data updated
+   * @throws IOException when update is unable to happen
+   */
+  public abstract void update(String name, byte[] data) throws IOException;
+
+  /**
+   * The simple delete.  This differs from the full delete elsewhere in that
+   * this is ONLY called on deletes to path to the zookeeper nodes which 
correspond
+   * to your configurations type (rather than all configurations type).
+   * @param name the path
+   * @throws IOException when update is unable to happen
+   */
+  public abstract void delete(String name);
+
+  /**
+   *
+   * @return The Class for the Configurations type.
+   */
+  public abstract Class<T> getConfigurationClass();
+
+  /**
+   * This pulls the configuration from zookeeper and updates the cache.  It 
represents the initial state.
+   * Force update is called when the zookeeper cache is initialized to ensure 
that the caches are updated.
+   * @param client
+   */
+  public abstract void forceUpdate(CuratorFramework client);
+
+  /**
+   * Create an empty Configurations object of type T.
+   * @return
+   */
+  public abstract T defaultConfigurations();
+
+  protected void reloadCallback(String name, ConfigurationType type) {
+    reloadable.reloadCallback(name, type);
+  }
+
+  protected T getConfigurations() {
+    return configSupplier.get();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/EnrichmentUpdater.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/EnrichmentUpdater.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/EnrichmentUpdater.java
new file mode 100644
index 0000000..29de525
--- /dev/null
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/EnrichmentUpdater.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper.configurations;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+public class EnrichmentUpdater extends 
ConfigurationsUpdater<EnrichmentConfigurations>{
+
+  public EnrichmentUpdater(Reloadable reloadable, 
Supplier<EnrichmentConfigurations> configSupplier) {
+    super(reloadable, configSupplier);
+  }
+
+  @Override
+  public Class<EnrichmentConfigurations> getConfigurationClass() {
+    return EnrichmentConfigurations.class;
+  }
+
+  @Override
+  public void forceUpdate(CuratorFramework client) {
+    try {
+      
ConfigurationsUtils.updateEnrichmentConfigsFromZookeeper(getConfigurations(), 
client);
+    }
+    catch (KeeperException.NoNodeException nne) {
+      LOG.warn("No current enrichment configs in zookeeper, but the cache 
should load lazily...");
+    }
+    catch (Exception e) {
+      LOG.warn("Unable to load configs from zookeeper, but the cache should 
load lazily...", e);
+    }
+  }
+
+  @Override
+  public EnrichmentConfigurations defaultConfigurations() {
+    return new EnrichmentConfigurations();
+  }
+
+  @Override
+  public ConfigurationType getType() {
+    return ConfigurationType.ENRICHMENT;
+  }
+
+  @Override
+  public void delete(String name) {
+    getConfigurations().delete(name);
+  }
+
+  @Override
+  public void update(String name, byte[] data) throws IOException {
+    getConfigurations().updateSensorEnrichmentConfig(name, data);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/IndexingUpdater.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/IndexingUpdater.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/IndexingUpdater.java
new file mode 100644
index 0000000..8017930
--- /dev/null
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/IndexingUpdater.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper.configurations;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+public class IndexingUpdater extends 
ConfigurationsUpdater<IndexingConfigurations> {
+  public IndexingUpdater(Reloadable reloadable, 
Supplier<IndexingConfigurations> configSupplier) {
+    super(reloadable, configSupplier);
+  }
+
+  @Override
+  public Class<IndexingConfigurations> getConfigurationClass() {
+    return IndexingConfigurations.class;
+  }
+
+  @Override
+  public void forceUpdate(CuratorFramework client) {
+    try {
+      
ConfigurationsUtils.updateSensorIndexingConfigsFromZookeeper(getConfigurations(),
 client);
+    }
+    catch (KeeperException.NoNodeException nne) {
+      LOG.warn("No current indexing configs in zookeeper, but the cache should 
load lazily...");
+    }
+    catch (Exception e) {
+      LOG.warn("Unable to load indexing configs from zookeeper, but the cache 
should load lazily...", e);
+    }
+  }
+
+  @Override
+  public IndexingConfigurations defaultConfigurations() {
+    return new IndexingConfigurations();
+  }
+
+  @Override
+  public ConfigurationType getType() {
+    return ConfigurationType.INDEXING;
+  }
+
+  @Override
+  public void delete(String name) {
+    getConfigurations().delete(name);
+  }
+
+  @Override
+  public void update(String name, byte[] data) throws IOException {
+    getConfigurations().updateSensorIndexingConfig(name, data);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ParserUpdater.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ParserUpdater.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ParserUpdater.java
new file mode 100644
index 0000000..c91844e
--- /dev/null
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ParserUpdater.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper.configurations;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+public class ParserUpdater extends ConfigurationsUpdater<ParserConfigurations> 
{
+  public ParserUpdater(Reloadable reloadable, Supplier<ParserConfigurations> 
configSupplier) {
+    super(reloadable, configSupplier);
+  }
+
+  @Override
+  public Class<ParserConfigurations> getConfigurationClass() {
+    return ParserConfigurations.class;
+  }
+
+  @Override
+  public void forceUpdate(CuratorFramework client) {
+    try {
+      
ConfigurationsUtils.updateParserConfigsFromZookeeper(getConfigurations(), 
client);
+    }
+    catch (KeeperException.NoNodeException nne) {
+      LOG.warn("No current parser configs in zookeeper, but the cache should 
load lazily...");
+    }
+    catch (Exception e) {
+      LOG.warn("Unable to load configs from zookeeper, but the cache should 
load lazily...", e);
+    }
+  }
+
+  @Override
+  public ParserConfigurations defaultConfigurations() {
+    return new ParserConfigurations();
+  }
+
+  @Override
+  public ConfigurationType getType() {
+    return ConfigurationType.PARSER;
+  }
+
+  @Override
+  public void delete(String name) {
+    getConfigurations().delete(name);
+  }
+
+  @Override
+  public void update(String name, byte[] data) throws IOException {
+    getConfigurations().updateSensorParserConfig(name, data);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java
new file mode 100644
index 0000000..68c5203
--- /dev/null
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper.configurations;
+
+import static 
org.apache.metron.common.configuration.ConfigurationType.PROFILER;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+public class ProfilerUpdater extends 
ConfigurationsUpdater<ProfilerConfigurations> {
+  public ProfilerUpdater(Reloadable reloadable, 
Supplier<ProfilerConfigurations> configSupplier) {
+    super(reloadable, configSupplier);
+  }
+
+  @Override
+  public Class<ProfilerConfigurations> getConfigurationClass() {
+    return ProfilerConfigurations.class;
+  }
+
+  private ProfilerConfig readFromZookeeper(CuratorFramework client) throws 
Exception {
+    byte[] raw = client.getData().forPath(PROFILER.getZookeeperRoot());
+    return JSONUtils.INSTANCE.load(new ByteArrayInputStream(raw), 
ProfilerConfig.class);
+  }
+
+  @Override
+  public void forceUpdate(CuratorFramework client) {
+    try {
+      ConfigurationsUtils.updateConfigsFromZookeeper(getConfigurations(), 
client);
+    }
+    catch (KeeperException.NoNodeException nne) {
+      LOG.warn("No current global configs in zookeeper, but the cache should 
load lazily...");
+    }
+    catch(Exception e) {
+      LOG.warn("Unable to load global configs from zookeeper, but the cache 
should load lazily...", e);
+    }
+    try {
+      ProfilerConfig config = readFromZookeeper(client);
+      if(config != null) {
+        getConfigurations().updateProfilerConfig(config);
+      }
+
+    }
+    catch (KeeperException.NoNodeException nne) {
+      LOG.warn("No current profiler configs in zookeeper, but the cache should 
load lazily...");
+    }
+    catch (Exception e) {
+      LOG.warn("Unable to load profiler configs from zookeeper, but the cache 
should load lazily...", e);
+    }
+  }
+
+  @Override
+  public ProfilerConfigurations defaultConfigurations() {
+    return new ProfilerConfigurations();
+  }
+
+  @Override
+  public ConfigurationType getType() {
+    return ConfigurationType.PROFILER;
+  }
+
+  @Override
+  public void delete(String name) {
+    getConfigurations().delete();
+  }
+
+  @Override
+  public void update(String name, byte[] data) throws IOException {
+    getConfigurations().updateProfilerConfig(data);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/Reloadable.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/Reloadable.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/Reloadable.java
new file mode 100644
index 0000000..308c74e
--- /dev/null
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/Reloadable.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper.configurations;
+
+import org.apache.metron.common.configuration.ConfigurationType;
+
+import java.io.Serializable;
+
+public interface Reloadable extends Serializable {
+  void reloadCallback(String name, ConfigurationType type);
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/scripts/stellar
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/scripts/stellar 
b/metron-platform/metron-common/src/main/scripts/stellar
index 56c2d4d..3f53c49 100644
--- a/metron-platform/metron-common/src/main/scripts/stellar
+++ b/metron-platform/metron-common/src/main/scripts/stellar
@@ -33,4 +33,4 @@ export METRON_VERSION=${project.version}
 export METRON_HOME=/usr/metron/$METRON_VERSION
 export STELLAR_LIB=$(find $METRON_HOME/lib/ -name metron-parsers*.jar)
 export MANAGEMENT_LIB=$(find $METRON_HOME/lib/ -name metron-management*.jar)
-java $JVMFLAGS -cp 
"$HBASE_CONFIGS:${CONTRIB:-$METRON_HOME/contrib/*}:$STELLAR_LIB:$MANAGEMENT_LIB"
 org.apache.metron.stellar.common.shell.StellarShell "$@"
+java $JVMFLAGS -cp 
"${CONTRIB:-$METRON_HOME/contrib/*}:$STELLAR_LIB:$MANAGEMENT_LIB:$HBASE_CONFIGS"
 org.apache.metron.stellar.common.shell.StellarShell "$@"

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
new file mode 100644
index 0000000..64bf986
--- /dev/null
+++ 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.zookeeper;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.TestConstants;
+import org.apache.metron.common.configuration.*;
+import 
org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.integration.components.ZKServerComponent;
+import org.junit.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Map;
+
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
+
+public class ZKConfigurationsCacheIntegrationTest {
+  private CuratorFramework client;
+
+  /**
+   {
+  "profiles": [
+    {
+      "profile": "example2",
+      "foreach": "ip_src_addr",
+      "onlyif": "protocol == 'HTTP'",
+      "init": {
+        "total_bytes": 0.0
+      },
+      "update": {
+        "total_bytes": "total_bytes + bytes_in"
+      },
+      "result": "total_bytes",
+      "expires": 30
+    }
+  ]
+}
+   */
+  @Multiline
+  public static String profilerConfig;
+  /**
+   {
+    "hdfs" : {
+      "index": "yaf",
+      "batchSize": 1,
+      "enabled" : true
+    },
+    "elasticsearch" : {
+    "index": "yaf",
+    "batchSize": 25,
+    "batchTimeout": 7,
+    "enabled" : false
+    },
+    "solr" : {
+    "index": "yaf",
+    "batchSize": 5,
+    "enabled" : false
+    }
+  }
+   */
+  @Multiline
+  public static String testIndexingConfig;
+
+  /**
+   {
+    "enrichment": {
+      "fieldMap": { }
+    ,"fieldToTypeMap": { }
+   },
+    "threatIntel": {
+      "fieldMap": { },
+      "fieldToTypeMap": { },
+      "triageConfig" : { }
+     }
+   }
+   */
+  @Multiline
+  public static String testEnrichmentConfig;
+
+  /**
+  {
+  "parserClassName":"org.apache.metron.parsers.bro.BasicBroParser",
+  "sensorTopic":"brop",
+  "parserConfig": {}
+  }
+   */
+  @Multiline
+  public static String testParserConfig;
+
+  /**
+   *{
+  "es.clustername": "metron",
+  "es.ip": "localhost",
+  "es.port": 9300,
+  "es.date.format": "yyyy.MM.dd.HH"
+   }
+   */
+  @Multiline
+  public static String globalConfig;
+
+  public static File profilerDir = new 
File("../../metron-analytics/metron-profiler/src/test/config/zookeeper");
+  public ConfigurationsCache cache;
+
+  public ZKServerComponent zkComponent;
+
+  @Before
+  public void setup() throws Exception {
+    zkComponent = new ZKServerComponent();
+    zkComponent.start();
+    client = ConfigurationsUtils.getClient(zkComponent.getConnectionString());
+    client.start();
+    cache = new ZKConfigurationsCache(client);
+    cache.start();
+    {
+      //parser
+      byte[] config = IOUtils.toByteArray(new FileInputStream(new 
File(TestConstants.PARSER_CONFIGS_PATH + "/parsers/bro.json")));
+      ConfigurationsUtils.writeSensorParserConfigToZookeeper("bro", config, 
client);
+    }
+    {
+      //indexing
+      byte[] config = IOUtils.toByteArray(new FileInputStream(new 
File(TestConstants.SAMPLE_CONFIG_PATH + "/indexing/test.json")));
+      ConfigurationsUtils.writeSensorIndexingConfigToZookeeper("test", config, 
client);
+    }
+    {
+      //enrichments
+      byte[] config = IOUtils.toByteArray(new FileInputStream(new 
File(TestConstants.SAMPLE_CONFIG_PATH + "/enrichments/test.json")));
+      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper("test", 
config, client);
+    }
+    {
+      //enrichments
+      byte[] config = IOUtils.toByteArray(new FileInputStream(new 
File(TestConstants.SAMPLE_CONFIG_PATH + "/enrichments/test.json")));
+      ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper("test", 
config, client);
+    }
+    {
+      //profiler
+      byte[] config = IOUtils.toByteArray(new FileInputStream(new 
File(profilerDir, "/readme-example-1/profiler.json")));
+      ConfigurationsUtils.writeProfilerConfigToZookeeper( config, client);
+    }
+    {
+      //global config
+      byte[] config = IOUtils.toByteArray(new FileInputStream(new 
File(TestConstants.SAMPLE_CONFIG_PATH + "/global.json")));
+      ConfigurationsUtils.writeGlobalConfigToZookeeper(config, client);
+    }
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if(cache != null) {
+      cache.close();
+    }
+    if(client != null) {
+      client.close();
+    }
+    if(zkComponent != null) {
+      zkComponent.stop();
+    }
+  }
+
+
+  @Test
+  public void validateDelete() throws Exception {
+    client.delete().forPath(ConfigurationType.GLOBAL.getZookeeperRoot());
+    client.delete().forPath(ConfigurationType.INDEXING.getZookeeperRoot() + 
"/test");
+    client.delete().forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + 
"/test");
+    client.delete().forPath(ConfigurationType.PARSER.getZookeeperRoot() + 
"/bro");
+    client.delete().forPath(ConfigurationType.PROFILER.getZookeeperRoot() );
+    //global
+    {
+      IndexingConfigurations config = cache.get( IndexingConfigurations.class);
+      assertEventually(() -> Assert.assertNull(config.getGlobalConfig(false)));
+    }
+    //indexing
+    {
+      IndexingConfigurations config = cache.get( IndexingConfigurations.class);
+      assertEventually(() -> 
Assert.assertNull(config.getSensorIndexingConfig("test", false)));
+      assertEventually(() -> Assert.assertNull(config.getGlobalConfig(false)));
+    }
+    //enrichment
+    {
+      EnrichmentConfigurations config = cache.get( 
EnrichmentConfigurations.class);
+      assertEventually(() -> 
Assert.assertNull(config.getSensorEnrichmentConfig("test")));
+      assertEventually(()-> Assert.assertNull(config.getGlobalConfig(false)));
+    }
+    //parser
+    {
+      ParserConfigurations config = cache.get( ParserConfigurations.class);
+      assertEventually(() -> 
Assert.assertNull(config.getSensorParserConfig("bro")));
+      assertEventually(() -> Assert.assertNull(config.getGlobalConfig(false)));
+    }
+    //profiler
+    {
+      ProfilerConfigurations config = cache.get( ProfilerConfigurations.class);
+      assertEventually(() -> Assert.assertNull(config.getProfilerConfig()));
+      assertEventually(() -> Assert.assertNull(config.getGlobalConfig(false)));
+    }
+  }
+
+  @Test
+  public void validateUpdate() throws Exception {
+    ConfigurationsUtils.writeSensorIndexingConfigToZookeeper("test", 
testIndexingConfig.getBytes(), client);
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig.getBytes(), 
client);
+    ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper("test", 
testEnrichmentConfig.getBytes(), client);
+    ConfigurationsUtils.writeSensorParserConfigToZookeeper("bro", 
testParserConfig.getBytes(), client);
+    ConfigurationsUtils.writeProfilerConfigToZookeeper( 
profilerConfig.getBytes(), client);
+    //indexing
+    {
+      Map<String, Object> expectedConfig = 
JSONUtils.INSTANCE.load(testIndexingConfig, new TypeReference<Map<String, 
Object>>() {});
+      IndexingConfigurations config = cache.get( IndexingConfigurations.class);
+      assertEventually(() -> Assert.assertEquals(expectedConfig, 
config.getSensorIndexingConfig("test")));
+    }
+    //enrichment
+    {
+      SensorEnrichmentConfig expectedConfig = 
JSONUtils.INSTANCE.load(testEnrichmentConfig, SensorEnrichmentConfig.class);
+      Map<String, Object> expectedGlobalConfig = 
JSONUtils.INSTANCE.load(globalConfig, new TypeReference<Map<String, Object>>() 
{});
+      EnrichmentConfigurations config = cache.get( 
EnrichmentConfigurations.class);
+      assertEventually(() -> Assert.assertEquals(expectedConfig, 
config.getSensorEnrichmentConfig("test")));
+      assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, 
config.getGlobalConfig()));
+    }
+    //parsers
+    {
+      SensorParserConfig expectedConfig = 
JSONUtils.INSTANCE.load(testParserConfig, SensorParserConfig.class);
+      ParserConfigurations config = cache.get( ParserConfigurations.class);
+      assertEventually(() -> Assert.assertEquals(expectedConfig, 
config.getSensorParserConfig("bro")));
+    }
+    //profiler
+    {
+      ProfilerConfig expectedConfig = JSONUtils.INSTANCE.load(profilerConfig, 
ProfilerConfig.class);
+      ProfilerConfigurations config = cache.get( ProfilerConfigurations.class);
+      assertEventually(() -> Assert.assertEquals(expectedConfig, 
config.getProfilerConfig()));
+    }
+  }
+
+  @Test
+  public void validateBaseWrite() throws Exception {
+    File globalConfigFile = new File(TestConstants.SAMPLE_CONFIG_PATH + 
"/global.json");
+    Map<String, Object> expectedGlobalConfig = 
JSONUtils.INSTANCE.load(globalConfigFile, new TypeReference<Map<String, 
Object>>() { });
+    //indexing
+    {
+      File inFile = new File(TestConstants.SAMPLE_CONFIG_PATH + 
"/indexing/test.json");
+      Map<String, Object> expectedConfig = JSONUtils.INSTANCE.load(inFile, new 
TypeReference<Map<String, Object>>() {
+      });
+      IndexingConfigurations config = cache.get( IndexingConfigurations.class);
+      assertEventually(() -> Assert.assertEquals(expectedConfig, 
config.getSensorIndexingConfig("test")));
+      assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, 
config.getGlobalConfig()));
+      assertEventually(() -> 
Assert.assertNull(config.getSensorIndexingConfig("notthere", false)));
+    }
+    //enrichment
+    {
+      File inFile = new File(TestConstants.SAMPLE_CONFIG_PATH + 
"/enrichments/test.json");
+      SensorEnrichmentConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, 
SensorEnrichmentConfig.class);
+      EnrichmentConfigurations config = cache.get( 
EnrichmentConfigurations.class);
+      assertEventually(() -> Assert.assertEquals(expectedConfig, 
config.getSensorEnrichmentConfig("test")));
+      assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, 
config.getGlobalConfig()));
+      assertEventually(() -> 
Assert.assertNull(config.getSensorEnrichmentConfig("notthere")));
+    }
+    //parsers
+    {
+      File inFile = new File(TestConstants.PARSER_CONFIGS_PATH + 
"/parsers/bro.json");
+      SensorParserConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, 
SensorParserConfig.class);
+      ParserConfigurations config = cache.get( ParserConfigurations.class);
+      assertEventually(() -> Assert.assertEquals(expectedConfig, 
config.getSensorParserConfig("bro")));
+      assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, 
config.getGlobalConfig()));
+      assertEventually(() -> 
Assert.assertNull(config.getSensorParserConfig("notthere")));
+    }
+    //profiler
+    {
+      File inFile = new File(profilerDir, "/readme-example-1/profiler.json");
+      ProfilerConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, 
ProfilerConfig.class);
+      ProfilerConfigurations config = cache.get( ProfilerConfigurations.class);
+      assertEventually(() -> Assert.assertEquals(expectedConfig, 
config.getProfilerConfig()));
+      assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, 
config.getGlobalConfig()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index 5f6f22f..308638e 100644
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -123,7 +123,7 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
             
.withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
             .withMessageGetterField("message");
     bulkMessageWriterBolt.setCuratorFramework(client);
-    bulkMessageWriterBolt.setTreeCache(cache);
+    bulkMessageWriterBolt.setZKCache(cache);
     
bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
             new FileInputStream(sampleSensorIndexingConfigPath));
     bulkMessageWriterBolt.declareOutputFields(declarer);
@@ -182,7 +182,7 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
             
.withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
             .withMessageGetterField("message").withBatchTimeoutDivisor(3);
     bulkMessageWriterBolt.setCuratorFramework(client);
-    bulkMessageWriterBolt.setTreeCache(cache);
+    bulkMessageWriterBolt.setZKCache(cache);
     
bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
             new FileInputStream(sampleSensorIndexingConfigPath));
     bulkMessageWriterBolt.declareOutputFields(declarer);
@@ -226,7 +226,7 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
             
.withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
             .withMessageGetterField("message");
     bulkMessageWriterBolt.setCuratorFramework(client);
-    bulkMessageWriterBolt.setTreeCache(cache);
+    bulkMessageWriterBolt.setZKCache(cache);
     
bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType
             , new FileInputStream(sampleSensorIndexingConfigPath));
     bulkMessageWriterBolt.declareOutputFields(declarer);

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
index 77dd4cf..52135e3 100644
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java
@@ -73,7 +73,7 @@ public class EnrichmentJoinBoltTest extends 
BaseEnrichmentBoltTest {
   public void test() throws IOException {
     EnrichmentJoinBolt enrichmentJoinBolt = new 
EnrichmentJoinBolt("zookeeperUrl");
     enrichmentJoinBolt.setCuratorFramework(client);
-    enrichmentJoinBolt.setTreeCache(cache);
+    enrichmentJoinBolt.setZKCache(cache);
     
enrichmentJoinBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, 
new FileInputStream(sampleSensorEnrichmentConfigPath));
     enrichmentJoinBolt.withMaxCacheSize(100);
     enrichmentJoinBolt.withMaxTimeRetain(10000);

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
index c79eb10..cbe7ed6 100644
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java
@@ -58,7 +58,7 @@ public class EnrichmentSplitterBoltTest extends 
BaseEnrichmentBoltTest {
 
     EnrichmentSplitterBolt enrichmentSplitterBolt = new 
EnrichmentSplitterBolt("zookeeperUrl").withEnrichments(enrichments);
     enrichmentSplitterBolt.setCuratorFramework(client);
-    enrichmentSplitterBolt.setTreeCache(cache);
+    enrichmentSplitterBolt.setZKCache(cache);
     
enrichmentSplitterBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType,
 new FileInputStream(sampleSensorEnrichmentConfigPath));
     enrichmentSplitterBolt.prepare(new HashMap<>(), topologyContext, 
outputCollector);
 

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
index 90322fe..d7b54dd 100644
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java
@@ -161,7 +161,7 @@ public class GenericEnrichmentBoltTest extends 
BaseEnrichmentBoltTest {
       }
     };
     genericEnrichmentBolt.setCuratorFramework(client);
-    genericEnrichmentBolt.setTreeCache(cache);
+    genericEnrichmentBolt.setZKCache(cache);
     
genericEnrichmentBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType,
 new FileInputStream(sampleSensorEnrichmentConfigPath));
 
     HashMap<String, Object> globalConfig = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
index e03dc71..1bb1083 100644
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java
@@ -99,7 +99,7 @@ public class JoinBoltTest extends BaseEnrichmentBoltTest {
     }
     joinBolt = new StandAloneJoinBolt("zookeeperUrl");
     joinBolt.setCuratorFramework(client);
-    joinBolt.setTreeCache(cache);
+    joinBolt.setZKCache(cache);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java
index 41d34de..57dae13 100644
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java
@@ -88,7 +88,7 @@ public class SplitBoltTest extends BaseEnrichmentBoltTest {
   public void test() {
     StandAloneSplitBolt splitBolt = spy(new 
StandAloneSplitBolt("zookeeperUrl"));
     splitBolt.setCuratorFramework(client);
-    splitBolt.setTreeCache(cache);
+    splitBolt.setZKCache(cache);
     doCallRealMethod().when(splitBolt).reloadCallback(anyString(), 
any(ConfigurationType.class));
     splitBolt.prepare(new HashMap(), topologyContext, outputCollector);
     splitBolt.declareOutputFields(declarer);

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
index 04617ff..62b2570 100644
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java
@@ -157,7 +157,7 @@ public class ThreatIntelJoinBoltTest extends 
BaseEnrichmentBoltTest {
 
     ThreatIntelJoinBolt threatIntelJoinBolt = new 
ThreatIntelJoinBolt("zookeeperUrl");
     threatIntelJoinBolt.setCuratorFramework(client);
-    threatIntelJoinBolt.setTreeCache(cache);
+    threatIntelJoinBolt.setZKCache(cache);
 
     SensorEnrichmentConfig enrichmentConfig = JSONUtils.INSTANCE.load(
             new FileInputStream(sampleSensorEnrichmentConfigPath), 
SensorEnrichmentConfig.class);

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
index 4feba2e..f7869e5 100644
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java
@@ -33,7 +33,7 @@ public class ThreatIntelSplitterBoltTest extends 
BaseEnrichmentBoltTest {
     String threatIntelType = "hbaseThreatIntel";
     ThreatIntelSplitterBolt threatIntelSplitterBolt = new 
ThreatIntelSplitterBolt("zookeeperUrl");
     threatIntelSplitterBolt.setCuratorFramework(client);
-    threatIntelSplitterBolt.setTreeCache(cache);
+    threatIntelSplitterBolt.setZKCache(cache);
     
threatIntelSplitterBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType,
 new FileInputStream(sampleSensorEnrichmentConfigPath));
     threatIntelSplitterBolt.prepare(new HashMap<>(), topologyContext, 
outputCollector);
     Map<String, Object> fieldMap = 
threatIntelSplitterBolt.getFieldMap(sensorType);

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
 
b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
index e67d3c9..9577a43 100644
--- 
a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
+++ 
b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
@@ -31,6 +31,28 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class TestUtils {
+  public static long MAX_ASSERT_WAIT_MS = 30000L;
+  public interface Assertion {
+    void apply() throws Exception;
+  }
+  public static void assertEventually(Assertion assertion) throws Exception {
+    assertEventually(assertion, MAX_ASSERT_WAIT_MS);
+  }
+  private static void assertEventually(Assertion assertion
+                             , long msToWait
+                             ) throws Exception {
+    long delta = msToWait/10;
+    for(int i = 0;i < 10;++i) {
+      try{
+        assertion.apply();
+        return;
+      }
+      catch(AssertionError t) {
+      }
+      Thread.sleep(delta);
+    }
+    assertion.apply();
+  }
 
   public static List<byte[]> readSampleData(String samplePath) throws 
IOException {
     BufferedReader br = new BufferedReader(new FileReader(samplePath));

Reply via email to