METRON-1594: KafkaWriter is asynchronous and may lose data on node failure 
(mmiklavc via mmiklavc) closes apache/metron#1045


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/523c38cf
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/523c38cf
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/523c38cf

Branch: refs/heads/master
Commit: 523c38cf6399e2e3974a51a2cd0fe47e096b0bdf
Parents: b6808f7
Author: mmiklavc <michael.miklav...@gmail.com>
Authored: Wed Jun 6 15:40:55 2018 -0600
Committer: Michael Miklavcic <michael.miklav...@gmail.com>
Committed: Wed Jun 6 15:40:55 2018 -0600

----------------------------------------------------------------------
 metron-analytics/metron-profiler/README.md      |  15 ++
 .../src/main/flux/profiler/remote.yaml          |   3 +-
 metron-platform/metron-common/README.md         |   5 +
 .../metron/common/bolt/ConfiguredBolt.java      |  14 +-
 .../common/bolt/ConfiguredEnrichmentBolt.java   |  11 +-
 .../common/bolt/ConfiguredIndexingBolt.java     |  13 +-
 .../common/bolt/ConfiguredParserBolt.java       |  13 +-
 .../common/bolt/ConfiguredProfilerBolt.java     |   9 +-
 .../common/configuration/Configurations.java    |   7 +-
 .../configuration/EnrichmentConfigurations.java |  26 +++
 .../configuration/IndexingConfigurations.java   |  12 +-
 .../configuration/ParserConfigurations.java     |   1 +
 .../profiler/ProfilerConfigurations.java        |  25 +++
 .../writer/ConfigurationStrategy.java           |  44 +++++
 .../writer/ConfigurationsStrategies.java        | 144 +++++++++++++++
 .../writer/EnrichmentWriterConfiguration.java   | 110 ++++++++++++
 .../writer/ParserWriterConfiguration.java       |   6 +-
 .../writer/ProfilerWriterConfiguration.java     | 109 ++++++++++++
 .../configuration/ParserConfigurationsTest.java | 120 +++++++++++++
 .../profiler/ProfilerConfigTest.java            |  13 +-
 .../writer/ConfigurationsStrategiesTest.java    |  79 +++++++++
 .../EnrichmentWriterConfigurationTest.java      |  54 ++++++
 .../writer/IndexingWriterConfigurationTest.java |  70 ++++++++
 .../writer/ParserWriterConfigurationTest.java   |  72 ++++++++
 .../writer/ProfilerWriterConfigurationTest.java |  54 ++++++
 .../writer/IndexingWriterConfigurationTest.java |  70 --------
 .../writer/ParserWriterConfigurationTest.java   |  38 ----
 metron-platform/metron-enrichment/README.md     |  15 +-
 .../main/flux/enrichment/remote-splitjoin.yaml  |   9 +-
 .../main/flux/enrichment/remote-unified.yaml    |   9 +-
 .../bolt/BulkMessageWriterBoltTest.java         |  64 ++++---
 .../src/main/flux/indexing/batch/remote.yaml    |   4 +-
 .../flux/indexing/random_access/remote.yaml     |   4 +-
 metron-platform/metron-parsers/README.md        |   6 +-
 .../apache/metron/parsers/bolt/ParserBolt.java  | 108 +++++++++++-
 .../metron/parsers/bolt/WriterHandler.java      |  63 +++++--
 .../metron/parsers/bolt/ParserBoltTest.java     | 176 +++++++++++++------
 .../integration/WriterBoltIntegrationTest.java  |   6 +-
 .../metron/writer/bolt/BatchTimeoutHelper.java  |   6 +-
 .../writer/bolt/BulkMessageWriterBolt.java      |  48 ++---
 .../apache/metron/writer/kafka/KafkaWriter.java |  92 +++++++---
 41 files changed, 1426 insertions(+), 321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-analytics/metron-profiler/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/README.md 
b/metron-analytics/metron-profiler/README.md
index 79cdd44..1a17e10 100644
--- a/metron-analytics/metron-profiler/README.md
+++ b/metron-analytics/metron-profiler/README.md
@@ -538,6 +538,8 @@ The Profiler runs as an independent Storm topology.  The 
configuration for the P
 | [`profiler.hbase.batch`](#profilerhbasebatch)                                
 | The number of puts that are written to HBase in a single batch.
 | 
[`profiler.hbase.flush.interval.seconds`](#profilerhbaseflushintervalseconds) | 
The maximum number of seconds between batch writes to HBase.
 | [`topology.kryo.register`](#topologykryoregister)                            
 | Storm will use Kryo serialization for these classes.
+| [`profiler.writer.batchSize`](#profilerwriterbatchsize)                      
 | The number of records to batch when writing to Kakfa.
+| [`profiler.writer.batchTimeout`](#profilerwriterbatchtimeout)                
 | The timeout in ms for batching when writing to Kakfa.
 
 
 ### `profiler.input.topic`
@@ -852,6 +854,19 @@ More information on accessing profile data can be found in 
the [Profiler Client]
 
 More information on using the [`STATS_*` functions in Stellar can be found 
here](../../metron-platform/metron-common).
 
+### `profiler.writer.batchSize`
+
+*Default*: 15
+
+The number of records to batch when writing to Kakfa. This is managed in the 
global configuration and does not require a topology restart.
+
+### `profiler.writer.batchTimeout`
+
+*Default*: 0
+
+The timeout after which a batch will be flushed even if batchSize has not been 
met.  Optional. If unspecified, or set to `0`, it defaults to a 
system-determined duration which is a fraction of the Storm parameter 
`topology.message.timeout.secs`.
+Ignored if batchSize is `1`, since this disables batching. This is managed in 
the global configuration and does not require a topology restart.
+
 ## Implementation
 
 ## Key Classes

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml 
b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
index 5e92c62..2f40554 100644
--- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
@@ -183,8 +183,9 @@ bolts:
         className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
         constructorArgs:
             - "${kafka.zk}"
+            - "PROFILER"
         configMethods:
-            -   name: "withMessageWriter"
+            -   name: "withBulkMessageWriter"
                 args: [ref: "kafkaWriter"]
 
 streams:

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/README.md 
b/metron-platform/metron-common/README.md
index c741e72..dae2e22 100644
--- a/metron-platform/metron-common/README.md
+++ b/metron-platform/metron-common/README.md
@@ -92,9 +92,14 @@ but a convenient index is provided here:
 | 
[`stellar.function.resolver.excludes`](../../metron-stellar/stellar-common#stellarfunctionresolverincludesexcludes)
 | Stellar       | CSV String | N/A                        |
 | 
[`profiler.period.duration`](../../metron-analytics/metron-profiler#profilerperiodduration)
                         | Profiler      | Integer    | 
`profiler_period_duration` |
 | 
[`profiler.period.duration.units`](../../metron-analytics/metron-profiler#profilerperioddurationunits)
              | Profiler      | String     | `profiler_period_units`    |
+| 
[`profiler.writer.batchSize`](../../metron-analytics/metron-profiler/#profilerwriterbatchsize)
                      | Profiler      | Integer    |  N/A                       
|
+| 
[`profiler.writer.batchTimeout`](../../metron-analytics/metron-profiler/#profilerwriterbatchtimeout)
                | Profiler      | Integer    |  N/A                       |
 | [`update.hbase.table`](../metron-indexing#updatehbasetable)                  
                                       | REST/Indexing | String     | 
`update_hbase_table`       |
 | [`update.hbase.cf`](../metron-indexing#updatehbasecf)                        
                                       | REST/Indexing | String     | 
`update_hbase_cf`          |
 | [`geo.hdfs.file`](../metron-enrichment#geohdfsfile)                          
                                       | Enrichment    | String     | 
`geo_hdfs_file`            |
+| 
[`enrichment.writer.batchSize`](../metron-enrichment#enrichmentwriterbatchsize) 
                                    | Enrichment    | Integer    |  N/A         
              |
+| 
[`enrichment.writer.batchTimeout`](../metron-enrichment#enrichmentwriterbatchtimeout)
                               | Enrichment    | Integer    |  N/A              
         |
+| [`geo.hdfs.file`](../metron-enrichment#geohdfsfile)                          
                                       | Enrichment    | String     | 
`geo_hdfs_file`            |
 | [`source.type.field`](../../metron-interface/metron-alerts#sourcetypefield)  
                                       | UI            | String     |  N/A      
                 |
 
 ## Note Configs in Ambari

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
index 6f15746..ef3b2bf 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
@@ -28,6 +28,8 @@ import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.writer.ConfigurationStrategy;
+import org.apache.metron.common.configuration.writer.ConfigurationsStrategies;
 import org.apache.metron.zookeeper.SimpleEventListener;
 import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
 import org.apache.metron.common.zookeeper.configurations.Reloadable;
@@ -43,12 +45,14 @@ public abstract class ConfiguredBolt<CONFIG_T extends 
Configurations> extends Ba
   private static final Logger LOG =  
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private String zookeeperUrl;
+  private String configurationStrategy;
 
   protected CuratorFramework client;
   protected ZKCache cache;
   private final CONFIG_T configurations;
-  public ConfiguredBolt(String zookeeperUrl) {
+  public ConfiguredBolt(String zookeeperUrl, String configurationStrategy) {
     this.zookeeperUrl = zookeeperUrl;
+    this.configurationStrategy = configurationStrategy;
     this.configurations = createUpdater().defaultConfigurations();
   }
 
@@ -68,7 +72,13 @@ public abstract class ConfiguredBolt<CONFIG_T extends 
Configurations> extends Ba
     return configurations;
   }
 
-  protected abstract ConfigurationsUpdater<CONFIG_T> createUpdater();
+  protected ConfigurationStrategy<CONFIG_T> getConfigurationStrategy() {
+    return ConfigurationsStrategies.valueOf(configurationStrategy);
+  }
+
+  protected ConfigurationsUpdater<CONFIG_T> createUpdater() {
+    return getConfigurationStrategy().createUpdater(this, 
this::getConfigurations);
+  }
 
 
   @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
index 54fd7e8..c28ca7b 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
@@ -17,13 +17,8 @@
  */
 package org.apache.metron.common.bolt;
 
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import org.apache.metron.common.configuration.ConfigurationType;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
-import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
-import org.apache.metron.common.zookeeper.configurations.EnrichmentUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,11 +28,7 @@ public abstract class ConfiguredEnrichmentBolt extends 
ConfiguredBolt<Enrichment
 
 
   public ConfiguredEnrichmentBolt(String zookeeperUrl) {
-    super(zookeeperUrl);
+    super(zookeeperUrl, "ENRICHMENT");
   }
 
-  @Override
-  protected ConfigurationsUpdater<EnrichmentConfigurations> createUpdater() {
-    return new EnrichmentUpdater(this, this::getConfigurations);
-  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
index 09300e4..27e081e 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
@@ -17,26 +17,17 @@
  */
 package org.apache.metron.common.bolt;
 
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import org.apache.metron.common.configuration.ConfigurationType;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.IndexingConfigurations;
-import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
-import org.apache.metron.common.zookeeper.configurations.IndexingUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+// TODO delete - no longer used after removing from BulkMessageWriterBolt?
 public abstract class ConfiguredIndexingBolt extends 
ConfiguredBolt<IndexingConfigurations> {
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   public ConfiguredIndexingBolt(String zookeeperUrl) {
-    super(zookeeperUrl);
-  }
-
-  @Override
-  protected ConfigurationsUpdater<IndexingConfigurations> createUpdater() {
-    return new IndexingUpdater(this, this::getConfigurations);
+    super(zookeeperUrl, "INDEXING");
   }
 
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
index 2f13658..1cb4e2e 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
@@ -17,14 +17,9 @@
  */
 package org.apache.metron.common.bolt;
 
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import org.apache.metron.common.configuration.ConfigurationType;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
-import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
-import org.apache.metron.common.zookeeper.configurations.ParserUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +30,7 @@ public abstract class ConfiguredParserBolt extends 
ConfiguredBolt<ParserConfigur
   protected final ParserConfigurations configurations = new 
ParserConfigurations();
   private String sensorType;
   public ConfiguredParserBolt(String zookeeperUrl, String sensorType) {
-    super(zookeeperUrl);
+    super(zookeeperUrl, "PARSERS");
     this.sensorType = sensorType;
   }
 
@@ -47,10 +42,4 @@ public abstract class ConfiguredParserBolt extends 
ConfiguredBolt<ParserConfigur
     return sensorType;
   }
 
-
-  @Override
-  protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-    return new ParserUpdater(this, this::getConfigurations);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
index 90575d0..e4d9f7b 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
@@ -21,8 +21,6 @@ package org.apache.metron.common.bolt;
 import java.lang.invoke.MethodHandles;
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
 import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
-import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
-import org.apache.metron.common.zookeeper.configurations.ProfilerUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,16 +32,11 @@ public abstract class ConfiguredProfilerBolt extends 
ConfiguredBolt<ProfilerConf
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   public ConfiguredProfilerBolt(String zookeeperUrl) {
-    super(zookeeperUrl);
+    super(zookeeperUrl, "PROFILER");
   }
 
   protected ProfilerConfig getProfilerConfig() {
     return getConfigurations().getProfilerConfig();
   }
 
-  @Override
-  protected ConfigurationsUpdater<ProfilerConfigurations> createUpdater() {
-    return new ProfilerUpdater(this, this::getConfigurations);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
index af421a9..62cf742 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java
@@ -27,8 +27,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,6 +75,11 @@ public class Configurations implements Serializable {
     getConfigurations().remove(ConfigurationType.GLOBAL.getTypeName());
   }
 
+  public static <T> T getAs(String key, Map<String, Object> map, T 
defaultValue, Class<T> clazz) {
+    return map == null ? defaultValue
+        : ConversionUtils.convert(map.getOrDefault(key, defaultValue), clazz);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
index dfd7a65..1c723c7 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java
@@ -27,6 +27,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class EnrichmentConfigurations extends Configurations {
+  public static final Integer DEFAULT_KAFKA_BATCH_SIZE = 15;
+  public static final String BATCH_SIZE_CONF = "enrichment.writer.batchSize";
+  public static final String BATCH_TIMEOUT_CONF = 
"enrichment.writer.batchTimeout";
 
   public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) {
     return (SensorEnrichmentConfig) 
getConfigurations().get(getKey(sensorType));
@@ -49,6 +52,28 @@ public class EnrichmentConfigurations extends Configurations 
{
     getConfigurations().remove(getKey(sensorType));
   }
 
+  /**
+   * Pulled from global config.
+   * Note: enrichment writes out to 1 kafka topic, so it is not pulling this 
config by sensor.
+   *
+   * @return batch size for writing to kafka
+   * @see 
org.apache.metron.common.configuration.EnrichmentConfigurations#BATCH_SIZE_CONF
+   */
+  public int getBatchSize() {
+    return getAs(BATCH_SIZE_CONF, getGlobalConfig(true), 
DEFAULT_KAFKA_BATCH_SIZE, Integer.class);
+  }
+
+  /**
+   * Pulled from global config
+   * Note: enrichment writes out to 1 kafka topic, so it is not pulling this 
config by sensor.
+   *
+   * @return batch timeout for writing to kafka
+   * @see 
org.apache.metron.common.configuration.EnrichmentConfigurations#BATCH_TIMEOUT_CONF
+   */
+  public int getBatchTimeout() {
+    return getAs(BATCH_TIMEOUT_CONF, getGlobalConfig(true), 0, Integer.class);
+  }
+
   public List<String> getTypes() {
     List<String> ret = new ArrayList<>();
     for(String keyedSensor : getConfigurations().keySet()) {
@@ -62,4 +87,5 @@ public class EnrichmentConfigurations extends Configurations {
   public static String getKey(String sensorType) {
     return ConfigurationType.ENRICHMENT.getTypeName() + "." + sensorType;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
index 5b67aa5..7ecb606 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
@@ -17,13 +17,14 @@
  */
 package org.apache.metron.common.configuration;
 
-import org.apache.metron.stellar.common.utils.ConversionUtils;
-import org.apache.metron.common.utils.JSONUtils;
-
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.metron.common.utils.JSONUtils;
 
 public class IndexingConfigurations extends Configurations {
   public static final String BATCH_SIZE_CONF = "batchSize";
@@ -226,7 +227,4 @@ public class IndexingConfigurations extends Configurations {
     return ret;
   }
 
-  public static <T> T getAs(String key, Map<String, Object> map, T 
defaultValue, Class<T> clazz) {
-    return map == null?defaultValue: 
ConversionUtils.convert(map.getOrDefault(key, defaultValue), clazz);
-  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
index 72af833..83daf0d 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class ParserConfigurations extends Configurations {
+  public static final Integer DEFAULT_KAFKA_BATCH_SIZE = 15;
 
   public SensorParserConfig getSensorParserConfig(String sensorType) {
     return (SensorParserConfig) getConfigurations().get(getKey(sensorType));

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
index f50d770..1348068 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
@@ -29,6 +29,9 @@ import java.io.InputStream;
  * Used to manage configurations for the Profiler.
  */
 public class ProfilerConfigurations extends Configurations {
+  public static final Integer DEFAULT_KAFKA_BATCH_SIZE = 15;
+  public static final String BATCH_SIZE_CONF = "profiler.writer.batchSize";
+  public static final String BATCH_TIMEOUT_CONF = 
"profiler.writer.batchTimeout";
 
   public ProfilerConfig getProfilerConfig() {
     return (ProfilerConfig) getConfigurations().get(getKey());
@@ -55,4 +58,26 @@ public class ProfilerConfigurations extends Configurations {
     configurations.remove(getKey());
   }
 
+  /**
+   * Pulled from global config.
+   * Note: profiler writes out to 1 kafka topic, so it is not pulling this 
config by sensor.
+   *
+   * @return batch size for writing to kafka
+   * @see 
org.apache.metron.common.configuration.profiler.ProfilerConfigurations#BATCH_SIZE_CONF
+   */
+  public int getBatchSize() {
+    return getAs(BATCH_SIZE_CONF, getGlobalConfig(true), 
DEFAULT_KAFKA_BATCH_SIZE, Integer.class);
+  }
+
+  /**
+   * Pulled from global config
+   * Note: profiler writes out to 1 kafka topic, so it is not pulling this 
config by sensor.
+   *
+   * @return batch timeout for writing to kafka
+   * @see 
org.apache.metron.common.configuration.profiler.ProfilerConfigurations#BATCH_TIMEOUT_CONF
+   */
+  public int getBatchTimeout() {
+    return getAs(BATCH_TIMEOUT_CONF, getGlobalConfig(true), 0, Integer.class);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ConfigurationStrategy.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ConfigurationStrategy.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ConfigurationStrategy.java
new file mode 100644
index 0000000..cd13dc7
--- /dev/null
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ConfigurationStrategy.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration.writer;
+
+import java.util.function.Supplier;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.Reloadable;
+
+public interface ConfigurationStrategy<T extends Configurations> {
+
+  /**
+   * Create a specific writer configuration
+   * @param writer provided for the underlying creator to access metadata as 
needed
+   * @param configs a WriterConfiguration will typically access the pass 
configs
+   * @return
+   */
+  WriterConfiguration createWriterConfig(BulkMessageWriter writer, 
Configurations configs);
+
+  /**
+   * Create specific config updater for the type of config extending 
Configurations
+   * @param reloadable setup as a callback by the updater
+   * @param configSupplier supplies config to the updater
+   * @return updater
+   */
+  ConfigurationsUpdater<T> createUpdater(Reloadable reloadable, Supplier<T> 
configSupplier);
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ConfigurationsStrategies.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ConfigurationsStrategies.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ConfigurationsStrategies.java
new file mode 100644
index 0000000..f22dd01
--- /dev/null
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ConfigurationsStrategies.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import java.util.function.Supplier;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.EnrichmentUpdater;
+import org.apache.metron.common.zookeeper.configurations.IndexingUpdater;
+import org.apache.metron.common.zookeeper.configurations.ParserUpdater;
+import org.apache.metron.common.zookeeper.configurations.ProfilerUpdater;
+import org.apache.metron.common.zookeeper.configurations.Reloadable;
+
+/**
+ * Strategy pattern implementation that couples factories for 
WriterConfiguration and
+ * ConfigurationsUpdater together for a particular type.
+ * <br>
+ * <strong>Note:</strong> The enum type definition does not specify generics 
because
+ * enums are disallowed from doing this in Java.
+ */
+public enum ConfigurationsStrategies implements ConfigurationStrategy {
+
+  PARSERS(new ConfigurationStrategy<ParserConfigurations>() {
+
+    @Override
+    public WriterConfiguration createWriterConfig(BulkMessageWriter writer,
+        Configurations configs) {
+      if (configs instanceof ParserConfigurations) {
+        return new ParserWriterConfiguration((ParserConfigurations) configs);
+      } else {
+        throw new IllegalArgumentException(
+            "Expected config of type ParserConfigurations but found " + 
configs.getClass());
+      }
+    }
+
+    @Override
+    public ConfigurationsUpdater<ParserConfigurations> 
createUpdater(Reloadable reloadable,
+        Supplier configSupplier) {
+      return new ParserUpdater(reloadable, configSupplier);
+    }
+  }),
+
+  ENRICHMENT(new ConfigurationStrategy() {
+
+    @Override
+    public WriterConfiguration createWriterConfig(BulkMessageWriter writer,
+        Configurations configs) {
+      if (configs instanceof EnrichmentConfigurations) {
+        return new EnrichmentWriterConfiguration((EnrichmentConfigurations) 
configs);
+      } else {
+        throw new IllegalArgumentException(
+            "Expected config of type EnrichmentConfigurations but found " + 
configs.getClass());
+      }
+    }
+
+    @Override
+    public ConfigurationsUpdater<EnrichmentConfigurations> 
createUpdater(Reloadable reloadable,
+        Supplier configSupplier) {
+      return new EnrichmentUpdater(reloadable, configSupplier);
+    }
+  }),
+
+  INDEXING(new ConfigurationStrategy() {
+
+    @Override
+    public WriterConfiguration createWriterConfig(BulkMessageWriter writer,
+        Configurations configs) {
+      if (configs instanceof IndexingConfigurations) {
+        return new IndexingWriterConfiguration(writer.getName(), 
(IndexingConfigurations) configs);
+      } else {
+        throw new IllegalArgumentException(
+            "Expected config of type IndexingConfigurations but found " + 
configs.getClass());
+      }
+    }
+
+    @Override
+    public ConfigurationsUpdater<IndexingConfigurations> 
createUpdater(Reloadable reloadable,
+        Supplier configSupplier) {
+      return new IndexingUpdater(reloadable, configSupplier);
+    }
+  }),
+
+  PROFILER(new ConfigurationStrategy() {
+
+    @Override
+    public WriterConfiguration createWriterConfig(BulkMessageWriter writer,
+        Configurations configs) {
+        if (configs instanceof ProfilerConfigurations) {
+          return new ProfilerWriterConfiguration((ProfilerConfigurations) 
configs);
+        } else {
+          throw new IllegalArgumentException(
+              "Expected config of type IndexingConfigurations but found " + 
configs.getClass());
+        }
+    }
+
+    @Override
+    public ConfigurationsUpdater createUpdater(Reloadable reloadable, Supplier 
configSupplier) {
+      return new ProfilerUpdater(reloadable, configSupplier);
+    }
+  });
+
+  private ConfigurationStrategy<? extends Configurations> strategy;
+
+  ConfigurationsStrategies(ConfigurationStrategy<? extends Configurations> 
strategy) {
+    this.strategy = strategy;
+  }
+
+  @Override
+  public WriterConfiguration createWriterConfig(BulkMessageWriter writer, 
Configurations configs) {
+    return strategy.createWriterConfig(writer, configs);
+  }
+
+  /**
+   * Config updater
+   * @param reloadable callback
+   * @param configSupplier Supplier provides config of type <? extends 
Configurations>
+   * @return Config updater
+   */
+  @Override
+  public ConfigurationsUpdater createUpdater(Reloadable reloadable, Supplier 
configSupplier) {
+    return strategy.createUpdater(reloadable, configSupplier);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java
new file mode 100644
index 0000000..c275a76
--- /dev/null
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import static java.util.Arrays.asList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+
+/**
+ * Writer configuration for the enrichment topology. Batch size and batch 
timeout are a couple
+ * primary values of interest that are used for configuring the enrichment 
writer.
+ */
+public class EnrichmentWriterConfiguration implements WriterConfiguration {
+
+  private Optional<EnrichmentConfigurations> config;
+
+  public EnrichmentWriterConfiguration(EnrichmentConfigurations config) {
+    this.config = Optional.ofNullable(config);
+  }
+
+  /**
+   * Batch size for writing.
+   * @param sensorName n/a
+   * @return batch size in # messages
+   */
+  @Override
+  public int getBatchSize(String sensorName) {
+    return config.orElse(new EnrichmentConfigurations()).getBatchSize();
+  }
+
+  /**
+   * Timeout for this writer.
+   * @param sensorName n/a
+   * @return timeout in ms
+   */
+  @Override
+  public int getBatchTimeout(String sensorName) {
+    return config.orElse(new EnrichmentConfigurations()).getBatchTimeout();
+  }
+
+  /**
+   * Timeout for this writer.
+   * @return single item list with this writer's timeout
+   */
+  @Override
+  public List<Integer> getAllConfiguredTimeouts() {
+    return asList(getBatchTimeout(null));
+  }
+
+  /**
+   * n/a for enrichment
+   * @param sensorName n/a
+   * @return null
+   */
+  @Override
+  public String getIndex(String sensorName) {
+    return null;
+  }
+
+  /**
+   * Always enabled in enrichment
+   * @param sensorName n/a
+   * @return true
+   */
+  @Override
+  public boolean isEnabled(String sensorName) {
+    return true;
+  }
+
+  @Override
+  public Map<String, Object> getSensorConfig(String sensorName) {
+    return config.orElse(new 
EnrichmentConfigurations()).getSensorEnrichmentConfig(sensorName)
+        .getConfiguration();
+  }
+
+  @Override
+  public Map<String, Object> getGlobalConfig() {
+    return config.orElse(new EnrichmentConfigurations()).getGlobalConfig();
+  }
+
+  @Override
+  public boolean isDefault(String sensorName) {
+    return false;
+  }
+
+  @Override
+  public String getFieldNameConverter(String sensorName) {
+    // not applicable
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
index 4603b32..a4f3f66 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
@@ -26,6 +26,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+/*
+ * Note that parsers can also be used for streaming enrichments, which means 
broader scope than
+ * Kafka alone.
+ */
 public class ParserWriterConfiguration implements WriterConfiguration {
   private ParserConfigurations config;
   public ParserWriterConfiguration(ParserConfigurations config) {
@@ -38,7 +42,7 @@ public class ParserWriterConfiguration implements 
WriterConfiguration {
             && config.getSensorParserConfig(sensorName).getParserConfig() != 
null
             ) {
       Object batchObj = 
config.getSensorParserConfig(sensorName).getParserConfig().get(IndexingConfigurations.BATCH_SIZE_CONF);
-      return batchObj == null ? 1 : ConversionUtils.convert(batchObj, 
Integer.class);
+      return batchObj == null ? ParserConfigurations.DEFAULT_KAFKA_BATCH_SIZE 
: ConversionUtils.convert(batchObj, Integer.class);
     }
     return 1;
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ProfilerWriterConfiguration.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ProfilerWriterConfiguration.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ProfilerWriterConfiguration.java
new file mode 100644
index 0000000..fe283bd
--- /dev/null
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ProfilerWriterConfiguration.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import static java.util.Arrays.asList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+
+/**
+ * Writer configuration for the profiler topology. Batch size and batch 
timeout are a couple
+ * primary values of interest that are used for configuring the profiler 
writer.
+ */
+public class ProfilerWriterConfiguration implements WriterConfiguration {
+  private Optional<ProfilerConfigurations> config;
+
+  public ProfilerWriterConfiguration(ProfilerConfigurations config) {
+    this.config = Optional.ofNullable(config);
+  }
+
+  /**
+   * Batch size for writing.
+   * @param sensorName n/a
+   * @return batch size in # messages
+   */
+  @Override
+  public int getBatchSize(String sensorName) {
+    return config.orElse(new ProfilerConfigurations()).getBatchSize();
+  }
+
+  /**
+   * Timeout for this writer.
+   * @param sensorName n/a
+   * @return timeout in ms
+   */
+  @Override
+  public int getBatchTimeout(String sensorName) {
+    return config.orElse(new ProfilerConfigurations()).getBatchTimeout();
+  }
+
+  /**
+   * Timeout for this writer.
+   * @return single item list with this writer's timeout
+   */
+  @Override
+  public List<Integer> getAllConfiguredTimeouts() {
+    return asList(getBatchTimeout(null));
+  }
+
+  /**
+   * n/a for profiler
+   * @param sensorName n/a
+   * @return null
+   */
+  @Override
+  public String getIndex(String sensorName) {
+    return null;
+  }
+
+  /**
+   * Always enabled in profiler
+   * @param sensorName n/a
+   * @return true
+   */
+  @Override
+  public boolean isEnabled(String sensorName) {
+    return true;
+  }
+
+  @Override
+  public Map<String, Object> getSensorConfig(String sensorName) {
+    throw new UnsupportedOperationException("Profiler does not have sensor 
configs");
+  }
+
+  @Override
+  public Map<String, Object> getGlobalConfig() {
+    return config.orElse(new ProfilerConfigurations()).getGlobalConfig();
+  }
+
+  @Override
+  public boolean isDefault(String sensorName) {
+    return false;
+  }
+
+  @Override
+  public String getFieldNameConverter(String sensorName) {
+    // not applicable
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ParserConfigurationsTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ParserConfigurationsTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ParserConfigurationsTest.java
new file mode 100644
index 0000000..c099aec
--- /dev/null
+++ 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ParserConfigurationsTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.junit.Test;
+
+public class ParserConfigurationsTest {
+
+  /**
+   * {
+   *  "parserClassName" : "parser-class",
+   *  "filterClassName" : "filter-class",
+   *  "sensorTopic" : "sensor-topic",
+   *  "outputTopic" : "output-topic",
+   *  "errorTopic" : "error-topic",
+   *  "writerClassName" : "writer-class",
+   *  "errorWriterClassName" : "error-writer-class",
+   *  "readMetadata" : true,
+   *  "mergeMetadata" : true,
+   *  "numWorkers" : 40,
+   *  "numAckers" : 40,
+   *  "spoutParallelism" : 40,
+   *  "spoutNumTasks" : 40,
+   *  "parserParallelism" : 40,
+   *  "parserNumTasks" : 40,
+   *  "errorWriterParallelism" : 40,
+   *  "errorWriterNumTasks" : 40,
+   *  "securityProtocol" : "security-protocol",
+   *  "spoutConfig" : {
+   *    "foo" : "bar"
+   *  },
+   *  "stormConfig" : {
+   *    "storm" : "config"
+   *  },
+   *  "cacheConfig" : {
+   *    "stellar.cache.maxSize" : 20000
+   *  },
+   *  "parserConfig" : {
+   *    "parser" : "config"
+   *  },
+   *  "fieldTransformations" : [
+   *    {
+   *      "input" : "input-field",
+   *      "transformation" : "REMOVE"
+   *    }
+   *  ]
+   * }
+   */
+  @Multiline
+  private static String parserConfig;
+
+  @Test
+  public void sensorParserConfig_properties_populated_by_JSON_configuration() 
throws IOException {
+    ParserConfigurations parserConfigs = new ParserConfigurations();
+    parserConfigs.updateSensorParserConfig("test-sensor", 
parserConfig.getBytes());
+    SensorParserConfig actualSensorConfig = 
parserConfigs.getSensorParserConfig("test-sensor");
+    assertThat(actualSensorConfig.getParserClassName(), 
equalTo("parser-class"));
+    assertThat(actualSensorConfig.getFilterClassName(), 
equalTo("filter-class"));
+    assertThat(actualSensorConfig.getSensorTopic(), equalTo("sensor-topic"));
+    assertThat(actualSensorConfig.getOutputTopic(), equalTo("output-topic"));
+    assertThat(actualSensorConfig.getErrorTopic(), equalTo("error-topic"));
+    assertThat(actualSensorConfig.getWriterClassName(), 
equalTo("writer-class"));
+    assertThat(actualSensorConfig.getErrorWriterClassName(), 
equalTo("error-writer-class"));
+    assertThat(actualSensorConfig.getReadMetadata(), equalTo(true));
+    assertThat(actualSensorConfig.getMergeMetadata(), equalTo(true));
+    assertThat(actualSensorConfig.getNumWorkers(), equalTo(40));
+    assertThat(actualSensorConfig.getNumAckers(), equalTo(40));
+    assertThat(actualSensorConfig.getSpoutParallelism(), equalTo(40));
+    assertThat(actualSensorConfig.getSpoutNumTasks(), equalTo(40));
+    assertThat(actualSensorConfig.getParserParallelism(), equalTo(40));
+    assertThat(actualSensorConfig.getParserNumTasks(), equalTo(40));
+    assertThat(actualSensorConfig.getErrorWriterParallelism(), equalTo(40));
+    assertThat(actualSensorConfig.getErrorWriterNumTasks(), equalTo(40));
+    assertThat(actualSensorConfig.getSecurityProtocol(), 
equalTo("security-protocol"));
+    assertThat(actualSensorConfig.getSpoutConfig(), not(new HashMap<>()));
+    assertThat(actualSensorConfig.getSpoutConfig().get("foo"), equalTo("bar"));
+    assertThat(actualSensorConfig.getStormConfig(), not(new HashMap<>()));
+    assertThat(actualSensorConfig.getStormConfig().get("storm"), 
equalTo("config"));
+    assertThat(actualSensorConfig.getCacheConfig(), not(new HashMap<>()));
+    
assertThat(actualSensorConfig.getCacheConfig().get("stellar.cache.maxSize"), 
equalTo(20000));
+    assertThat(actualSensorConfig.getParserConfig(), not(new HashMap<>()));
+    assertThat(actualSensorConfig.getParserConfig().get("parser"), 
equalTo("config"));
+    assertThat(actualSensorConfig.getFieldTransformations(), not(new 
ArrayList<>()));
+    assertThat(actualSensorConfig.getFieldTransformations().get(0), 
not(nullValue()));
+    assertThat(
+        ((FieldTransformer) 
actualSensorConfig.getFieldTransformations().get(0)).getInput().size(),
+        equalTo(1));
+    assertThat(
+        ((FieldTransformer) 
actualSensorConfig.getFieldTransformations().get(0)).getInput().get(0),
+        equalTo("input-field"));
+    assertThat(((FieldTransformer) 
actualSensorConfig.getFieldTransformations().get(0))
+        .getTransformation(), equalTo("REMOVE"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
index 2cbdfb9..4c70a8c 100644
--- 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
+++ 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
@@ -19,19 +19,18 @@
  */
 package org.apache.metron.common.configuration.profiler;
 
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.utils.SerDeUtils;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.junit.Test;
 
 /**
  * Tests the {@link ProfilerConfig} class.

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ConfigurationsStrategiesTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ConfigurationsStrategiesTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ConfigurationsStrategiesTest.java
new file mode 100644
index 0000000..fd498ba
--- /dev/null
+++ 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ConfigurationsStrategiesTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import static 
org.apache.metron.common.configuration.writer.ConfigurationsStrategies.ENRICHMENT;
+import static 
org.apache.metron.common.configuration.writer.ConfigurationsStrategies.INDEXING;
+import static 
org.apache.metron.common.configuration.writer.ConfigurationsStrategies.PARSERS;
+import static 
org.apache.metron.common.configuration.writer.ConfigurationsStrategies.PROFILER;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.zookeeper.configurations.EnrichmentUpdater;
+import org.apache.metron.common.zookeeper.configurations.IndexingUpdater;
+import org.apache.metron.common.zookeeper.configurations.ParserUpdater;
+import org.apache.metron.common.zookeeper.configurations.ProfilerUpdater;
+import org.apache.metron.common.zookeeper.configurations.Reloadable;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+public class ConfigurationsStrategiesTest {
+
+  @Mock
+  private BulkMessageWriter writer;
+  @Mock
+  private Reloadable reloadable;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void strategies_build_writer_configs() {
+    assertThat(PARSERS.createWriterConfig(writer, new ParserConfigurations()),
+        instanceOf(ParserWriterConfiguration.class));
+    assertThat(ENRICHMENT.createWriterConfig(writer, new 
EnrichmentConfigurations()),
+        instanceOf(EnrichmentWriterConfiguration.class));
+    assertThat(INDEXING.createWriterConfig(writer, new 
IndexingConfigurations()),
+        instanceOf(IndexingWriterConfiguration.class));
+    assertThat(PROFILER.createWriterConfig(writer, new 
ProfilerConfigurations()),
+        instanceOf(ProfilerWriterConfiguration.class));
+  }
+
+  @Test
+  public void strategies_build_updaters() {
+    assertThat(PARSERS.createUpdater(reloadable, ParserConfigurations::new),
+        instanceOf(ParserUpdater.class));
+    assertThat(ENRICHMENT.createUpdater(reloadable, 
EnrichmentConfigurations::new),
+        instanceOf(EnrichmentUpdater.class));
+    assertThat(INDEXING.createUpdater(reloadable, IndexingConfigurations::new),
+        instanceOf(IndexingUpdater.class));
+    assertThat(PROFILER.createUpdater(reloadable, ProfilerConfigurations::new),
+        instanceOf(ProfilerUpdater.class));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfigurationTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfigurationTest.java
new file mode 100644
index 0000000..b6aae19
--- /dev/null
+++ 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfigurationTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import static java.util.Arrays.asList;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.junit.Test;
+
+public class EnrichmentWriterConfigurationTest {
+
+  /**
+   * {
+   *  "enrichment.writer.batchSize" : 12345,
+   *  "enrichment.writer.batchTimeout" : 555
+   * }
+   */
+  @Multiline
+  private static String globalJson;
+
+  @Test
+  public void gets_batch_size_and_timeout_from_global_config() throws 
IOException {
+    EnrichmentConfigurations configs = new EnrichmentConfigurations();
+    configs.updateGlobalConfig(globalJson.getBytes());
+    EnrichmentWriterConfiguration writerConfig = new 
EnrichmentWriterConfiguration(configs);
+    assertThat("batch timeout should match global config setting",
+        writerConfig.getBatchTimeout(null), equalTo(555));
+    assertThat("list should have single batch timeout matching global config 
setting",
+        writerConfig.getAllConfiguredTimeouts(), equalTo(asList(555)));
+    assertThat("batch size should match global config setting", 
writerConfig.getBatchSize(null),
+        equalTo(12345));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/IndexingWriterConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/IndexingWriterConfigurationTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/IndexingWriterConfigurationTest.java
new file mode 100644
index 0000000..b2b5eca
--- /dev/null
+++ 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/IndexingWriterConfigurationTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import 
org.apache.metron.common.configuration.writer.IndexingWriterConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import static 
org.apache.metron.test.bolt.BaseEnrichmentBoltTest.sampleSensorIndexingConfigPath;
+import static org.apache.metron.test.bolt.BaseEnrichmentBoltTest.sensorType;
+
+public class IndexingWriterConfigurationTest {
+  @Test
+  public void testDefaultBatchSize() {
+    IndexingWriterConfiguration config = new 
IndexingWriterConfiguration("hdfs",
+           new IndexingConfigurations()
+    );
+    Assert.assertEquals(1, config.getBatchSize("foo"));
+  }
+  @Test
+  public void testDefaultBatchTimeout() {
+    IndexingWriterConfiguration config = new 
IndexingWriterConfiguration("hdfs",
+           new IndexingConfigurations()
+    );
+    Assert.assertEquals(0, config.getBatchTimeout("foo"));
+  }
+  @Test
+  public void testGetAllConfiguredTimeouts() throws FileNotFoundException, 
IOException {
+    //default
+    IndexingWriterConfiguration config = new 
IndexingWriterConfiguration("hdfs",
+            new IndexingConfigurations()
+    );
+    Assert.assertEquals(0, config.getAllConfiguredTimeouts().size());
+    //non-default
+    IndexingConfigurations iconfigs = new IndexingConfigurations();
+    iconfigs.updateSensorIndexingConfig(
+            sensorType, new FileInputStream(sampleSensorIndexingConfigPath));
+    config = new IndexingWriterConfiguration("elasticsearch", iconfigs);
+    Assert.assertEquals(1, config.getAllConfiguredTimeouts().size());
+    Assert.assertEquals(7, (long)config.getAllConfiguredTimeouts().get(0));
+  }
+  @Test
+  public void testDefaultIndex() {
+    IndexingWriterConfiguration config = new 
IndexingWriterConfiguration("hdfs",
+           new IndexingConfigurations()
+    );
+    Assert.assertEquals("foo", config.getIndex("foo"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ParserWriterConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ParserWriterConfigurationTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ParserWriterConfigurationTest.java
new file mode 100644
index 0000000..ebe3d9b
--- /dev/null
+++ 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ParserWriterConfigurationTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ParserWriterConfigurationTest {
+
+  @Test
+  public void testDefaultBatchSize() {
+    ParserWriterConfiguration config = new ParserWriterConfiguration( new 
ParserConfigurations() );
+    Assert.assertEquals(1, config.getBatchSize("foo"));
+  }
+
+  @Test
+  public void testDefaultIndex() {
+    ParserWriterConfiguration config = new ParserWriterConfiguration( new 
ParserConfigurations() );
+    Assert.assertEquals("foo", config.getIndex("foo"));
+  }
+
+  /**
+   * {
+   *   "parserClassName":"some-parser",
+   *   "sensorTopic":"testtopic",
+   *   "parserConfig": {
+   *     "batchSize" : 5,
+   *     "batchTimeout" : "10000",
+   *     "index" : "modified-index",
+   *     "enabled" : "false"
+   *   }
+   * }
+   */
+  @Multiline
+  private static String configJson;
+
+  @Test
+  public void pulls_writer_configuration_from_parserConfig() throws 
IOException {
+    ParserConfigurations parserConfigurations = new ParserConfigurations();
+    final String sensorName = "some-sensor";
+    parserConfigurations.updateSensorParserConfig("some-sensor", 
configJson.getBytes());
+    ParserWriterConfiguration writerConfiguration = new 
ParserWriterConfiguration(
+        parserConfigurations);
+    assertThat("batch size should match", 
writerConfiguration.getBatchSize(sensorName), equalTo(5));
+    assertThat("batch timeout should match", 
writerConfiguration.getBatchTimeout(sensorName), equalTo(10000));
+    assertThat("index should match", writerConfiguration.getIndex(sensorName), 
equalTo("modified-index"));
+    assertThat("enabled should match", 
writerConfiguration.isEnabled(sensorName), equalTo(false));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ProfilerWriterConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ProfilerWriterConfigurationTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ProfilerWriterConfigurationTest.java
new file mode 100644
index 0000000..3639276
--- /dev/null
+++ 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/writer/ProfilerWriterConfigurationTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import static java.util.Arrays.asList;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+import org.junit.Test;
+
+public class ProfilerWriterConfigurationTest {
+
+  /**
+   * {
+   *  "profiler.writer.batchSize" : 12345,
+   *  "profiler.writer.batchTimeout" : 555
+   * }
+   */
+  @Multiline
+  private static String globalJson;
+
+  @Test
+  public void gets_batch_size_and_timeout_from_global_config() throws 
IOException {
+    ProfilerConfigurations configs = new ProfilerConfigurations();
+    configs.updateGlobalConfig(globalJson.getBytes());
+    ProfilerWriterConfiguration writerConfig = new 
ProfilerWriterConfiguration(configs);
+    assertThat("batch timeout should match global config setting",
+        writerConfig.getBatchTimeout(null), equalTo(555));
+    assertThat("list should have single batch timeout matching global config 
setting",
+        writerConfig.getAllConfiguredTimeouts(), equalTo(asList(555)));
+    assertThat("batch size should match global config setting", 
writerConfig.getBatchSize(null),
+        equalTo(12345));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java
deleted file mode 100644
index aec215f..0000000
--- 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.writer;
-
-import org.apache.metron.common.configuration.IndexingConfigurations;
-import 
org.apache.metron.common.configuration.writer.IndexingWriterConfiguration;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import static 
org.apache.metron.test.bolt.BaseEnrichmentBoltTest.sampleSensorIndexingConfigPath;
-import static org.apache.metron.test.bolt.BaseEnrichmentBoltTest.sensorType;
-
-public class IndexingWriterConfigurationTest {
-  @Test
-  public void testDefaultBatchSize() {
-    IndexingWriterConfiguration config = new 
IndexingWriterConfiguration("hdfs",
-           new IndexingConfigurations()
-    );
-    Assert.assertEquals(1, config.getBatchSize("foo"));
-  }
-  @Test
-  public void testDefaultBatchTimeout() {
-    IndexingWriterConfiguration config = new 
IndexingWriterConfiguration("hdfs",
-           new IndexingConfigurations()
-    );
-    Assert.assertEquals(0, config.getBatchTimeout("foo"));
-  }
-  @Test
-  public void testGetAllConfiguredTimeouts() throws FileNotFoundException, 
IOException {
-    //default
-    IndexingWriterConfiguration config = new 
IndexingWriterConfiguration("hdfs",
-            new IndexingConfigurations()
-    );
-    Assert.assertEquals(0, config.getAllConfiguredTimeouts().size());
-    //non-default
-    IndexingConfigurations iconfigs = new IndexingConfigurations();
-    iconfigs.updateSensorIndexingConfig(
-            sensorType, new FileInputStream(sampleSensorIndexingConfigPath));
-    config = new IndexingWriterConfiguration("elasticsearch", iconfigs);
-    Assert.assertEquals(1, config.getAllConfiguredTimeouts().size());
-    Assert.assertEquals(7, (long)config.getAllConfiguredTimeouts().get(0));
-  }
-  @Test
-  public void testDefaultIndex() {
-    IndexingWriterConfiguration config = new 
IndexingWriterConfiguration("hdfs",
-           new IndexingConfigurations()
-    );
-    Assert.assertEquals("foo", config.getIndex("foo"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/ParserWriterConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/ParserWriterConfigurationTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/ParserWriterConfigurationTest.java
deleted file mode 100644
index e960e47..0000000
--- 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/ParserWriterConfigurationTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.writer;
-
-import org.apache.metron.common.configuration.ParserConfigurations;
-import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ParserWriterConfigurationTest {
-  @Test
-  public void testDefaultBatchSize() {
-    ParserWriterConfiguration config = new ParserWriterConfiguration( new 
ParserConfigurations() );
-    Assert.assertEquals(1, config.getBatchSize("foo"));
-  }
-
-  @Test
-  public void testDefaultIndex() {
-    ParserWriterConfiguration config = new ParserWriterConfiguration( new 
ParserConfigurations() );
-    Assert.assertEquals("foo", config.getIndex("foo"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-enrichment/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/README.md 
b/metron-platform/metron-enrichment/README.md
index cbf8ee8..8a53e71 100644
--- a/metron-platform/metron-enrichment/README.md
+++ b/metron-platform/metron-enrichment/README.md
@@ -89,7 +89,8 @@ There are two types of configurations at the moment, `global` 
and
 ## Global Configuration 
 
 There are a few enrichments which have independent configurations, such
-as from the global config.
+as from the global config. You can also configure the enrichment topology's
+writer batching settings.
 
 Also, see the "[Global Configuration](../metron-common)" section for
 more discussion of the global config.
@@ -107,6 +108,18 @@ the topology and used from there. This is lazy, so if this 
property
 changes in a running topology, the file will be localized from HDFS upon first
 time the file is used via the geo enrichment. 
 
+### Writer Batching
+
+#### `enrichment.writer.batchSize`
+
+The size of the batch that is written to Kafka at once. Defaults to `15` (size 
of 1 disables batching).
+
+#### `enrichment.writer.batchTimeout`
+
+The timeout after which a batch will be flushed even if batchSize has not been 
met.  Optional.
+If unspecified, or set to `0`, it defaults to a system-determined duration 
which is a fraction of the Storm 
+parameter `topology.message.timeout.secs`.  Ignored if batchSize is `1`, since 
this disables batching.
+
 ## Sensor Enrichment Configuration
 
 The sensor specific configuration is intended to configure the

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-splitjoin.yaml
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-splitjoin.yaml
 
b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-splitjoin.yaml
index fd7ceff..8a1bba1 100644
--- 
a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-splitjoin.yaml
+++ 
b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-splitjoin.yaml
@@ -328,8 +328,9 @@ bolts:
         className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
         constructorArgs:
             - "${kafka.zk}"
+            - "ENRICHMENT"
         configMethods:
-            -   name: "withMessageWriter"
+            -   name: "withBulkMessageWriter"
                 args:
                     - ref: "enrichmentErrorKafkaWriter"
 
@@ -388,8 +389,9 @@ bolts:
         className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
         constructorArgs:
             - "${kafka.zk}"
+            - "ENRICHMENT"
         configMethods:
-            -   name: "withMessageWriter"
+            -   name: "withBulkMessageWriter"
                 args:
                     - ref: "threatIntelErrorKafkaWriter"
 
@@ -398,8 +400,9 @@ bolts:
         className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
         constructorArgs:
             - "${kafka.zk}"
+            - "ENRICHMENT"
         configMethods:
-            -   name: "withMessageWriter"
+            -   name: "withBulkMessageWriter"
                 args:
                     - ref: "kafkaWriter"
         parallelism: ${kafka.writer.parallelism}

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml
 
b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml
index d7107d9..45b05cf 100644
--- 
a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml
+++ 
b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote-unified.yaml
@@ -295,8 +295,9 @@ bolts:
         className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
         constructorArgs:
             - "${kafka.zk}"
+            - "ENRICHMENT"
         configMethods:
-            -   name: "withMessageWriter"
+            -   name: "withBulkMessageWriter"
                 args:
                     - ref: "enrichmentErrorKafkaWriter"
 
@@ -329,8 +330,9 @@ bolts:
         className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
         constructorArgs:
             - "${kafka.zk}"
+            - "ENRICHMENT"
         configMethods:
-            -   name: "withMessageWriter"
+            -   name: "withBulkMessageWriter"
                 args:
                     - ref: "threatIntelErrorKafkaWriter"
 
@@ -339,8 +341,9 @@ bolts:
         className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
         constructorArgs:
             - "${kafka.zk}"
+            - "ENRICHMENT"
         configMethods:
-            -   name: "withMessageWriter"
+            -   name: "withBulkMessageWriter"
                 args:
                     - ref: "kafkaWriter"
         parallelism: ${kafka.writer.parallelism}

http://git-wip-us.apache.org/repos/asf/metron/blob/523c38cf/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index 52516ac..588fc58 100644
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -17,9 +17,27 @@
  */
 package org.apache.metron.enrichment.bolt;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.log4j.Level;
 import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.message.MessageGetters;
@@ -43,19 +61,6 @@ import org.mockito.ArgumentMatcher;
 import org.mockito.Matchers;
 import org.mockito.Mock;
 
-import java.io.FileInputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
-
 public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
 
   protected class MessageListMatcher extends ArgumentMatcher<List<JSONObject>> 
{
@@ -119,9 +124,9 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
 
   @Test
   public void testSourceTypeMissing() throws Exception {
-
     // setup the bolt
-    BulkMessageWriterBolt bulkMessageWriterBolt = new 
BulkMessageWriterBolt("zookeeperUrl")
+    BulkMessageWriterBolt<IndexingConfigurations> bulkMessageWriterBolt = new 
BulkMessageWriterBolt<IndexingConfigurations>(
+          "zookeeperUrl", "INDEXING")
             .withBulkMessageWriter(bulkMessageWriter)
             .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
             .withMessageGetterField("message");
@@ -148,9 +153,11 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
 
   @Test
   public void testFlushOnBatchSize() throws Exception {
-    BulkMessageWriterBolt bulkMessageWriterBolt = new 
BulkMessageWriterBolt("zookeeperUrl")
-            
.withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
-            .withMessageGetterField("message");
+    BulkMessageWriterBolt<IndexingConfigurations> bulkMessageWriterBolt = new 
BulkMessageWriterBolt<IndexingConfigurations>(
+        "zookeeperUrl", "INDEXING")
+        .withBulkMessageWriter(bulkMessageWriter)
+        .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
+        .withMessageGetterField("message");
     bulkMessageWriterBolt.setCuratorFramework(client);
     bulkMessageWriterBolt.setZKCache(cache);
     
bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
@@ -207,9 +214,12 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
   @Test
   public void testFlushOnBatchTimeout() throws Exception {
     FakeClock clock = new FakeClock();
-    BulkMessageWriterBolt bulkMessageWriterBolt = new 
BulkMessageWriterBolt("zookeeperUrl")
-            
.withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
-            .withMessageGetterField("message").withBatchTimeoutDivisor(3);
+    BulkMessageWriterBolt<IndexingConfigurations> bulkMessageWriterBolt = new 
BulkMessageWriterBolt<IndexingConfigurations>(
+        "zookeeperUrl", "INDEXING")
+        .withBulkMessageWriter(bulkMessageWriter)
+        .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
+        .withMessageGetterField("message")
+        .withBatchTimeoutDivisor(3);
     bulkMessageWriterBolt.setCuratorFramework(client);
     bulkMessageWriterBolt.setZKCache(cache);
     
bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
@@ -251,9 +261,11 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
   @Test
   public void testFlushOnTickTuple() throws Exception {
     FakeClock clock = new FakeClock();
-    BulkMessageWriterBolt bulkMessageWriterBolt = new 
BulkMessageWriterBolt("zookeeperUrl")
-            
.withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
-            .withMessageGetterField("message");
+    BulkMessageWriterBolt<IndexingConfigurations> bulkMessageWriterBolt = new 
BulkMessageWriterBolt<IndexingConfigurations>(
+        "zookeeperUrl", "INDEXING")
+        .withBulkMessageWriter(bulkMessageWriter)
+        .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
+        .withMessageGetterField("message");
     bulkMessageWriterBolt.setCuratorFramework(client);
     bulkMessageWriterBolt.setZKCache(cache);
     
bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType
@@ -309,7 +321,8 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
     FakeClock clock = new FakeClock();
 
     // setup the bolt
-    BulkMessageWriterBolt bolt = new BulkMessageWriterBolt("zookeeperUrl")
+    BulkMessageWriterBolt<IndexingConfigurations> bolt = new 
BulkMessageWriterBolt<IndexingConfigurations>(
+        "zookeeperUrl", "INDEXING")
             .withBulkMessageWriter(bulkMessageWriter)
             .withMessageGetter(MessageGetters.JSON_FROM_POSITION.name())
             .withMessageGetterField("message");
@@ -331,4 +344,5 @@ public class BulkMessageWriterBoltTest extends 
BaseEnrichmentBoltTest {
     verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any());
     verify(outputCollector, times(1)).ack(tuple);
   }
+
 }

Reply via email to