METRON-793: Migrate to storm-kafka-client kafka spout from storm-kafka closes apache/incubator-metron#486


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/98dc7659
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/98dc7659
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/98dc7659

Branch: refs/heads/master
Commit: 98dc7659a8ce86f5ced1324b0832f8950c6a65a9
Parents: c13ee82
Author: cstella <ceste...@gmail.com>
Authored: Wed Mar 29 08:04:37 2017 -0400
Committer: cstella <ceste...@gmail.com>
Committed: Wed Mar 29 08:04:37 2017 -0400

----------------------------------------------------------------------
 metron-analytics/metron-profiler/pom.xml        |   5 +
 .../src/main/config/profiler.properties         |   3 +-
 .../src/main/flux/profiler/remote.yaml          |  47 ++--
 .../integration/ProfilerIntegrationTest.java    |  25 +-
 .../METRON/CURRENT/configuration/metron-env.xml |   2 +-
 .../rest/service/impl/KafkaServiceImpl.java     |   3 +-
 metron-platform/metron-common/pom.xml           |  10 -
 .../resolver/ClasspathFunctionResolver.java     |  12 +-
 .../metron/common/spout/kafka/SpoutConfig.java  |  59 -----
 .../common/spout/kafka/SpoutConfigFunction.java |  24 --
 .../common/spout/kafka/SpoutConfigOptions.java  |  80 -------
 .../apache/metron/common/utils/KafkaUtils.java  |  28 ++-
 .../utils/timestamp/TimestampConverters.java    |  10 +-
 .../common/spout/kafka/SpoutConfigTest.java     | 102 --------
 .../common/utils/KafkaUtilsEndpointTest.java    |  64 +++++
 .../src/main/config/elasticsearch.properties    |   3 +-
 metron-platform/metron-enrichment/pom.xml       |   5 +
 .../src/main/flux/enrichment/remote.yaml        |  58 +++--
 .../src/main/flux/enrichment/test.yaml          |  52 +++--
 .../src/main/flux/indexing/remote.yaml          |  46 ++--
 .../integration/IndexingIntegrationTest.java    |   3 +-
 .../components/FluxTopologyComponent.java       |  58 ++++-
 .../integration/components/KafkaComponent.java  |  22 +-
 .../metron/management/KafkaFunctions.java       |  14 +-
 metron-platform/metron-parsers/README.md        |  29 ++-
 .../parsers/topology/ParserTopologyBuilder.java |  36 +--
 .../parsers/topology/ParserTopologyCLI.java     |  36 ++-
 .../components/ParserTopologyComponent.java     |  45 +++-
 .../parsers/topology/ParserTopologyCLITest.java |   2 -
 metron-platform/metron-pcap-backend/pom.xml     |   5 -
 .../src/main/flux/pcap/remote.yaml              |  44 ++--
 .../metron/spout/pcap/HDFSWriterCallback.java   | 111 ++++-----
 .../metron/spout/pcap/HDFSWriterConfig.java     |  41 ++++
 .../metron/spout/pcap/KafkaToHDFSSpout.java     |  12 +-
 .../metron/spout/pcap/PartitionHDFSWriter.java  |   9 +-
 .../apache/metron/spout/pcap/SpoutConfig.java   |  35 ---
 .../spout/pcap/deserializer/Deserializers.java  |  59 +++++
 .../pcap/deserializer/FromKeyDeserializer.java  |  70 ++++++
 .../deserializer/FromPacketDeserializer.java    |  43 ++++
 .../pcap/deserializer/KeyValueDeserializer.java |  41 ++++
 .../metron/spout/pcap/scheme/FromKeyScheme.java |  72 ------
 .../spout/pcap/scheme/FromPacketScheme.java     |  60 -----
 .../spout/pcap/scheme/KeyConvertible.java       |  25 --
 .../spout/pcap/scheme/TimestampScheme.java      |  46 ----
 .../pcap/scheme/TimestampSchemeCreator.java     |  26 ---
 .../PcapTopologyIntegrationTest.java            |   6 +-
 metron-platform/metron-pcap/pom.xml             |  12 +-
 .../java/org/apache/storm/kafka/Callback.java   |  26 ---
 .../apache/storm/kafka/CallbackCollector.java   | 186 ---------------
 .../apache/storm/kafka/CallbackKafkaSpout.java  |  92 --------
 .../org/apache/storm/kafka/EmitContext.java     | 147 ------------
 .../metron-solr/src/main/config/solr.properties |   3 +-
 metron-platform/metron-storm-kafka/pom.xml      | 128 ++++++++++
 .../kafka/flux/SimpleStormKafkaBuilder.java     | 232 +++++++++++++++++++
 .../storm/kafka/flux/SpoutConfiguration.java    | 139 +++++++++++
 .../storm/kafka/flux/StormKafkaSpout.java       |  63 +++++
 .../java/org/apache/storm/kafka/Callback.java   |  26 +++
 .../apache/storm/kafka/CallbackCollector.java   | 187 +++++++++++++++
 .../apache/storm/kafka/CallbackKafkaSpout.java  | 107 +++++++++
 .../org/apache/storm/kafka/EmitContext.java     |  95 ++++++++
 .../kafka/flux/SpoutConfigurationTest.java      |  63 +++++
 metron-platform/pom.xml                         |   1 +
 pom.xml                                         |  33 ++-
 63 files changed, 1881 insertions(+), 1247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-analytics/metron-profiler/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/pom.xml 
b/metron-analytics/metron-profiler/pom.xml
index b0185b0..474b34c 100644
--- a/metron-analytics/metron-profiler/pom.xml
+++ b/metron-analytics/metron-profiler/pom.xml
@@ -78,6 +78,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-storm-kafka</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-statistics</artifactId>
             <version>${project.parent.version}</version>
             <exclusions>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-analytics/metron-profiler/src/main/config/profiler.properties
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/config/profiler.properties 
b/metron-analytics/metron-profiler/src/main/config/profiler.properties
index 91e4226..860934f 100644
--- a/metron-analytics/metron-profiler/src/main/config/profiler.properties
+++ b/metron-analytics/metron-profiler/src/main/config/profiler.properties
@@ -39,4 +39,5 @@ profiler.hbase.flush.interval.seconds=30
 
 kafka.zk=node1:2181
 kafka.broker=node1:6667
-kafka.start=WHERE_I_LEFT_OFF
+# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+kafka.start=UNCOMMITTED_EARLIEST
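
The values listed above line up with the FirstPollOffsetStrategy enum that ships with
storm-kafka-client (nested in KafkaSpoutConfig in the 1.x line targeted here), so the property
can be resolved straight to the enum. A minimal sketch; the exact resolution done inside the
new spout builder is not shown in this diff:

import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;

public class OffsetStrategyExample {
  public static void main(String[] args) {
    // kafka.start holds one of the enum names listed above, so valueOf() suffices.
    FirstPollOffsetStrategy strategy = FirstPollOffsetStrategy.valueOf("UNCOMMITTED_EARLIEST");
    System.out.println(strategy);  // UNCOMMITTED_EARLIEST
  }
}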

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml 
b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
index f97b97a..7ea77a5 100644
--- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
@@ -44,26 +44,45 @@ components:
               ref: "rowKeyBuilder"
             - name: "columnBuilder"
               ref: "columnBuilder"
-
-    -   id: "zkHosts"
-        className: "org.apache.storm.kafka.ZkHosts"
-        constructorArgs:
-            - "${kafka.zk}"
+    # Any kafka props for the consumer go here.
+    -   id: "kafkaProps"
+        className: "java.util.HashMap"
+        configMethods:
+            -   name: "put"
+                args:
+                    - "value.deserializer"
+                    - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+            -   name: "put"
+                args:
+                    - "key.deserializer"
+                    - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+            -   name: "put"
+                args:
+                    - "group.id"
+                    - "profiler"
+
+  # The fields to pull out of the kafka messages
+    -   id: "fields"
+        className: "java.util.ArrayList"
+        configMethods:
+            -   name: "add"
+                args:
+                    - "value"
 
     -   id: "kafkaConfig"
-        className: "org.apache.metron.common.spout.kafka.SpoutConfig"
+        className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
         constructorArgs:
             # zookeeper hosts
-            - ref: "zkHosts"
+            - ref: "kafkaProps"
             # topic name
             - "${profiler.input.topic}"
-            # zk root
-            - ""
-            # id
-            - "indexing"
+            - "${kafka.zk}"
+            - ref: "fields"
         configMethods:
-            -   name: "from"
-                args: ["${kafka.start}"]
+            -   name: "setFirstPollOffsetStrategy"
+                args:
+                    - "${kafka.start}"
+
 
     -   id: "kafkaWriter"
         className: "org.apache.metron.writer.kafka.KafkaWriter"
@@ -82,7 +101,7 @@ components:
 spouts:
 
     -   id: "kafkaSpout"
-        className: "org.apache.storm.kafka.KafkaSpout"
+        className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
         constructorArgs:
             - ref: "kafkaConfig"
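
Roughly, the kafkaProps/fields/kafkaConfig/kafkaSpout wiring above translates to the Java below.
The argument order (props, topic, zookeeper quorum, fields) is inferred from the constructorArgs
in this file, and the topic and quorum values stand in for ${profiler.input.topic} and ${kafka.zk};
treat it as a sketch rather than the authoritative SimpleStormKafkaBuilder signature.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
import org.apache.metron.storm.kafka.flux.StormKafkaSpout;

public class ProfilerSpoutSketch {
  public static void main(String[] args) {
    // Consumer-side kafka properties, mirroring the "kafkaProps" component above.
    Map<String, Object> kafkaProps = new HashMap<>();
    kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    kafkaProps.put("group.id", "profiler");

    // props, topic, zookeeper quorum, fields -- the same order as the flux constructorArgs.
    SimpleStormKafkaBuilder kafkaConfig = new SimpleStormKafkaBuilder(
        kafkaProps, "profiler", "node1:2181", Arrays.asList("value"));
    kafkaConfig.setFirstPollOffsetStrategy("UNCOMMITTED_EARLIEST");

    // The spout is then built from the config, as the spouts section does via flux.
    StormKafkaSpout kafkaSpout = new StormKafkaSpout(kafkaConfig);
  }
}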
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
 
b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index 357908d..bca8ed5 100644
--- 
a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ 
b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -20,6 +20,7 @@
 
 package org.apache.metron.profiler.integration;
 
+import com.google.common.base.Joiner;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.commons.math.util.MathUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -28,7 +29,6 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.Constants;
-import org.apache.metron.common.spout.kafka.SpoutConfig;
 import org.apache.metron.common.utils.SerDeUtils;
 import org.apache.metron.hbase.TableProvider;
 import org.apache.metron.integration.BaseIntegrationTest;
@@ -39,7 +39,6 @@ import 
org.apache.metron.integration.components.ZKServerComponent;
 import org.apache.metron.profiler.hbase.ColumnBuilder;
 import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
 import org.apache.metron.statistics.OnlineStatisticsProvider;
-import org.apache.metron.statistics.StatisticsProvider;
 import org.apache.metron.test.mock.MockHTable;
 import org.junit.After;
 import org.junit.Assert;
@@ -176,13 +175,13 @@ public class ProfilerIntegrationTest extends 
BaseIntegrationTest {
     List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, 
columnBuilder.getColumnQualifier("value"), Double.class);
 
     // verify - 10.0.0.3 -> 1/6
-    Assert.assertTrue(actuals.stream().anyMatch(val ->
-            MathUtils.equals(val, 1.0/6.0, epsilon)
+    Assert.assertTrue( "Could not find a value near 1/6. Actual values read are: " + Joiner.on(",").join(actuals)
+                     , actuals.stream().anyMatch(val -> MathUtils.equals(val, 1.0/6.0, epsilon)
     ));
 
     // verify - 10.0.0.2 -> 6/1
-    Assert.assertTrue(actuals.stream().anyMatch(val ->
-            MathUtils.equals(val, 6.0/1.0, epsilon)
+    Assert.assertTrue("Could not find a value near 6. Actual values read are: " + Joiner.on(",").join(actuals)
+            ,actuals.stream().anyMatch(val -> MathUtils.equals(val, 6.0/1.0, epsilon)
     ));
   }
 
@@ -206,8 +205,8 @@ public class ProfilerIntegrationTest extends 
BaseIntegrationTest {
     List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, 
columnBuilder.getColumnQualifier("value"), Double.class);
 
     // verify - there are 5 'HTTP' messages each with a length of 20, thus the 
average should be 20
-    Assert.assertTrue(actuals.stream().anyMatch(val ->
-            MathUtils.equals(val, 20.0, epsilon)
+    Assert.assertTrue("Could not find a value near 20. Actual values read are: " + Joiner.on(",").join(actuals)
+                     , actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon)
     ));
   }
 
@@ -232,8 +231,8 @@ public class ProfilerIntegrationTest extends 
BaseIntegrationTest {
     List<OnlineStatisticsProvider> actuals = read(profilerTable.getPutLog(), 
columnFamily, column, OnlineStatisticsProvider.class);
 
     // verify - there are 5 'HTTP' messages each with a length of 20, thus the 
average should be 20
-    Assert.assertTrue(actuals.stream().anyMatch(val ->
-            MathUtils.equals(val.getMean(), 20.0, epsilon)
+    Assert.assertTrue("Could not find a value near 20. Actual values read are: " + Joiner.on(",").join(actuals)
+                     , actuals.stream().anyMatch(val -> MathUtils.equals(val.getMean(), 20.0, epsilon)
     ));
   }
 
@@ -253,8 +252,8 @@ public class ProfilerIntegrationTest extends 
BaseIntegrationTest {
     List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, 
columnBuilder.getColumnQualifier("value"), Double.class);
 
     // verify - the 70th percentile of 5 x 20s = 20.0
-    Assert.assertTrue(actuals.stream().anyMatch(val ->
-            MathUtils.equals(val, 20.0, epsilon)));
+    Assert.assertTrue("Could not find a value near 20. Actual values read are: " + Joiner.on(",").join(actuals)
+                     , actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon)));
   }
 
   /**
@@ -290,7 +289,7 @@ public class ProfilerIntegrationTest extends 
BaseIntegrationTest {
 
     // storm topology properties
     final Properties topologyProperties = new Properties() {{
-      setProperty("kafka.start", SpoutConfig.Offset.BEGINNING.name());
+      setProperty("kafka.start", "UNCOMMITTED_EARLIEST");
       setProperty("profiler.workers", "1");
       setProperty("profiler.executors", "0");
       setProperty("profiler.input.topic", inputTopic);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
index 277b636..199e708 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
@@ -177,7 +177,7 @@ indexing.executors=0
 ##### Kafka #####
 kafka.zk={{ zookeeper_quorum }}
 kafka.broker={{ kafka_brokers }}
-kafka.start=WHERE_I_LEFT_OFF
+kafka.start=UNCOMMITTED_EARLIEST
 ##### Indexing #####
 index.input.topic=indexing
 index.error.topic=indexing

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
index 7b10dc4..33cb2e3 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.metron.rest.service.impl;
 import kafka.admin.AdminOperationException;
 import kafka.admin.AdminUtils$;
 import kafka.admin.RackAwareMode;
+import kafka.admin.RackAwareMode$;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -54,7 +55,7 @@ public class KafkaServiceImpl implements KafkaService {
     public KafkaTopic createTopic(KafkaTopic topic) throws RestException {
         if (!listTopics().contains(topic.getName())) {
           try {
-              adminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(), RackAwareMode.Disabled$.MODULE$);
+              adminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(),RackAwareMode.Disabled$.MODULE$ );
           } catch (AdminOperationException e) {
               throw new RestException(e);
           }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml 
b/metron-platform/metron-common/pom.xml
index bb55a41..7416184 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -299,16 +299,6 @@
             <version>${global_flux_version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-kafka</artifactId>
-            <exclusions>
-                <exclusion>
-                    <artifactId>org.apache.curator</artifactId>
-                    <groupId>curator-client</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
             <groupId>org.apache.curator</groupId>
             <artifactId>curator-test</artifactId>
             <version>${global_curator_version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/resolver/ClasspathFunctionResolver.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/resolver/ClasspathFunctionResolver.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/resolver/ClasspathFunctionResolver.java
index 4589b61..3c9524b 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/resolver/ClasspathFunctionResolver.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/resolver/ClasspathFunctionResolver.java
@@ -235,8 +235,16 @@ public class ClasspathFunctionResolver extends 
BaseFunctionResolver {
     }
 
     FilterBuilder filterBuilder = new FilterBuilder();
-    excludes.forEach(excl -> filterBuilder.exclude(excl));
-    includes.forEach(incl -> filterBuilder.include(incl));
+    excludes.forEach(excl -> {
+      if(excl != null) {
+        filterBuilder.exclude(excl);
+      }
+    });
+    includes.forEach(incl -> {
+      if(incl != null) {
+        filterBuilder.include(incl);
+      }
+    });
     Set<String> classes = new HashSet<>();
     Set<Class<? extends StellarFunction>> ret = new HashSet<>();
     for(ClassLoader cl : cls) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfig.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfig.java
deleted file mode 100644
index c26d1f5..0000000
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfig.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.spout.kafka;
-
-import org.apache.storm.kafka.BrokerHosts;
-
-public class SpoutConfig extends org.apache.storm.kafka.SpoutConfig {
-  public static enum Offset {
-    BEGINNING, END, WHERE_I_LEFT_OFF;
-  }
-  public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String 
id) {
-    super(hosts, topic, zkRoot, id);
-  }
-
-  public SpoutConfig from(String offset) {
-    try {
-      Offset o = Offset.valueOf(offset);
-      from(o);
-    }
-    catch(IllegalArgumentException iae) {
-      from(Offset.WHERE_I_LEFT_OFF);
-    }
-    ignoreZkOffsets = true;
-    startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
-    return this;
-  }
-
-  public SpoutConfig from(Offset offset) {
-    if(offset == Offset.BEGINNING) {
-      ignoreZkOffsets = true;
-      startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
-    }
-    else if(offset == Offset.END) {
-      ignoreZkOffsets = true;
-      startOffsetTime = kafka.api.OffsetRequest.LatestTime();
-    }
-    else if(offset == Offset.WHERE_I_LEFT_OFF) {
-      ignoreZkOffsets = false;
-      startOffsetTime = kafka.api.OffsetRequest.LatestTime();
-    }
-    return this;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigFunction.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigFunction.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigFunction.java
deleted file mode 100644
index 5aec998..0000000
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigFunction.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.spout.kafka;
-
-public interface SpoutConfigFunction {
-  void configure(org.apache.storm.kafka.SpoutConfig config, Object val);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigOptions.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigOptions.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigOptions.java
deleted file mode 100644
index 7d1c7c3..0000000
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/spout/kafka/SpoutConfigOptions.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.spout.kafka;
-
-import com.google.common.base.Joiner;
-import org.apache.metron.common.utils.ConversionUtils;
-import org.apache.storm.kafka.SpoutConfig;
-
-import java.util.EnumMap;
-import java.util.Map;
-
-public enum SpoutConfigOptions implements SpoutConfigFunction {
-  retryDelayMaxMs( (config, val) -> config.retryDelayMaxMs = convertVal(val, 
Long.class) ),
-  retryDelayMultiplier ( (config, val) -> config.retryDelayMultiplier = 
convertVal(val, Double.class)),
-  retryInitialDelayMs( (config, val) -> config.retryInitialDelayMs = 
convertVal(val, Long.class)),
-  stateUpdateIntervalMs( (config, val) -> config.stateUpdateIntervalMs = 
convertVal(val, Long.class)),
-  bufferSizeBytes( (config, val) -> config.bufferSizeBytes = convertVal(val, 
Integer.class)),
-  fetchMaxWait( (config, val) -> config.fetchMaxWait = convertVal(val, 
Integer.class)),
-  fetchSizeBytes( (config, val) -> config.fetchSizeBytes= convertVal(val, 
Integer.class)),
-  maxOffsetBehind( (config, val) -> config.maxOffsetBehind = convertVal(val, 
Long.class)),
-  metricsTimeBucketSizeInSecs( (config, val) -> 
config.metricsTimeBucketSizeInSecs = convertVal(val, Integer.class)),
-  socketTimeoutMs( (config, val) -> config.socketTimeoutMs = convertVal(val, 
Integer.class)),
-  ;
-
-  SpoutConfigFunction  spoutConfigFunc;
-  SpoutConfigOptions(SpoutConfigFunction spoutConfigFunc) {
-    this.spoutConfigFunc = spoutConfigFunc;
-  }
-
-  @Override
-  public void configure(SpoutConfig config, Object val) {
-    spoutConfigFunc.configure(config, val);
-  }
-
-  public static SpoutConfig configure(SpoutConfig config, 
EnumMap<SpoutConfigOptions, Object> configs) {
-    if(configs != null) {
-      for(Map.Entry<SpoutConfigOptions, Object> kv : configs.entrySet()) {
-        kv.getKey().configure(config, kv.getValue());
-      }
-    }
-    return config;
-  }
-
-  public static EnumMap<SpoutConfigOptions, Object> coerceMap(Map<String, 
Object> map) {
-    EnumMap<SpoutConfigOptions, Object> ret = new 
EnumMap<>(SpoutConfigOptions.class);
-    for(Map.Entry<String, Object> kv : map.entrySet()) {
-      try {
-        ret.put(SpoutConfigOptions.valueOf(kv.getKey()), kv.getValue());
-      }
-      catch(Exception ex) {
-        String possibleOptions = 
Joiner.on(",").join(SpoutConfigOptions.values());
-        throw new IllegalArgumentException("Configuration keys for spout 
config must be one of: " + possibleOptions, ex);
-      }
-    }
-    return ret;
-  }
-  private static <EXPECTED_T> EXPECTED_T convertVal(Object val, 
Class<EXPECTED_T> clazz) {
-    Object ret = ConversionUtils.convert(val, clazz);
-    if(ret == null) {
-      throw new IllegalArgumentException("Unable to convert " + val + " to 
expected type " + clazz.getCanonicalName());
-    }
-    return clazz.cast(ret);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
index 6d2af72..04c1389 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
@@ -19,11 +19,15 @@
 package org.apache.metron.common.utils;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -47,7 +51,29 @@ public enum KafkaUtils {
       String brokerInfoStr = new String(data);
       Map<String, Object> brokerInfo = JSONUtils.INSTANCE.load(brokerInfoStr, 
new TypeReference<Map<String, Object>>() {
       });
-      ret.add(brokerInfo.get("host") + ":" + brokerInfo.get("port"));
+      String host = (String) brokerInfo.get("host");
+      if(host != null) {
+        ret.add(host + ":" + brokerInfo.get("port"));
+      }
+      else {
+        Object endpoints = brokerInfo.get("endpoints");
+        if(endpoints != null && endpoints instanceof List) {
+          List<String> eps = (List<String>)endpoints;
+          for(String url : eps) {
+            ret.addAll(fromEndpoint(url));
+          }
+        }
+      }
+    }
+    return ret;
+  }
+
+  public List<String> fromEndpoint(String url) throws URISyntaxException {
+    List<String> ret = new ArrayList<>();
+    if(url != null) {
+      URI uri = new URI(url);
+      int port = uri.getPort();
+      ret.add(uri.getHost() + ((port > 0)?(":" + port):""));
     }
     return ret;
   }
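
With this change, broker registrations that only publish listener endpoints (no top-level
"host" field) are still resolved to host:port pairs. A small illustration of the endpoint
parsing, using the same endpoint shapes the new KafkaUtilsEndpointTest below exercises:

import java.net.URISyntaxException;
import java.util.List;
import org.apache.metron.common.utils.KafkaUtils;

public class EndpointParsingExample {
  public static void main(String[] args) throws URISyntaxException {
    // The scheme (SSL, PLAINTEXTSASL, PLAINTEXT, ...) is dropped; host and optional port are kept.
    List<String> withPort = KafkaUtils.INSTANCE.fromEndpoint("SSL://node1:6667"); // [node1:6667]
    List<String> noPort = KafkaUtils.INSTANCE.fromEndpoint("PLAINTEXT://node1");  // [node1]
    System.out.println(withPort + " " + noPort);
  }
}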

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/timestamp/TimestampConverters.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/timestamp/TimestampConverters.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/timestamp/TimestampConverters.java
index 1c3aa7f..f0e0dd0 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/timestamp/TimestampConverters.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/timestamp/TimestampConverters.java
@@ -19,6 +19,8 @@
 package org.apache.metron.common.utils.timestamp;
 
 
+import com.google.common.base.Joiner;
+
 public enum TimestampConverters implements TimestampConverter{
   MILLISECONDS(tsMilli -> tsMilli*1000000L)
   ,MICROSECONDS(tsMicro -> tsMicro*1000L)
@@ -29,7 +31,13 @@ public enum TimestampConverters implements 
TimestampConverter{
   }
 
   public static TimestampConverter getConverter(String converter) {
-    return TimestampConverters.valueOf(converter).converter;
+    if(converter != null) {
+      return TimestampConverters.valueOf(converter.toUpperCase()).converter;
+    }
+    else {
+      throw new IllegalStateException(converter + " is not a valid timestamp converter: "
+              + Joiner.on(",").join(TimestampConverters.values()));
+    }
   }
 
   @Override
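
A side effect of the new lookup is that converter names are matched case-insensitively and a
missing name now fails with a descriptive exception rather than a bare NullPointerException
from Enum.valueOf. A short sketch of the lookup (the conversion method itself is not shown in
this hunk):

import org.apache.metron.common.utils.timestamp.TimestampConverter;
import org.apache.metron.common.utils.timestamp.TimestampConverters;

public class ConverterLookupExample {
  public static void main(String[] args) {
    // Lower-case names are accepted now that getConverter() upper-cases its argument.
    TimestampConverter micro = TimestampConverters.getConverter("microseconds");
    TimestampConverter milli = TimestampConverters.getConverter("MILLISECONDS");
  }
}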

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/src/test/java/org/apache/metron/common/spout/kafka/SpoutConfigTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/spout/kafka/SpoutConfigTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/spout/kafka/SpoutConfigTest.java
deleted file mode 100644
index 55c9032..0000000
--- 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/spout/kafka/SpoutConfigTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.spout.kafka;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.utils.JSONUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class SpoutConfigTest {
-
-  /**
-   {
-    "retryDelayMaxMs" : 1000,
-    "retryDelayMultiplier" : 1.2,
-    "retryInitialDelayMs" : 2000,
-    "stateUpdateIntervalMs" : 3000,
-    "bufferSizeBytes" : 4000,
-    "fetchMaxWait" : 5000,
-    "fetchSizeBytes" : 6000,
-    "maxOffsetBehind" : 7000,
-    "metricsTimeBucketSizeInSecs" : 8000,
-    "socketTimeoutMs" : 9000
-   }
-   */
-  @Multiline
-  public static String config;
-
-  @Test
-  public void testConfigApplication() throws IOException {
-    SpoutConfig spoutConfig = new SpoutConfig(null, null, null, null);
-    Map<String, Object> configMap = JSONUtils.INSTANCE.load(config, new 
TypeReference<Map<String, Object>>() {
-    });
-    SpoutConfigOptions.configure(spoutConfig, 
SpoutConfigOptions.coerceMap(configMap));
-    Assert.assertEquals(1000, spoutConfig.retryDelayMaxMs);
-    Assert.assertEquals(1.2, spoutConfig.retryDelayMultiplier, 1e-7);
-    Assert.assertEquals(2000, spoutConfig.retryInitialDelayMs);
-    Assert.assertEquals(3000, spoutConfig.stateUpdateIntervalMs);
-    Assert.assertEquals(4000, spoutConfig.bufferSizeBytes);
-    Assert.assertEquals(5000, spoutConfig.fetchMaxWait);
-    Assert.assertEquals(6000, spoutConfig.fetchSizeBytes);
-    Assert.assertEquals(7000, spoutConfig.maxOffsetBehind);
-    Assert.assertEquals(8000, spoutConfig.metricsTimeBucketSizeInSecs);
-    Assert.assertEquals(9000, spoutConfig.socketTimeoutMs);
-  }
-  /**
-   {
-    "retryDelayMaxMs" : 1000,
-    "retryDelayMultiplier" : 1.2,
-    "retryInitialDelayMs" : 2000,
-    "stateUpdateIntervalMs" : 3000,
-    "bufferSizeBytes" : 4000
-   }
-   */
-  @Multiline
-  public static String incompleteConfig;
-  @Test
-  public void testIncompleteConfigApplication() throws IOException {
-    SpoutConfig spoutConfig = new SpoutConfig(null, null, null, null);
-    Map<String, Object> configMap = JSONUtils.INSTANCE.load(incompleteConfig, 
new TypeReference<Map<String, Object>>() {
-    });
-    SpoutConfigOptions.configure(spoutConfig, 
SpoutConfigOptions.coerceMap(configMap));
-    Assert.assertEquals(1000, spoutConfig.retryDelayMaxMs);
-    Assert.assertEquals(1.2, spoutConfig.retryDelayMultiplier, 1e-7);
-    Assert.assertEquals(2000, spoutConfig.retryInitialDelayMs);
-    Assert.assertEquals(3000, spoutConfig.stateUpdateIntervalMs);
-    Assert.assertEquals(4000, spoutConfig.bufferSizeBytes);
-  }
-
-  @Test
-  public void testEmptyConfigApplication() throws IOException {
-    SpoutConfig spoutConfig = new SpoutConfig(null, null, null, null);
-    SpoutConfigOptions.configure(spoutConfig, SpoutConfigOptions.coerceMap(new 
HashMap<>()));
-    //ensure defaults are used
-    Assert.assertEquals(60*1000, spoutConfig.retryDelayMaxMs);
-    Assert.assertEquals(1.0, spoutConfig.retryDelayMultiplier, 1e-7);
-    Assert.assertEquals(0, spoutConfig.retryInitialDelayMs);
-    Assert.assertEquals(2000, spoutConfig.stateUpdateIntervalMs);
-    Assert.assertEquals(1024*1024, spoutConfig.bufferSizeBytes);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java
new file mode 100644
index 0000000..14a9e41
--- /dev/null
+++ 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class KafkaUtilsEndpointTest {
+  static String[] hostnames = new String[] { "node1", "localhost", "192.168.0.1", "my.domain.com" };
+  static String[] schemes = new String[] { "SSL", "PLAINTEXTSASL", "PLAINTEXT"};
+  static String[] ports = new String[] { "6667", "9091", null};
+  private String endpoint;
+  private String expected;
+
+  public KafkaUtilsEndpointTest(String endpoint, String expected) {
+    this.endpoint = endpoint;
+    this.expected = expected;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    List<Object[]> ret = new ArrayList<>();
+    for(String scheme : schemes) {
+      for(String hostname : hostnames) {
+        for(String port : ports) {
+          port = port != null?(":" + port):"";
+          String expected = hostname + port;
+          ret.add(new Object[]{scheme + "://" + expected, expected });
+        }
+      }
+    }
+    return ret;
+  }
+
+  @Test
+  public void testEndpointParsing() throws URISyntaxException {
+    Assert.assertEquals(expected, KafkaUtils.INSTANCE.fromEndpoint(endpoint).get(0));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties 
b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
index 27e9173..317742b 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
+++ 
b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
@@ -22,7 +22,8 @@ indexing.executors=0
 
 kafka.zk=node1:2181
 kafka.broker=node1:6667
-kafka.start=WHERE_I_LEFT_OFF
+# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+kafka.start=UNCOMMITTED_EARLIEST
 
 ##### Indexing #####
 index.input.topic=indexing

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-enrichment/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/pom.xml 
b/metron-platform/metron-enrichment/pom.xml
index 1ff46a9..045676f 100644
--- a/metron-platform/metron-enrichment/pom.xml
+++ b/metron-platform/metron-enrichment/pom.xml
@@ -38,6 +38,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-storm-kafka</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-writer</artifactId>
             <version>${project.parent.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml 
b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
index 054710f..439105f 100644
--- a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
+++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
@@ -176,32 +176,52 @@ components:
                     - "${kafka.zk}"
 
 #kafka/zookeeper
-    -   id: "zkHosts"
-        className: "org.apache.storm.kafka.ZkHosts"
-        constructorArgs:
-            - "${kafka.zk}"
+    # Any kafka props for the consumer go here.
+    -   id: "kafkaProps"
+        className: "java.util.HashMap"
+        configMethods:
+          -   name: "put"
+              args:
+                  - "value.deserializer"
+                  - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+          -   name: "put"
+              args:
+                  - "key.deserializer"
+                  - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+          -   name: "put"
+              args:
+                  - "group.id"
+                  - "enrichments"
+
+  # The fields to pull out of the kafka messages
+    -   id: "fields"
+        className: "java.util.ArrayList"
+        configMethods:
+          -   name: "add"
+              args:
+                  - "value"
+
     -   id: "kafkaConfig"
-        className: "org.apache.storm.kafka.SpoutConfig"
+        className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
         constructorArgs:
-            # zookeeper hosts
-            - ref: "zkHosts"
-            # topic name
-            - "enrichments"
-            # zk root
-            - ""
-            # id
-            - "enrichments"
-        properties:
-            -   name: "ignoreZkOffsets"
-                value: true
-            -   name: "startOffsetTime"
-                value: -1
+          - ref: "kafkaProps"
+          # topic name
+          - "enrichments"
+          - "${kafka.zk}"
+          - ref: "fields"
+        configMethods:
+            -   name: "setFirstPollOffsetStrategy"
+                args:
+                    # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+                    - "UNCOMMITTED_EARLIEST"
+
 
 spouts:
     -   id: "kafkaSpout"
-        className: "org.apache.storm.kafka.KafkaSpout"
+        className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
         constructorArgs:
             - ref: "kafkaConfig"
+
 bolts:
 # Enrichment Bolts
     -   id: "enrichmentSplitBolt"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml 
b/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml
index 077fedb..b7fb8d4 100644
--- a/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml
+++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/test.yaml
@@ -175,32 +175,52 @@ components:
                     - "${kafka.zk}"
 
 #kafka/zookeeper
-    -   id: "zkHosts"
-        className: "org.apache.storm.kafka.ZkHosts"
-        constructorArgs:
-            - "${kafka.zk}"
+# Any kafka props for the consumer go here.
+    -   id: "kafkaProps"
+        className: "java.util.HashMap"
+        configMethods:
+            -   name: "put"
+                args:
+                    - "value.deserializer"
+                    - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+            -   name: "put"
+                args:
+                    - "key.deserializer"
+                    - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+            -   name: "put"
+                args:
+                    - "group.id"
+                    - "enrichments"
+
+  # The fields to pull out of the kafka messages
+    -   id: "fields"
+        className: "java.util.ArrayList"
+        configMethods:
+            -   name: "add"
+                args:
+                    - "value"
+
     -   id: "kafkaConfig"
-        className: "org.apache.storm.kafka.SpoutConfig"
+        className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
         constructorArgs:
             # zookeeper hosts
-            - ref: "zkHosts"
+            - ref: "kafkaProps"
             # topic name
             - "enrichments"
-            # zk root
-            - ""
-            # id
-            - "enrichments"
-        properties:
-            -   name: "ignoreZkOffsets"
-                value: true
-            -   name: "startOffsetTime"
-                value: -2
+            - "${kafka.zk}"
+            - ref: "fields"
+        configMethods:
+            -   name: "setFirstPollOffsetStrategy"
+                args:
+                    - "UNCOMMITTED_EARLIEST"
+
 
 spouts:
     -   id: "kafkaSpout"
-        className: "org.apache.storm.kafka.KafkaSpout"
+        className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
         constructorArgs:
             - ref: "kafkaConfig"
+
 bolts:
 # Enrichment Bolts
     -   id: "enrichmentSplitBolt"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml 
b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
index 3e329f4..3905a7a 100644
--- a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
+++ b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
@@ -63,29 +63,49 @@ components:
         className: "${writer.class.name}"
 
 #kafka/zookeeper
-    -   id: "zkHosts"
-        className: "org.apache.storm.kafka.ZkHosts"
-        constructorArgs:
-            - "${kafka.zk}"
+# Any kafka props for the consumer go here.
+    -   id: "kafkaProps"
+        className: "java.util.HashMap"
+        configMethods:
+            -   name: "put"
+                args:
+                    - "value.deserializer"
+                    - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+            -   name: "put"
+                args:
+                    - "key.deserializer"
+                    - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+            -   name: "put"
+                args:
+                    - "group.id"
+                    - "indexing"
+
+# The fields to pull out of the kafka messages
+    -   id: "fields"
+        className: "java.util.ArrayList"
+        configMethods:
+            -   name: "add"
+                args:
+                    - "value"
+
     -   id: "kafkaConfig"
-        className: "org.apache.metron.common.spout.kafka.SpoutConfig"
+        className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
         constructorArgs:
-            # zookeeper hosts
-            - ref: "zkHosts"
+            - ref: "kafkaProps"
             # topic name
             - "${index.input.topic}"
-            # zk root
-            - ""
-            # id
-            - "indexing"
+            - "${kafka.zk}"
+            - ref: "fields"
         configMethods:
-            -   name: "from"
+            -   name: "setFirstPollOffsetStrategy"
                 args:
+                    # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
                     - "${kafka.start}"
 
+
 spouts:
     -   id: "kafkaSpout"
-        className: "org.apache.storm.kafka.KafkaSpout"
+        className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
         constructorArgs:
             - ref: "kafkaConfig"
 bolts:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
index 60cd1d1..394fbf0 100644
--- 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
+++ 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
@@ -26,7 +26,6 @@ import org.apache.metron.TestConstants;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.interfaces.FieldNameConverter;
-import org.apache.metron.common.spout.kafka.SpoutConfig;
 import org.apache.metron.common.utils.JSONUtils;
 import 
org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
 import org.apache.metron.integration.BaseIntegrationTest;
@@ -121,7 +120,7 @@ public abstract class IndexingIntegrationTest extends 
BaseIntegrationTest {
     final String dateFormat = "yyyy.MM.dd.HH";
     final List<byte[]> inputMessages = 
TestUtils.readSampleData(sampleParsedPath);
     final Properties topologyProperties = new Properties() {{
-      setProperty("kafka.start", SpoutConfig.Offset.BEGINNING.name());
+      setProperty("kafka.start", "UNCOMMITTED_EARLIEST");
       setProperty("indexing.workers", "1");
       setProperty("indexing.executors", "0");
       setProperty("index.input.topic", Constants.INDEXING_TOPIC);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
 
b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
index 1c1f907..d34ff08 100644
--- 
a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
+++ 
b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/FluxTopologyComponent.java
@@ -42,6 +42,10 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -72,6 +76,7 @@ public class FluxTopologyComponent implements 
InMemoryComponent {
 
     public Builder withTopologyProperties(Properties properties) {
       this.topologyProperties = properties;
+      this.topologyProperties.put("storm.home", "target");
       return this;
     }
 
@@ -130,13 +135,64 @@ public class FluxTopologyComponent implements 
InMemoryComponent {
     }
   }
 
+  public static void cleanupWorkerDir() {
+    if(new File("logs/workers-artifacts").exists()) {
+      Path rootPath = Paths.get("logs");
+      Path destPath = Paths.get("target/logs");
+      try {
+        Files.move(rootPath, destPath);
+        Files.walk(destPath)
+                .sorted(Comparator.reverseOrder())
+                .map(Path::toFile)
+                .forEach(File::delete);
+      } catch (IOException e) {
+        throw new IllegalStateException(e.getMessage(), e);
+      }
+    }
+  }
+
   @Override
   public void stop() {
     if (stormCluster != null) {
-      stormCluster.shutdown();
+      try {
+          try {
+            stormCluster.shutdown();
+          } catch (IllegalStateException ise) {
+            if (!(ise.getMessage().contains("It took over") && ise.getMessage().contains("to shut down slot"))) {
+              throw ise;
+            }
+            else {
+              assassinateSlots();
+              LOG.error("Storm slots didn't shut down entirely cleanly *sigh*.  " +
+                      "I gave them the old one-two-skadoo and killed the slots with prejudice.  " +
+                      "If tests fail, we'll have to find a better way of killing them.", ise);
+            }
+        }
+      }
+      catch(Throwable t) {
+        LOG.error(t.getMessage(), t);
+      }
+      finally {
+        cleanupWorkerDir();
+      }
     }
   }
 
+  public static void assassinateSlots() {
+    /*
+    You might be wondering why I'm not just casting to slot here, but that's 
because the Slot class moved locations
+    and we're supporting multiple versions of storm.
+     */
+    Thread.getAllStackTraces().keySet().stream().filter(t -> t instanceof AutoCloseable && t.getName().toLowerCase().contains("slot")).forEach(t -> {
+      AutoCloseable slot = (AutoCloseable) t;
+      try {
+        slot.close();
+      } catch (Exception e) {
+        LOG.error("Tried to kill " + t.getName() + " but.." + e.getMessage(), e);
+      }
+    });
+  }
+
   public void submitTopology() throws NoSuchMethodException, IOException, 
InstantiationException, TException, IllegalAccessException, 
InvocationTargetException, ClassNotFoundException, NoSuchFieldException {
     startTopology(getTopologyName(), getTopologyLocation(), 
getTopologyProperties());
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
 
b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
index f6117dc..e55b317 100644
--- 
a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
+++ 
b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
@@ -39,6 +39,8 @@ import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.wrapper.AdminUtilsWrapper;
 import org.apache.metron.integration.wrapper.TestUtilsWrapper;
 import org.apache.metron.test.utils.UnitTestHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -47,6 +49,8 @@ import java.util.logging.Level;
 
 public class KafkaComponent implements InMemoryComponent {
 
+  protected static final Logger LOG = LoggerFactory.getLogger(KafkaComponent.class);
+
   public static class Topic {
     public int numPartitions;
     public String name;
@@ -56,6 +60,8 @@ public class KafkaComponent implements InMemoryComponent {
       this.name = name;
     }
   }
+
+  private List<KafkaProducer> producersCreated = new ArrayList<>();
   private transient KafkaServer kafkaServer;
   private transient ZkClient zkClient;
   private transient ConsumerConnector consumer;
@@ -128,7 +134,9 @@ public class KafkaComponent implements InMemoryComponent {
     producerConfig.put("message.max.bytes", "" + 1024*1024*10);
     producerConfig.put("message.send.max.retries", "10");
     producerConfig.putAll(properties);
-    return new KafkaProducer<>(producerConfig);
+    KafkaProducer<K, V> ret = new KafkaProducer<>(producerConfig);
+    producersCreated.add(ret);
+    return ret;
   }
 
   @Override
@@ -170,6 +178,7 @@ public class KafkaComponent implements InMemoryComponent {
   @Override
   public void stop() {
     shutdownConsumer();
+    shutdownProducers();
     if(kafkaServer != null) {
       kafkaServer.shutdown();
       kafkaServer.awaitShutdown();
@@ -219,6 +228,17 @@ public class KafkaComponent implements InMemoryComponent {
     }
   }
 
+  public void shutdownProducers() {
+    for(KafkaProducer kp : producersCreated) {
+      try {
+        kp.close();
+      }
+      catch(Exception ex) {
+        LOG.error(ex.getMessage(), ex);
+      }
+    }
+  }
+
   public void createTopic(String name) throws InterruptedException {
     createTopic(name, 1, true);
   }
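
The producer bookkeeping added above (`producersCreated` plus `shutdownProducers()`) follows a common integration-test pattern: remember every producer the component hands out and close them all during `stop()` so the in-memory broker can shut down cleanly. Below is a minimal, self-contained sketch of that pattern only; the class and method names are hypothetical and not part of this commit.

```
import org.apache.kafka.clients.producer.KafkaProducer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper illustrating the track-and-close pattern used above; not the Metron API.
public class TrackedProducers {
  private final List<KafkaProducer<?, ?>> created = new ArrayList<>();

  public <K, V> KafkaProducer<K, V> create(Properties config) {
    KafkaProducer<K, V> producer = new KafkaProducer<>(config);
    created.add(producer);  // remember the producer so teardown can close it
    return producer;
  }

  public void closeAll() {
    for (KafkaProducer<?, ?> producer : created) {
      try {
        producer.close();   // release the sockets and IO thread held by the producer
      } catch (Exception e) {
        // best-effort cleanup: log and continue rather than failing the teardown
        System.err.println("Failed to close producer: " + e.getMessage());
      }
    }
  }
}
```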

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
 
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
index 046bcff..c5428d2 100644
--- 
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
+++ 
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
@@ -18,9 +18,13 @@
 
 package org.apache.metron.management;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.metron.common.dsl.Context;
@@ -29,14 +33,7 @@ import org.apache.metron.common.dsl.Stellar;
 import org.apache.metron.common.dsl.StellarFunction;
 import org.apache.metron.common.utils.ConversionUtils;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.metron.common.dsl.Context.Capabilities.GLOBAL_CONFIG;
@@ -128,6 +125,7 @@ public class KafkaFunctions {
 
       // build the properties for kafka
       Properties properties = buildKafkaProperties(overrides, context);
+      properties.put("max.poll.records", count);
 
       // read some messages
       try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
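
For context, `max.poll.records` caps how many records a single `poll()` call may return, which is what lets the function above bound its read to `count` messages. A minimal, self-contained consumer sketch under that assumption follows; the broker address, group id, and topic are placeholders, not values from this commit.

```
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class BoundedPollExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:6667");            // placeholder broker
    props.put("group.id", "bounded-poll-example");                // placeholder consumer group
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("max.poll.records", 5);                             // a single poll returns at most 5 records

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("some-topic")); // placeholder topic
      ConsumerRecords<String, String> records = consumer.poll(1000);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + ": " + record.value());
      }
    }
  }
}
```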

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md 
b/metron-platform/metron-parsers/README.md
index 2cf9bbf..cc45834 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -296,24 +296,21 @@ usage: start_parser_topology.sh
 These options are intended to configure the Storm Kafka Spout more completely. 
 These options can be
 specified in a JSON file containing a map associating the kafka spout 
configuration parameter to a value.
 The range of values possible to configure are:
-* retryDelayMaxMs
-* retryDelayMultiplier
-* retryInitialDelayMs
-* stateUpdateIntervalMs
-* bufferSizeBytes
-* fetchMaxWait
-* fetchSizeBytes
-* maxOffsetBehind
-* metricsTimeBucketSizeInSecs
-* socketTimeoutMs
-
-These are described in some detail 
[here](https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.4/bk_storm-user-guide/content/storm-kafka-api-ref.html).
-
-For instance, creating a JSON file which will set the `bufferSizeBytes` to 2MB 
and `retryDelayMaxMs` to 2000 would look like
+* `spout.pollTimeoutMs` - Specifies the time, in milliseconds, to spend waiting in poll if data is not available. Default is 2s.
+* `spout.firstPollOffsetStrategy` - Sets the offset strategy used by the Kafka spout for its first poll to the Kafka broker after the topology starts.  One of
+  * `EARLIEST`
+  * `LATEST`
+  * `UNCOMMITTED_EARLIEST` - Start from the last uncommitted offset; if no committed offsets are found, default to earliest. NOTE: This is the default.
+  * `UNCOMMITTED_LATEST` - Start from the last uncommitted offset; if no committed offsets are found, default to latest.
+* `spout.offsetCommitPeriodMs` - Specifies the period, in milliseconds, at which the offset commit task runs. Default is 15s.
+* `spout.maxUncommittedOffsets` - Defines the maximum number of polled offsets (records) that can be pending commit before another poll can take place. Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) brings the number of pending offsets below the threshold. The default is 10,000,000.
+* `spout.maxRetries` - Defines the maximum number of retries for a failed tuple. The default is to retry forever, which means that no new records are committed until the previously polled records have been acked. This guarantees at-least-once delivery of all the previously polled records. By specifying a finite value for maxRetries, the user sacrifices the delivery guarantee for the previously polled records in favor of processing more records.
+* Any of the configs in the Consumer API for [Kafka 0.10.x](http://kafka.apache.org/0100/documentation.html#newconsumerconfigs)
+
+For instance, a JSON file that sets the first poll offset strategy to `UNCOMMITTED_EARLIEST` would look like the example below (a short sketch of how such a map is applied follows it):
 ```
 {
-  "bufferSizeBytes" : 2000000,
-  "retryDelayMaxMs" : 2000
+  "spout.firstPollOffsetStrategy" : "UNCOMMITTED_EARLIEST"
 }
 ```
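
To make the merge semantics concrete, here is a rough Java sketch of how such a map can be loaded and combined with the spout defaults; the file name and group id are hypothetical, and the real topology code uses Metron's `JSONUtils` and `SpoutConfiguration` helpers (shown in the diffs below) rather than a bare Jackson `ObjectMapper`.

```
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class SpoutConfigMergeExample {
  public static void main(String[] args) throws IOException {
    // Hypothetical path to a JSON file like the one above.
    File spoutConfigFile = new File("extra_kafka_spout_config.json");

    // Read the JSON map of spout options.
    Map<String, Object> spoutConfig = new ObjectMapper().readValue(spoutConfigFile, HashMap.class);

    // Defaults are filled in only for keys the file did not set, so user-provided values win.
    spoutConfig.putIfAbsent("spout.firstPollOffsetStrategy", "UNCOMMITTED_EARLIEST");
    spoutConfig.putIfAbsent("group.id", "bro_parser");   // hypothetical <topic>_parser group id

    System.out.println(spoutConfig);
  }
}
```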
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index aeac33c..b347ca5 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -17,6 +17,11 @@
  */
 package org.apache.metron.parsers.topology;
 
+import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
+import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
+import org.apache.metron.storm.kafka.flux.StormKafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.Constants;
@@ -26,8 +31,6 @@ import 
org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.MessageWriter;
-import org.apache.metron.common.spout.kafka.SpoutConfig;
-import org.apache.metron.common.spout.kafka.SpoutConfigOptions;
 import org.apache.metron.common.utils.ReflectionUtils;
 import org.apache.metron.parsers.bolt.ParserBolt;
 import org.apache.metron.parsers.bolt.WriterBolt;
@@ -36,10 +39,8 @@ import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.writer.AbstractWriter;
 import org.apache.metron.writer.kafka.KafkaWriter;
 import org.json.simple.JSONObject;
-import org.apache.storm.kafka.KafkaSpout;
-import org.apache.storm.kafka.ZkHosts;
 
-import java.util.EnumMap;
+import java.util.*;
 
 /**
  * Builds a Storm topology that parses telemetry data received from a sensor.
@@ -52,7 +53,6 @@ public class ParserTopologyBuilder {
    * @param zookeeperUrl             Zookeeper URL
    * @param brokerUrl                Kafka Broker URL
    * @param sensorType               Type of sensor
-   * @param offset                   Kafka topic offset where the topology 
will start; BEGINNING, END, WHERE_I_LEFT_OFF
    * @param spoutParallelism         Parallelism hint for the spout
    * @param spoutNumTasks            Number of tasks for the spout
    * @param parserParallelism        Parallelism hint for the parser bolt
@@ -66,14 +66,13 @@ public class ParserTopologyBuilder {
   public static TopologyBuilder build(String zookeeperUrl,
                                       String brokerUrl,
                                       String sensorType,
-                                      SpoutConfig.Offset offset,
                                       int spoutParallelism,
                                       int spoutNumTasks,
                                       int parserParallelism,
                                       int parserNumTasks,
                                       int errorWriterParallelism,
                                       int errorWriterNumTasks,
-                                      EnumMap<SpoutConfigOptions, Object> 
kafkaSpoutConfig
+                                      Map<String, Object> kafkaSpoutConfig
   ) throws Exception {
 
     // fetch configuration from zookeeper
@@ -82,7 +81,7 @@ public class ParserTopologyBuilder {
 
     // create the spout
     TopologyBuilder builder = new TopologyBuilder();
-    KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, offset, kafkaSpoutConfig, parserConfig);
+    KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, Optional.ofNullable(kafkaSpoutConfig), parserConfig);
     builder.setSpout("kafkaSpout", kafkaSpout, spoutParallelism)
             .setNumTasks(spoutNumTasks);
 
@@ -106,19 +105,22 @@ public class ParserTopologyBuilder {
   /**
    * Create a spout that consumes tuples from a Kafka topic.
    *
-   * @param zookeeperUrl            Zookeeper URL
+   * @param zkQuorum Zookeeper URL
    * @param sensorType              Type of sensor
-   * @param offset                  Kafka topic offset where the topology will 
start; BEGINNING, END, WHERE_I_LEFT_OFF
-   * @param kafkaSpoutConfigOptions Configuration options for the kafka spout
+   * @param kafkaConfigOptional     Configuration options for the kafka spout
    * @param parserConfig            Configuration for the parser
    * @return
    */
-  private static KafkaSpout createKafkaSpout(String zookeeperUrl, String sensorType, SpoutConfig.Offset offset, EnumMap<SpoutConfigOptions, Object> kafkaSpoutConfigOptions, SensorParserConfig parserConfig) {
-
+  private static StormKafkaSpout<Object, Object> createKafkaSpout(String zkQuorum, String sensorType, Optional<Map<String, Object>> kafkaConfigOptional, SensorParserConfig parserConfig) {
+    Map<String, Object> kafkaSpoutConfigOptions = kafkaConfigOptional.orElse(new HashMap<>());
     String inputTopic = parserConfig.getSensorTopic() != null ? parserConfig.getSensorTopic() : sensorType;
-    SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeeperUrl), inputTopic, "", inputTopic).from(offset);
-    SpoutConfigOptions.configure(spoutConfig, kafkaSpoutConfigOptions);
-    return new KafkaSpout(spoutConfig);
+    kafkaSpoutConfigOptions.putIfAbsent( SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key
+            , KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST.toString()
+    );
+    kafkaSpoutConfigOptions.putIfAbsent( KafkaSpoutConfig.Consumer.GROUP_ID
+            , inputTopic + "_parser"
+    );
+    return SimpleStormKafkaBuilder.create(inputTopic, zkQuorum, Arrays.asList("value"), kafkaSpoutConfigOptions);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index 2bf484e..8cf921e 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.parsers.topology;
 
+import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
@@ -26,19 +27,15 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Joiner;
 import org.apache.commons.cli.*;
 import org.apache.commons.io.FileUtils;
-import org.apache.metron.common.spout.kafka.SpoutConfig;
-import org.apache.metron.common.spout.kafka.SpoutConfigOptions;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.parsers.topology.config.Arg;
 import org.apache.metron.parsers.topology.config.ConfigHandlers;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
 import java.util.function.Function;
 
 public class ParserTopologyCLI {
@@ -156,7 +153,11 @@ public class ParserTopologyCLI {
     }, new ConfigHandlers.SetMessageTimeoutHandler()
     )
     ,EXTRA_OPTIONS("e", code -> {
-      Option o = new Option(code, "extra_topology_options", true, "Extra 
options in the form of a JSON file with a map for content.");
+      Option o = new Option(code, "extra_topology_options", true
+                           , "Extra options in the form of a JSON file with a 
map for content." +
+                             "  Available options are those in the Kafka 
Consumer Configs at 
http://kafka.apache.org/0100/documentation.html#newconsumerconfigs"; +
+                             " and " + 
Joiner.on(",").join(SpoutConfiguration.allOptions())
+                           );
       o.setArgName("JSON_FILE");
       o.setRequired(false);
       o.setType(String.class);
@@ -167,8 +168,8 @@ public class ParserTopologyCLI {
       Option o = new Option(code
                            , "extra_kafka_spout_config"
                            , true
-                           , "Extra spout config options in the form of a JSON file with a map for content.  " +
-                             "Possible keys are: " + Joiner.on(",").join(SpoutConfigOptions.values()));
+                           , "Extra spout config options in the form of a JSON file with a map for content."
+                           );
       o.setArgName("JSON_FILE");
       o.setRequired(false);
       o.setType(String.class);
@@ -182,13 +183,6 @@ public class ParserTopologyCLI {
       o.setRequired(false);
       return o;
     })
-    ,KAFKA_OFFSET("koff", code ->
-    {
-      Option o = new Option("koff", "kafka_offset", true, "Kafka offset");
-      o.setArgName("BEGINNING|WHERE_I_LEFT_OFF");
-      o.setRequired(false);
-      return o;
-    })
     ;
     Option option;
     String shortCode;
@@ -286,18 +280,14 @@ public class ParserTopologyCLI {
       int errorNumTasks= 
Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1"));
       int invalidParallelism = 
Integer.parseInt(ParserOptions.INVALID_WRITER_PARALLELISM.get(cmd, "1"));
       int invalidNumTasks= 
Integer.parseInt(ParserOptions.INVALID_WRITER_NUM_TASKS.get(cmd, "1"));
-      EnumMap<SpoutConfigOptions, Object> spoutConfig = new 
EnumMap<SpoutConfigOptions, Object>(SpoutConfigOptions.class);
+      Map<String, Object> spoutConfig = new HashMap<>();
       if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
         spoutConfig = readSpoutConfig(new 
File(ParserOptions.SPOUT_CONFIG.get(cmd)));
       }
-      SpoutConfig.Offset offset = cmd.hasOption("t") ? 
SpoutConfig.Offset.BEGINNING : SpoutConfig.Offset.WHERE_I_LEFT_OFF;
-      if(cmd.hasOption("koff")) {
-        offset = SpoutConfig.Offset.valueOf(cmd.getOptionValue("koff"));
-      }
+
       TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl,
               brokerUrl,
               sensorType,
-              offset,
               spoutParallelism,
               spoutNumTasks,
               parserParallelism,
@@ -322,7 +312,7 @@ public class ParserTopologyCLI {
       System.exit(-1);
     }
   }
-  private static EnumMap<SpoutConfigOptions, Object> readSpoutConfig(File inputFile) {
+  private static Map<String, Object> readSpoutConfig(File inputFile) {
     String json = null;
     if (inputFile.exists()) {
       try {
@@ -335,8 +325,8 @@ public class ParserTopologyCLI {
       throw new IllegalArgumentException("Unable to load JSON file at " + 
inputFile.getAbsolutePath());
     }
     try {
-      return SpoutConfigOptions.coerceMap(JSONUtils.INSTANCE.load(json, new TypeReference<Map<String, Object>>() {
-      }));
+      return JSONUtils.INSTANCE.load(json, new TypeReference<Map<String, Object>>() {
+      });
     } catch (IOException e) {
       throw new IllegalStateException("Unable to process JSON.", e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index 73d3827..48bcbec 100644
--- 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -17,24 +17,36 @@
  */
 package org.apache.metron.parsers.integration.components;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.topology.TopologyBuilder;
-import org.apache.metron.common.spout.kafka.SpoutConfig;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.parsers.topology.ParserTopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitOption;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
+import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir;
+
 public class ParserTopologyComponent implements InMemoryComponent {
 
+  protected static final Logger LOG = LoggerFactory.getLogger(ParserTopologyComponent.class);
   private Properties topologyProperties;
   private String brokerUrl;
   private String sensorType;
-  private SpoutConfig.Offset offset = SpoutConfig.Offset.BEGINNING;
   private LocalCluster stormCluster;
 
   public static class Builder {
@@ -65,9 +77,6 @@ public class ParserTopologyComponent implements 
InMemoryComponent {
     this.sensorType = sensorType;
   }
 
-  public void setOffset(SpoutConfig.Offset offset) {
-    this.offset = offset;
-  }
 
   @Override
   public void start() throws UnableToStartException {
@@ -75,7 +84,6 @@ public class ParserTopologyComponent implements 
InMemoryComponent {
       TopologyBuilder topologyBuilder = 
ParserTopologyBuilder.build(topologyProperties.getProperty("kafka.zk")
                                                                    , brokerUrl
                                                                    , sensorType
-                                                                   , offset
                                                                    , 1
                                                                    , 1
                                                                    , 1
@@ -95,8 +103,29 @@ public class ParserTopologyComponent implements 
InMemoryComponent {
 
   @Override
   public void stop() {
-    if(stormCluster != null) {
-      stormCluster.shutdown();
+    if (stormCluster != null) {
+      try {
+        try {
+          stormCluster.shutdown();
+        } catch (IllegalStateException ise) {
+          if (!(ise.getMessage().contains("It took over") && ise.getMessage().contains("to shut down slot"))) {
+            throw ise;
+          }
+          else {
+            assassinateSlots();
+            LOG.error("Storm slots didn't shut down entirely cleanly *sigh*.  
" +
+                    "I gave them the old one-two-skadoo and killed the slots 
with prejudice.  " +
+                    "If tests fail, we'll have to find a better way of killing 
them.", ise);
+          }
+        }
+      }
+      catch(Throwable t) {
+        LOG.error(t.getMessage(), t);
+      }
+      finally {
+        cleanupWorkerDir();
+      }
+
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
index bb3cab6..5e70177 100644
--- 
a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
+++ 
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
@@ -85,12 +85,10 @@ public class ParserTopologyCLITest {
     CommandLine cli = new 
CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
                                       
.with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
                                       
.with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
-                                      
.with(ParserTopologyCLI.ParserOptions.KAFKA_OFFSET, "BEGINNING")
                                       .build(longOpt);
     Assert.assertEquals("myzk", 
ParserTopologyCLI.ParserOptions.ZK_QUORUM.get(cli));
     Assert.assertEquals("mybroker", 
ParserTopologyCLI.ParserOptions.BROKER_URL.get(cli));
     Assert.assertEquals("mysensor", 
ParserTopologyCLI.ParserOptions.SENSOR_TYPE.get(cli));
-    Assert.assertEquals("BEGINNING", 
ParserTopologyCLI.ParserOptions.KAFKA_OFFSET.get(cli));
   }
   @Test
   public void testCLI_happyPath() throws ParseException {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/pom.xml 
b/metron-platform/metron-pcap-backend/pom.xml
index 95dbdb2..704493e 100644
--- a/metron-platform/metron-pcap-backend/pom.xml
+++ b/metron-platform/metron-pcap-backend/pom.xml
@@ -44,11 +44,6 @@
             <version>${global_flux_version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-kafka</artifactId>
-            <version>${global_storm_version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.metron</groupId>
             <artifactId>metron-common</artifactId>
             <version>${project.parent.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml 
b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
index 272ac13..732991b 100644
--- a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
+++ b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
@@ -19,29 +19,35 @@ config:
     topology.workers: 1
 
 components:
-  - id: "zkHosts"
-    className: "org.apache.storm.kafka.ZkHosts"
-    constructorArgs:
-      - "${kafka.zk}"
+
+  # Any Kafka consumer properties for the spout go here.
+  - id: "kafkaProps"
+    className: "java.util.HashMap"
+    configMethods:
+      -   name: "put"
+          args:
+            - "value.deserializer"
+            - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+      -   name: "put"
+          args:
+            - "key.deserializer"
+            - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+      -   name: "put"
+          args:
+            - "group.id"
+            - "pcap"
   - id: "kafkaConfig"
-    className: "org.apache.metron.spout.pcap.SpoutConfig"
+    className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
     constructorArgs:
-      # zookeeper hosts
-      - ref: "zkHosts"
+      - ref: "kafkaProps"
       # topic name
       - "${spout.kafka.topic.pcap}"
-      # zk root
-      - ""
-      # id
-      - "${spout.kafka.topic.pcap}"
+      - "${kafka.zk}"
     configMethods:
-      -   name: "from"
-          args:
-            - "${kafka.pcap.start}"
-      -   name: "withTimestampScheme"
+      -   name: "setFirstPollOffsetStrategy"
           args:
-            - "${kafka.pcap.ts_scheme}"
-            - "${kafka.pcap.ts_granularity}"
+            # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+            - "UNCOMMITTED_EARLIEST"
 
   - id: "writerConfig"
     className: "org.apache.metron.spout.pcap.HDFSWriterConfig"
@@ -58,6 +64,10 @@ components:
       -   name: "withZookeeperQuorum"
           args:
             - "${kafka.zk}"
+      -   name: "withDeserializer"
+          args:
+            - "${kafka.pcap.ts_scheme}"
+            - "${kafka.pcap.ts_granularity}"
 spouts:
   - id: "kafkaSpout"
     className: "org.apache.metron.spout.pcap.KafkaToHDFSSpout"
