Repository: storm
Updated Branches:
  refs/heads/master a8946adde -> 399e35f10


STORM-2548: Simplify KafkaSpoutConfig to avoid duplicating KafkaConsumer configuration parameters


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/01a56ec9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/01a56ec9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/01a56ec9

Branch: refs/heads/master
Commit: 01a56ec992e9343f25b5613219dbf8a5ca76e95f
Parents: 8994777
Author: Stig Rohde Døssing <stigdoess...@gmail.com>
Authored: Wed Jun 28 22:28:04 2017 +0200
Committer: Stig Rohde Døssing <s...@apache.org>
Committed: Tue Jul 18 11:53:13 2017 +0200
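
To orient reviewers, a minimal sketch of the configuration style this commit moves to; the builder, setProp, and ConsumerConfig keys are taken from the patch below, while the broker address, topic, group id, and fetch size are illustrative placeholders:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

// Convenience methods such as setGroupId and setMaxPartitionFectchBytes are
// removed; any KafkaConsumer property is now passed through setProp with the
// standard ConsumerConfig keys, so KafkaSpoutConfig no longer mirrors the
// consumer's configuration surface.
KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:9092", "myTopic")
        .setProp(ConsumerConfig.GROUP_ID_CONFIG, "myGroup")
        .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
        .build();
```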

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      | 140 ++++---
 .../TridentKafkaClientWordCountNamedTopics.java |   5 +-
 ...identKafkaClientWordCountWildcardTopics.java |   5 +-
 external/storm-kafka-client/pom.xml             |   2 +-
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 364 +++++--------------
 .../kafka/spout/SerializableDeserializer.java   |  26 --
 .../internal/KafkaConsumerFactoryDefault.java   |   3 +-
 .../spout/trident/KafkaTridentSpoutManager.java |   3 +-
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |   8 +-
 .../kafka/spout/MaxUncommittedOffsetTest.java   |   3 +-
 .../SingleTopicKafkaSpoutConfiguration.java     |   5 +-
 .../test/KafkaSpoutTopologyMainNamedTopics.java |   3 +-
 .../KafkaSpoutTopologyMainWildcardTopics.java   |   3 +-
 13 files changed, 186 insertions(+), 384 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/01a56ec9/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index 33b4daf..ada8619 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -16,8 +16,8 @@ You need to provide implementations for the following 2 interfaces
 These interfaces have 2 methods defined:
 
 ```java
-    K getKeyFromTuple(Tuple/TridentTuple tuple);
-    V getMessageFromTuple(Tuple/TridentTuple tuple);
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
 ```
 
 As the name suggests, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field
@@ -53,10 +53,9 @@ These are also defined in `org.apache.kafka.clients.producer.ProducerConfig`
 ### Using wildcard kafka topic match
 You can do a wildcard topic match by adding the following config
 
-```
-     Config config = new Config();
-     config.put("kafka.topic.wildcard.match",true);
-
+```java
+Config config = new Config();
+config.put("kafka.topic.wildcard.match",true);
 ```
 
 After this you can specify a wildcard topic for matching e.g. clickstream.*.log.  This will match all streams matching clickstream.my.log, clickstream.cart.log etc
@@ -67,65 +66,65 @@ After this you can specify a wildcard topic for matching e.g. clickstream.*.log.
 For the bolt :
 
 ```java
-        TopologyBuilder builder = new TopologyBuilder();
-
-        Fields fields = new Fields("key", "message");
-        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
-                    new Values("storm", "1"),
-                    new Values("trident", "1"),
-                    new Values("needs", "1"),
-                    new Values("javadoc", "1")
-        );
-        spout.setCycle(true);
-        builder.setSpout("spout", spout, 5);
-        //set producer properties.
-        Properties props = new Properties();
-        props.put("bootstrap.servers", "localhost:9092");
-        props.put("acks", "1");
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-        KafkaBolt bolt = new KafkaBolt()
-                .withProducerProperties(props)
-                .withTopicSelector(new DefaultTopicSelector("test"))
-                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
-        builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
-
-        Config conf = new Config();
-
-        StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
+TopologyBuilder builder = new TopologyBuilder();
+
+Fields fields = new Fields("key", "message");
+FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+            new Values("storm", "1"),
+            new Values("trident", "1"),
+            new Values("needs", "1"),
+            new Values("javadoc", "1")
+);
+spout.setCycle(true);
+builder.setSpout("spout", spout, 5);
+//set producer properties.
+Properties props = new Properties();
+props.put("bootstrap.servers", "localhost:9092");
+props.put("acks", "1");
+props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+KafkaBolt bolt = new KafkaBolt()
+        .withProducerProperties(props)
+        .withTopicSelector(new DefaultTopicSelector("test"))
+        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
+builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
+
+Config conf = new Config();
+
+StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
 ```
 
 For Trident:
 
 ```java
-        Fields fields = new Fields("word", "count");
-        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
-                new Values("storm", "1"),
-                new Values("trident", "1"),
-                new Values("needs", "1"),
-                new Values("javadoc", "1")
-        );
-        spout.setCycle(true);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        //set producer properties.
-        Properties props = new Properties();
-        props.put("bootstrap.servers", "localhost:9092");
-        props.put("acks", "1");
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
-                .withProducerProperties(props)
-                .withKafkaTopicSelector(new DefaultTopicSelector("test"))
-                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
-        stream.partitionPersist(stateFactory, fields, new TridentKafkaStateUpdater(), new Fields());
-
-        Config conf = new Config();
-        StormSubmitter.submitTopology("kafkaTridentTest", conf, 
topology.build());
+Fields fields = new Fields("word", "count");
+FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+        new Values("storm", "1"),
+        new Values("trident", "1"),
+        new Values("needs", "1"),
+        new Values("javadoc", "1")
+);
+spout.setCycle(true);
+
+TridentTopology topology = new TridentTopology();
+Stream stream = topology.newStream("spout1", spout);
+
+//set producer properties.
+Properties props = new Properties();
+props.put("bootstrap.servers", "localhost:9092");
+props.put("acks", "1");
+props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+        .withProducerProperties(props)
+        .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+        .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
+stream.partitionPersist(stateFactory, fields, new TridentKafkaStateUpdater(), new Fields());
+
+Config conf = new Config();
+StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
 ```
 
 ## Reading From kafka (Spouts)
@@ -142,10 +141,8 @@ a spout.
 `topics` The topics the spout will consume can either be a `Collection` of specific topic names (1 or more) or a regular expression `Pattern`, which specifies
 that any topics that match that regular expression will be consumed.
 
-In the case of the Constructors you may also need to specify a key deserializer and a value deserializer.  This is to help guarantee type safety through the use
-of Java generics.  The defaults are `StringDeserializer`s and can be overwritten by calling `setKeyDeserializer` and/or `setValueDeserializer`.
-If these are set to null the code will fall back to what is set in the kafka properties, but it is preferable to be explicit here, again to maintain 
-type safety with the generics.
+If you are using the Builder Constructors instead of one of the `builder` methods, you will also need to specify a key deserializer and a value deserializer.  This is to help guarantee type safety through the use
+of Java generics.  The deserializers can be specified via the consumer properties set with `setProp`. See the KafkaConsumer configuration documentation for details.
 
 There are a few key configs to pay attention to.
 
@@ -161,11 +158,7 @@ for the first time. Allowed values include
 By default the "topic", "partition", "offset", "key", and "value" will be emitted to the "default" stream.  If you want to output entries to different
 streams based on the topic, storm provides `ByTopicRecordTranslator`.  See below for more examples on how to use these.
 
-`setProp` can be used to set kafka properties that do not have a convenience method.
-
-`setGroupId` lets you set the id of the kafka consumer group property "group.id'
-
-`setSSLKeystore` and `setSSLTruststore` allow you to configure SSL authentication.
+`setProp` and `setProps` can be used to set KafkaConsumer properties. The list of these properties can be found in the KafkaConsumer configuration documentation on the [Kafka website](http://kafka.apache.org/documentation.html#consumerconfigs).
 
 ### Usage Examples
 
@@ -229,7 +222,7 @@ output topic it will throw an exception and not continue.
 In most cases the built in SimpleRecordTranslator and ByTopicRecordTranslator should cover your use case.  If you do run into a situation where you need a custom one
 then this documentation will describe how to do this properly, and some of the less than obvious classes involved.
 
-The point of apply is to take a ConsumerRecord and turn it into a `List<Object>` that can be emitted.  What is not obvious is how to tell the spout to emit it to a
+The point of `apply` is to take a ConsumerRecord and turn it into a `List<Object>` that can be emitted.  What is not obvious is how to tell the spout to emit it to a
 specific stream.  To do this you will need to return an instance of `org.apache.storm.kafka.spout.KafkaTuple`.  This provides a method `routedTo` that will say which
 specific stream the tuple should go to.
 
@@ -243,7 +236,7 @@ Will cause the tuple to be emitted on the "bar" stream.
 
 Be careful when writing custom record translators because just like in a storm spout it needs to be self consistent.  The `streams` method should return
 a full set of streams that this translator will ever try to emit on.  Additionally `getFieldsFor` should return a valid Fields object for each of those
-streams.  If you are doing this for Trident a value must be in the List returned by apply for every field in the Fields object for that stream,
+streams.  If you are doing this for Trident a value must be in the List returned by `apply` for every field in the Fields object for that stream,
 otherwise trident can throw exceptions.
 
 
@@ -327,7 +320,7 @@ When selecting a kafka client version, you should ensure -
 
 # Kafka Spout Performance Tuning
 
-The Kafka spout provides two internal parameters to control its performance. The parameters can be set using the [KafkaSpoutConfig] (https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) methods [setOffsetCommitPeriodMs] (https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L189-L193) and [setMaxUncommittedOffsets] (https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L211-L217).
+The Kafka spout provides two internal parameters to control its performance. The parameters can be set using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) methods [setOffsetCommitPeriodMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L189-L193) and [setMaxUncommittedOffsets](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L211-L217).
 
 
 * "offset.commit.period.ms" controls how often the spout commits to Kafka
 * "max.uncommitted.offsets" controls how many offsets can be pending commit before another poll can take place
@@ -337,7 +330,7 @@ The [Kafka consumer config] (http://kafka.apache.org/documentation.html#consumer
 
 * “fetch.min.bytes”
 * “fetch.max.wait.ms”
-* [Kafka Consumer] (http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance poll timeout, which is specified for each Kafka spout using the [KafkaSpoutConfig] (https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) method [setPollTimeoutMs] (https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L180-L184)
+* [Kafka Consumer](http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance poll timeout, which is specified for each Kafka spout using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) method [setPollTimeoutMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L180-L184)
 <br/>
 
 Depending on the structure of your Kafka cluster, distribution of the data, and availability of data to poll, these parameters will have to be configured appropriately. Please refer to the Kafka documentation on Kafka parameter tuning.
@@ -356,6 +349,7 @@ Currently the Kafka spout has has the following default values, which have shown
 If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations --, and want to remove the overhead of tuple tracking, then you can run a KafkaSpout with AutoCommitMode.
 
 To enable it, you need to:
+
 * set Config.TOPOLOGY_ACKERS to 0;
 * enable *AutoCommitMode* in Kafka consumer configuration; 
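
The updated documentation above routes deserializer setup through consumer properties instead of the removed `setKeyDeserializer`/`setValueDeserializer` methods. A short sketch of what that looks like when constructing a `Builder` directly (broker address and topic are placeholders):

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

// The builder(...) factory methods preconfigure StringDeserializers; a
// directly constructed Builder leaves the deserializers unset, so they are
// supplied as ordinary KafkaConsumer properties.
KafkaSpoutConfig.Builder<String, String> builder =
        new KafkaSpoutConfig.Builder<>("localhost:9092", "myTopic");
builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
```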
 

http://git-wip-us.apache.org/repos/asf/storm/blob/01a56ec9/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
index 9ab00f5..886d15d 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.Config;
@@ -63,8 +64,8 @@ public class TridentKafkaClientWordCountNamedTopics {
 
     protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() {
         return KafkaSpoutConfig.builder(KAFKA_LOCAL_BROKER, TOPIC_1, TOPIC_2)
-                .setGroupId("kafkaSpoutTestGroup_" + System.nanoTime())
-                .setMaxPartitionFectchBytes(200)
+                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup_" + System.nanoTime())
+                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
                 .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
                 .setRetry(newRetryService())
                 .setOffsetCommitPeriodMs(10_000)

http://git-wip-us.apache.org/repos/asf/storm/blob/01a56ec9/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java
index 62ccef0..0be3127 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountWildcardTopics.java
@@ -21,6 +21,7 @@ package org.apache.storm.kafka.trident;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
 
 import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.tuple.Fields;
@@ -31,8 +32,8 @@ public class TridentKafkaClientWordCountWildcardTopics extends TridentKafkaClien
 
     protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() {
         return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPIC_WILDCARD_PATTERN)
-                .setGroupId("kafkaSpoutTestGroup")
-                .setMaxPartitionFectchBytes(200)
+                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
                 .setRecordTranslator((r) -> new Values(r.value()), new Fields("str"))
                 .setRetry(newRetryService())
                 .setOffsetCommitPeriodMs(10_000)

http://git-wip-us.apache.org/repos/asf/storm/blob/01a56ec9/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 843868f..6db04a0 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -152,7 +152,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>9</maxAllowedViolations>
+                    <maxAllowedViolations>6</maxAllowedViolations>
                 </configuration>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/01a56ec9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 43a6e0b..6f09f5f 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -27,7 +27,6 @@ import java.util.Properties;
 import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
 import org.apache.storm.tuple.Fields;
@@ -47,6 +46,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;    
     // 2s
     public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; 
+    public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
     public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =  
             new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
                     DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
@@ -56,6 +56,37 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = 
         new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(0),
             DEFAULT_MAX_RETRIES, TimeInterval.milliSeconds(0));
+    
+    // Kafka consumer configuration
+    private final Map<String, Object> kafkaProps;
+    private final Subscription subscription;
+    private final long pollTimeoutMs;
+
+    // Kafka spout configuration
+    private final RecordTranslator<K, V> translator;
+    private final long offsetCommitPeriodMs;
+    private final int maxUncommittedOffsets;
+    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
+    private final KafkaSpoutRetryService retryService;
+    private final long partitionRefreshPeriodMs;
+    private final boolean emitNullTuples;
+
+    /**
+     * Creates a new KafkaSpoutConfig using a Builder.
+     * @param builder The Builder to construct the KafkaSpoutConfig from
+     */
+    public KafkaSpoutConfig(Builder<K,V> builder) {
+        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
+        this.subscription = builder.subscription;
+        this.translator = builder.translator;
+        this.pollTimeoutMs = builder.pollTimeoutMs;
+        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
+        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+        this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+        this.retryService = builder.retryService;
+        this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
+        this.emitNullTuples = builder.emitNullTuples;
+    }
 
     /**
      * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will affect the number of consumer
@@ -77,163 +108,50 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         EARLIEST,
         LATEST,
         UNCOMMITTED_EARLIEST,
-        UNCOMMITTED_LATEST }
-    
-    public static Builder<String, String> builder(String bootstrapServers, String ... topics) {
-        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
-    }
-    
-    public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
-        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
-    }
-    
-    public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
-        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
-    }
-    
-    private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
-        // set defaults for properties not specified
-        if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-            kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        }
-        return kafkaProps;
+        UNCOMMITTED_LATEST 
     }
     
     public static class Builder<K,V> {
         private final Map<String, Object> kafkaProps;
         private Subscription subscription;
-        private final SerializableDeserializer<K> keyDes;
-        private final Class<? extends Deserializer<K>> keyDesClazz;
-        private final SerializableDeserializer<V> valueDes;
-        private final Class<? extends Deserializer<V>> valueDesClazz;
         private RecordTranslator<K, V> translator;
         private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
-        private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+        private FirstPollOffsetStrategy firstPollOffsetStrategy = DEFAULT_FIRST_POLL_OFFSET_STRATEGY;
         private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
         private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE;
         private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
         private boolean emitNullTuples = false;
 
         public Builder(String bootstrapServers, String ... topics) {
-            this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new NamedSubscription(topics));
-        }
-
-        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... topics) {
-            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
-        }
-        
-        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
-            SerializableDeserializer<V> valDes, Collection<String> topics) {
-            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
-        }
-        
-        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
-            SerializableDeserializer<V> valDes, Pattern topics) {
-            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
-        }
-        
-        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
-            SerializableDeserializer<V> valDes, Subscription subscription) {
-            this(bootstrapServers, keyDes, null, valDes, null, subscription);
+            this(bootstrapServers, new NamedSubscription(topics));
         }
         
-        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
-                Class<? extends Deserializer<V>> valDes, String ... topics) {
-            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+        public Builder(String bootstrapServers, Collection<String> topics) {
+            this(bootstrapServers, new NamedSubscription(topics));
         }
         
-        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
-                Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
-            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+        public Builder(String bootstrapServers, Pattern topics) {
+            this(bootstrapServers, new PatternSubscription(topics));
         }
         
-        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
-                Class<? extends Deserializer<V>> valDes, Pattern topics) {
-            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
-        }
-        
-        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes,
-                Class<? extends Deserializer<V>> valDes, Subscription subscription) {
-            this(bootstrapServers, null, keyDes, null, valDes, subscription);
-        }
-        
-        private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes,
-                Class<? extends Deserializer<K>> keyDesClazz,
-                SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
+        /**
+         * Create a KafkaSpoutConfig builder with default property values and no key/value deserializers.
+         * @param bootstrapServers The bootstrap servers the consumer will use
+         * @param subscription The subscription defining which topics and partitions each spout instance will read.
+         */
+        public Builder(String bootstrapServers, Subscription subscription) {
             kafkaProps = new HashMap<>();
             if (bootstrapServers == null || bootstrapServers.isEmpty()) {
                 throw new IllegalArgumentException("bootstrap servers cannot be null");
             }
             kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-            this.keyDes = keyDes;
-            this.keyDesClazz = keyDesClazz;
-            this.valueDes = valDes;
-            this.valueDesClazz = valDesClazz;
             this.subscription = subscription;
-            this.translator = new DefaultRecordTranslator<K,V>();
-        }
-
-        private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
-                SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
-            this.kafkaProps = new HashMap<>(builder.kafkaProps);
-            this.subscription = builder.subscription;
-            this.pollTimeoutMs = builder.pollTimeoutMs;
-            this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-            this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-            this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-            //this could result in a lot of class case exceptions at runtime,
-            // but because some translators will work no matter what the generics
-            // are I thought it best not to force someone to reset the translator
-            // when they change the key/value types.
-            this.translator = (RecordTranslator<K, V>) builder.translator;
-            this.retryService = builder.retryService;
-            this.keyDes = keyDes;
-            this.keyDesClazz = keyDesClazz;
-            this.valueDes = valueDes;
-            this.valueDesClazz = valueDesClazz;
-        }
-
-        /**
-         * Specifying this key deserializer overrides the property key.deserializer. If you have
-         * set a custom RecordTranslator before calling this it may result in class cast
-         * exceptions at runtime.
-         */
-        public <NewKeyT> Builder<NewKeyT,V> setKey(SerializableDeserializer<NewKeyT> keyDeserializer) {
-            return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz);
+            this.translator = new DefaultRecordTranslator<>();
         }
         
         /**
-         * Specify a class that can be instantiated to create a key.deserializer
-         * This is the same as setting key.deserializer, but overrides it. If you have
-         * set a custom RecordTranslator before calling this it may result in class cast
-         * exceptions at runtime.
-         */
-        public <NewKeyT> Builder<NewKeyT, V> setKey(Class<? extends Deserializer<NewKeyT>> clazz) {
-            return new Builder<>(this, null, clazz, valueDes, valueDesClazz);
-        }
-
-        /**
-         * Specifying this value deserializer overrides the property value.deserializer.  If you have
-         * set a custom RecordTranslator before calling this it may result in class cast
-         * exceptions at runtime.
-         */
-        public <NewValueT> Builder<K,NewValueT> setValue(SerializableDeserializer<NewValueT> valueDeserializer) {
-            return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, null);
-        }
-        
-        /**
-         * Specify a class that can be instantiated to create a value.deserializer
-         * This is the same as setting value.deserializer, but overrides it.  If you have
-         * set a custom RecordTranslator before calling this it may result in class cast
-         * exceptions at runtime.
-         */
-        public <NewValueT> Builder<K,NewValueT> setValue(Class<? extends Deserializer<NewValueT>> clazz) {
-            return new Builder<>(this, keyDes, keyDesClazz, null, clazz);
-        }
-        
-        /**
-         * Set a Kafka property config.
+         * Set a {@link KafkaConsumer} property. 
          */
         public Builder<K,V> setProp(String key, Object value) {
             kafkaProps.put(key, value);
@@ -241,7 +159,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
         
         /**
-         * Set multiple Kafka property configs.
+         * Set multiple {@link KafkaConsumer} properties.
          */
         public Builder<K,V> setProp(Map<String, Object> props) {
             kafkaProps.putAll(props);
@@ -249,84 +167,18 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
         
         /**
-         * Set multiple Kafka property configs.
+         * Set multiple {@link KafkaConsumer} properties.
          */
         public Builder<K,V> setProp(Properties props) {
-            for (String name: props.stringPropertyNames()) {
-                kafkaProps.put(name, props.get(name));
-            }
+            props.forEach((key, value) -> {
+                if (key instanceof String) {
+                    kafkaProps.put((String)key, value);
+                } else {
+                    throw new IllegalArgumentException("Kafka Consumer property keys must be Strings");
+                }
+            });
             return this;
         }
-        
-        /**
-         * Set the group.id for the consumers.
-         */
-        public Builder<K,V> setGroupId(String id) {
-            return setProp("group.id", id);
-        }
-        
-        /**
-         * reset the bootstrap servers for the Consumer.
-         */
-        public Builder<K,V> setBootstrapServers(String servers) {
-            return setProp(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
-        }
-        
-        /**
-         * The minimum amount of data the broker should return for a fetch request.
-         */
-        public Builder<K,V> setFetchMinBytes(int bytes) {
-            return setProp(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, bytes);
-        }
-        
-        /**
-         * The maximum amount of data per-partition the broker will return.
-         */
-        public Builder<K,V> setMaxPartitionFectchBytes(int bytes) {
-            return setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, bytes);
-        }
-        
-        /**
-         * The maximum number of records a poll will return.
-         */
-        public Builder<K,V> setMaxPollRecords(int records) {
-            return setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);
-        }
-        
-        //Security Related Configs
-        
-        /**
-         * Configure the SSL Keystore for mutual authentication.
-         */
-        public Builder<K,V> setSSLKeystore(String location, String password) {
-            return setProp("ssl.keystore.location", location)
-                    .setProp("ssl.keystore.password", password);
-        }
-       
-        /**
-         * Configure the SSL Keystore for mutual authentication.
-         */
-        public Builder<K,V> setSSLKeystore(String location, String password, String keyPassword) {
-            return setProp("ssl.key.password", keyPassword)
-                    .setSSLKeystore(location, password);
-        }
-        
-        /**
-         * Configure the SSL Truststore to authenticate with the brokers.
-         */
-        public Builder<K,V> setSSLTruststore(String location, String password) {
-            return setSecurityProtocol("SSL")
-                    .setProp("ssl.truststore.location", location)
-                    .setProp("ssl.truststore.password", password);
-        }
-        
-        /**
-         * Protocol used to communicate with brokers. 
-         * Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
-         */
-        public Builder<K, V> setSecurityProtocol(String protocol) {
-            return setProp("security.protocol", protocol);
-        }
 
         //Spout Settings
         /**
@@ -435,82 +287,58 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             return new KafkaSpoutConfig<>(this);
         }
     }
-
-    // Kafka consumer configuration
-    private final Map<String, Object> kafkaProps;
-    private final Subscription subscription;
-    private final SerializableDeserializer<K> keyDes;
-    private final Class<? extends Deserializer<K>> keyDesClazz;
-    private final SerializableDeserializer<V> valueDes;
-    private final Class<? extends Deserializer<V>> valueDesClazz;
-    private final long pollTimeoutMs;
-
-    // Kafka spout configuration
-    private final RecordTranslator<K, V> translator;
-    private final long offsetCommitPeriodMs;
-    private final int maxUncommittedOffsets;
-    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-    private final KafkaSpoutRetryService retryService;
-    private final long partitionRefreshPeriodMs;
-    private final boolean emitNullTuples;
-
+    
+        
     /**
-     * Creates a new KafkaSpoutConfig using a Builder.
-     * @param builder The Builder to construct the KafkaSpoutConfig from
+     * Factory method that creates a Builder with String key/value deserializers.
+     * @param bootstrapServers The bootstrap servers for the consumer
+     * @param topics The topics to subscribe to
+     * @return The new builder
      */
-    public KafkaSpoutConfig(Builder<K,V> builder) {
-        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
-        this.subscription = builder.subscription;
-        this.translator = builder.translator;
-        this.pollTimeoutMs = builder.pollTimeoutMs;
-        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-        this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-        this.retryService = builder.retryService;
-        this.keyDes = builder.keyDes;
-        this.keyDesClazz = builder.keyDesClazz;
-        this.valueDes = builder.valueDes;
-        this.valueDesClazz = builder.valueDesClazz;
-        this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
-        this.emitNullTuples = builder.emitNullTuples;
+    public static Builder<String, String> builder(String bootstrapServers, String ... topics) {
+        return setStringDeserializers(new Builder<>(bootstrapServers, topics));
     }
-
+    
     /**
-     * Gets the properties that will be passed to the KafkaConsumer.
-     * @return The Kafka properties map
+     * Factory method that creates a Builder with String key/value deserializers.
+     * @param bootstrapServers The bootstrap servers for the consumer
+     * @param topics The topics to subscribe to
+     * @return The new builder
      */
-    public Map<String, Object> getKafkaProps() {
-        return kafkaProps;
+    public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
+        return setStringDeserializers(new Builder<>(bootstrapServers, topics));
     }
-
+    
     /**
-     * Gets the Kafka record key deserializer.
-     * @return The key deserializer
+     * Factory method that creates a Builder with String key/value deserializers.
+     * @param bootstrapServers The bootstrap servers for the consumer
+     * @param topics The topic pattern to subscribe to
+     * @return The new builder
      */
-    public Deserializer<K> getKeyDeserializer() {
-        if (keyDesClazz != null) {
-            try {
-                return keyDesClazz.newInstance();
-            } catch (InstantiationException | IllegalAccessException e) {
-                throw new RuntimeException("Could not instantiate key deserializer " + keyDesClazz);
-            }
+    public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
+        return setStringDeserializers(new Builder<>(bootstrapServers, topics));
+    }
+    
+    private static Builder<String, String> setStringDeserializers(Builder<String, String> builder) {
+        builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        return builder;
+    }
+    
+    private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
+        // set defaults for properties not specified
+        if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+            kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         }
-        return keyDes;
+        return kafkaProps;
     }
 
     /**
-     * Gets the Kafka record value deserializer.
-     * @return The value deserializer
+     * Gets the properties that will be passed to the KafkaConsumer.
      */
-    public Deserializer<V> getValueDeserializer() {
-        if (valueDesClazz != null) {
-            try {
-                return valueDesClazz.newInstance();
-            } catch (InstantiationException | IllegalAccessException e) {
-                throw new RuntimeException("Could not instantiate value deserializer " + valueDesClazz);
-            }
-        }
-        return valueDes;
+    public Map<String, Object> getKafkaProps() {
+        return kafkaProps;
     }
     
     public Subscription getSubscription() {
@@ -562,8 +390,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public String toString() {
         return "KafkaSpoutConfig{"
                 + "kafkaProps=" + kafkaProps
-                + ", key=" + getKeyDeserializer()
-                + ", value=" + getValueDeserializer()
                 + ", pollTimeoutMs=" + pollTimeoutMs
                 + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs
                 + ", maxUncommittedOffsets=" + maxUncommittedOffsets

http://git-wip-us.apache.org/repos/asf/storm/blob/01a56ec9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
deleted file mode 100644
index 120260e..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright 2016 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.io.Serializable;
-import org.apache.kafka.common.serialization.Deserializer;
-
-/**
- * @param <T> The type this deserializer deserializes to.
- */
-public interface SerializableDeserializer<T> extends Deserializer<T>, Serializable { 
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/01a56ec9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
index c283815..9a8142a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
@@ -23,8 +23,7 @@ public class KafkaConsumerFactoryDefault<K, V> implements KafkaConsumerFactory<K
 
     @Override
     public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
-        return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
-                kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+        return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps());
     }
     
 }
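
With the deserializer arguments dropped from the factory above, the single-argument `KafkaConsumer` constructor instantiates whatever deserializer classes the properties map names. Roughly, the map returned by `getKafkaProps()` now has to look like this (broker address is a placeholder):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// The deserializers ride along in the properties map rather than being
// passed to the constructor explicitly.
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```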

http://git-wip-us.apache.org/repos/asf/storm/blob/01a56ec9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
index cf790b2..30d52cb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
@@ -49,8 +49,7 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
     }
 
     KafkaConsumer<K,V> createAndSubscribeKafkaConsumer(TopologyContext context) {
-        kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
-                kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+        kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps());
 
         kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context);
         return kafkaConsumer;

http://git-wip-us.apache.org/repos/asf/storm/blob/01a56ec9/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index e904e4c..513db90 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.HashMap;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.junit.Test;
 
@@ -34,8 +36,10 @@ public class KafkaSpoutConfigTest {
         assertNull(conf.getConsumerGroupId());
         assertTrue(conf.getTranslator() instanceof DefaultRecordTranslator);
         HashMap<String, Object> expected = new HashMap<>();
-        expected.put("bootstrap.servers", "localhost:1234");
-        expected.put("enable.auto.commit", "false");
+        expected.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1234");
+        expected.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        expected.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        expected.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         assertEquals(expected, conf.getKafkaProps());
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/01a56ec9/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index 0a6a16b..9ebdcf7 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.storm.kafka.KafkaUnitRule;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
@@ -59,7 +60,7 @@ public class MaxUncommittedOffsetTest {
     private final int initialRetryDelaySecs = 60;
     private final KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
         .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
-        .setMaxPollRecords(maxPollRecords)
+        .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
         .setMaxUncommittedOffsets(maxUncommittedOffsets)
         .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
             1, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs))) //Retry once after a minute

http://git-wip-us.apache.org/repos/asf/storm/blob/01a56ec9/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index 48413be..62dbfe5 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.storm.kafka.spout.builders;
 
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.kafka.spout.KafkaSpout;
@@ -50,8 +51,8 @@ public class SingleTopicKafkaSpoutConfiguration {
         return KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)
                 .setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()),
                         new Fields("topic", "key", "value"), STREAM)
-                .setGroupId("kafkaSpoutTestGroup")
-                .setMaxPollRecords(5)
+                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+                .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
                 .setRetry(getRetryService())
                 .setOffsetCommitPeriodMs(10_000)
                 .setFirstPollOffsetStrategy(EARLIEST)

http://git-wip-us.apache.org/repos/asf/storm/blob/01a56ec9/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
index 9459d4b..50991e8 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -23,6 +23,7 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
@@ -98,7 +99,7 @@ public class KafkaSpoutTopologyMainNamedTopics {
                 (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
                 new Fields("topic", "partition", "offset", "key", "value"), TOPIC_2_STREAM);
         return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPICS)
-                .setGroupId("kafkaSpoutTestGroup")
+                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                 .setRetry(getRetryService())
                 .setRecordTranslator(trans)
                 .setOffsetCommitPeriodMs(10_000)

http://git-wip-us.apache.org/repos/asf/storm/blob/01a56ec9/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
index 5ea9d99..d50e2ab 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
@@ -21,6 +21,7 @@ package org.apache.storm.kafka.spout.test;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
 
 import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.kafka.spout.KafkaSpout;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
@@ -45,7 +46,7 @@ public class KafkaSpoutTopologyMainWildcardTopics extends KafkaSpoutTopologyMain
 
     protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig() {
         return KafkaSpoutConfig.builder("127.0.0.1:9092", TOPIC_WILDCARD_PATTERN)
-                .setGroupId("kafkaSpoutTestGroup")
+                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
                 .setRetry(getRetryService())
                 .setRecordTranslator((r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
                         new Fields("topic", "partition", "offset", "key", "value"), STREAM)
