Repository: storm
Updated Branches:
  refs/heads/1.x-branch 5890333c7 -> fca692da3


STORM-2756: Make KafkaSpoutConfig.Builder constructors set key/value 
deserializers in kafkaProps, so they are used when building a consumer


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/386ab20c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/386ab20c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/386ab20c

Branch: refs/heads/1.x-branch
Commit: 386ab20c124e41d9b7660d2a9eacd0a3d30a6541
Parents: 892bf5c
Author: Stig Rohde Døssing <[email protected]>
Authored: Sun Sep 24 15:38:19 2017 +0200
Committer: Stig Rohde Døssing <[email protected]>
Committed: Sun Sep 24 15:38:19 2017 +0200

----------------------------------------------------------------------
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 27 ++++++++++++++++++++
 1 file changed, 27 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/386ab20c/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 79e8189..5cad0f4 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -250,6 +250,19 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             this.valueDesClazz = valDesClazz;
             this.subscription = subscription;
             this.translator = new DefaultRecordTranslator<>();
+            
+            if (keyDesClazz != null) {
+                
this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz);
+            }
+            if (keyDes != null) {
+                
this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDes.getClass());
+            }
+            if (valueDesClazz != null) {
+                
this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDesClazz);
+            }
+            if (valueDes != null) {
+                
this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDes.getClass());
+            }
         }
 
         private Builder(Builder<?, ?> builder, SerializableDeserializer<K> 
keyDes, Class<? extends Deserializer<K>> keyDesClazz,
@@ -266,6 +279,20 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             // when they change the key/value types.
             this.translator = (RecordTranslator<K, V>) builder.translator;
             this.retryService = builder.retryService;
+            
+            if (keyDesClazz != null) {
+                
this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz);
+            }
+            if (keyDes != null) {
+                
this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDes.getClass());
+            }
+            if (valueDesClazz != null) {
+                
this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDesClazz);
+            }
+            if (valueDes != null) {
+                
this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDes.getClass());
+            }
+            
             this.keyDes = keyDes;
             this.keyDesClazz = keyDesClazz;
             this.valueDes = valueDes;

Reply via email to