Repository: storm
Updated Branches:
  refs/heads/1.x-branch d76370a28 -> 315aa9e63


STORM-2826: Set key/value deserializer fields when using the convenience 
builder methods in KafkaSpoutConfig


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1811351f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1811351f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1811351f

Branch: refs/heads/1.x-branch
Commit: 1811351fdf199d5ddfe8037a445715239b2311fd
Parents: 2181fcd
Author: Stig Rohde Døssing <[email protected]>
Authored: Sun Nov 19 16:42:33 2017 +0100
Committer: Stig Rohde Døssing <[email protected]>
Committed: Wed Dec 13 16:36:08 2017 +0100

----------------------------------------------------------------------
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  92 ++++++++---------
 .../storm/kafka/spout/KafkaSpoutConfigTest.java | 102 +++++++++++++++++++
 2 files changed, 148 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1811351f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index d89b674..6d4bd44 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -280,64 +280,64 @@ public class KafkaSpoutConfig<K, V> implements 
Serializable {
 
         private Builder(String bootstrapServers, SerializableDeserializer<K> 
keyDes, Class<? extends Deserializer<K>> keyDesClazz,
             SerializableDeserializer<V> valDes, Class<? extends 
Deserializer<V>> valDesClazz, Subscription subscription) {
-            kafkaProps = new HashMap<>();
+
+            this(keyDes, keyDesClazz, valDes, valDesClazz, subscription,
+                    new DefaultRecordTranslator<K, V>(), new HashMap<String, 
Object>());
+
             if (bootstrapServers == null || bootstrapServers.isEmpty()) {
                 throw new IllegalArgumentException("bootstrap servers cannot 
be null");
             }
             kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
-            this.keyDes = keyDes;
-            this.keyDesClazz = keyDesClazz;
-            this.valueDes = valDes;
-            this.valueDesClazz = valDesClazz;
-            this.subscription = subscription;
-            this.translator = new DefaultRecordTranslator<>();
-            
-            if (keyDesClazz != null) {
-                
this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz);
-            }
-            if (keyDes != null) {
-                
this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDes.getClass());
-            }
-            if (valueDesClazz != null) {
-                
this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDesClazz);
-            }
-            if (valueDes != null) {
-                
this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDes.getClass());
-            }
+
+            setNonNullSerDeKafkaProp(keyDes, keyDesClazz, valueDes, 
valueDesClazz);
         }
 
-        private Builder(Builder<?, ?> builder, SerializableDeserializer<K> 
keyDes, Class<? extends Deserializer<K>> keyDesClazz,
-            SerializableDeserializer<V> valueDes, Class<? extends 
Deserializer<V>> valueDesClazz) {
-            this.kafkaProps = new HashMap<>(builder.kafkaProps);
-            this.subscription = builder.subscription;
+        /**
+         * This constructor will always be called by one of the methods {@code 
setKey} or {@code setValue}, which implies
+         * that only one of its SerDe parameters will be non-null, for which 
the corresponding Kafka property will be set.
+         */
+        @SuppressWarnings("unchecked")
+        private Builder(final Builder<?, ?> builder, 
SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> 
keyDesClazz,
+                        SerializableDeserializer<V> valueDes, Class<? extends 
Deserializer<V>> valueDesClazz) {
+
+            this(keyDes, keyDesClazz, valueDes, valueDesClazz, 
builder.subscription,
+                    (RecordTranslator<K, V>) builder.translator, new 
HashMap<>(builder.kafkaProps));
+
             this.pollTimeoutMs = builder.pollTimeoutMs;
             this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
             this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
             this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-            //this could result in a lot of class case exceptions at runtime,
-            // but because some translators will work no matter what the 
generics
-            // are I thought it best not to force someone to reset the 
translator
-            // when they change the key/value types.
-            this.translator = (RecordTranslator<K, V>) builder.translator;
             this.retryService = builder.retryService;
-            
+
+            setNonNullSerDeKafkaProp(keyDes, keyDesClazz, valueDes, 
valueDesClazz);
+        }
+
+        private Builder(SerializableDeserializer<K> keyDes, Class<? extends 
Deserializer<K>> keyDesClazz,
+               SerializableDeserializer<V> valueDes, Class<? extends 
Deserializer<V>> valueDesClazz,
+               Subscription subscription, RecordTranslator<K, V> translator, 
Map<String, Object> kafkaProps) {
+            this.keyDes = keyDes;
+            this.keyDesClazz = keyDesClazz;
+            this.valueDes = valueDes;
+            this.valueDesClazz = valueDesClazz;
+            this.subscription = subscription;
+            this.translator = translator;
+            this.kafkaProps = kafkaProps;
+        }
+
+        private void setNonNullSerDeKafkaProp(SerializableDeserializer<K> 
keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+                SerializableDeserializer<V> valueDes, Class<? extends 
Deserializer<V>> valueDesClazz) {
             if (keyDesClazz != null) {
-                
this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz);
+                kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDesClazz);
             }
             if (keyDes != null) {
-                
this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDes.getClass());
+                kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDes.getClass());
             }
             if (valueDesClazz != null) {
-                
this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDesClazz);
+                kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDesClazz);
             }
             if (valueDes != null) {
-                
this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDes.getClass());
+                kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDes.getClass());
             }
-            
-            this.keyDes = keyDes;
-            this.keyDesClazz = keyDesClazz;
-            this.valueDes = valueDes;
-            this.valueDesClazz = valueDesClazz;
         }
 
         /**
@@ -348,7 +348,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
          */
         @Deprecated
         public <NK> Builder<NK, V> setKey(SerializableDeserializer<NK> 
keyDeserializer) {
-            return new Builder<>(this, keyDeserializer, null, valueDes, 
valueDesClazz);
+            return new Builder<>(this, keyDeserializer, null, null, null);
         }
 
         /**
@@ -359,7 +359,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
          */
         @Deprecated
         public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> 
clazz) {
-            return new Builder<>(this, null, clazz, valueDes, valueDesClazz);
+            return new Builder<>(this, null, clazz, null, null);
         }
 
         /**
@@ -370,7 +370,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
          */
         @Deprecated
         public <NV> Builder<K, NV> setValue(SerializableDeserializer<NV> 
valueDeserializer) {
-            return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, 
null);
+            return new Builder<>(this, null, null, valueDeserializer, null);
         }
 
         /**
@@ -381,7 +381,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
          */
         @Deprecated
         public <NV> Builder<K, NV> setValue(Class<? extends Deserializer<NV>> 
clazz) {
-            return new Builder<>(this, keyDes, keyDesClazz, null, clazz);
+            return new Builder<>(this, null, null, null, clazz);
         }
 
         /**
@@ -680,7 +680,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
      * @return The new builder
      */
     public static Builder<String, String> builder(String bootstrapServers, 
String... topics) {
-        return setStringDeserializers(new Builder<String, 
String>(bootstrapServers, topics));
+        return setStringDeserializers(new Builder<String, 
String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, 
topics));
     }
 
     /**
@@ -691,7 +691,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
      * @return The new builder
      */
     public static Builder<String, String> builder(String bootstrapServers, 
Collection<String> topics) {
-        return setStringDeserializers(new Builder<String, 
String>(bootstrapServers, topics));
+        return setStringDeserializers(new Builder<String, 
String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, 
topics));
     }
 
     /**
@@ -702,7 +702,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
      * @return The new builder
      */
     public static Builder<String, String> builder(String bootstrapServers, 
Pattern topics) {
-        return setStringDeserializers(new Builder<String, 
String>(bootstrapServers, topics));
+        return setStringDeserializers(new Builder<String, 
String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, 
topics));
     }
 
     private static Builder<String, String> 
setStringDeserializers(Builder<String, String> builder) {

http://git-wip-us.apache.org/repos/asf/storm/blob/1811351f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index 2e2d7ff..2ceb7b9 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.kafka.spout;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertEquals;
@@ -25,9 +26,12 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import java.util.HashMap;
+import java.util.Map;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.hamcrest.CoreMatchers;
 import org.junit.Test;
 
 public class KafkaSpoutConfigTest {
@@ -121,4 +125,102 @@ public class KafkaSpoutConfigTest {
         assertThat("When setting enable auto commit explicitly to false the 
spout should use the 'at-least-once' processing guarantee",
             conf.getProcessingGuarantee(), 
is(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE));
     }
+    
+    @Test
+    public void testCanGetKeyDeserializerWhenUsingDefaultBuilder() {
+        KafkaSpoutConfig<String, String> conf = 
KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .build();
+        
+        assertThat("When using the default builder methods, the key 
deserializer should default to StringDeserializer",
+            conf.getKeyDeserializer(), instanceOf(StringDeserializer.class));
+    }
+    
+    @Test
+    public void testCanGetValueDeserializerWhenUsingDefaultBuilder() {
+        KafkaSpoutConfig<String, String> conf = 
KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .build();
+        
+        assertThat("When using the default builder methods, the value 
deserializer should default to StringDeserializer",
+            conf.getValueDeserializer(), instanceOf(StringDeserializer.class));
+    }
+    
+    @Test
+    public void testCanOverrideDeprecatedDeserializerClassWithKafkaProps() {
+        KafkaSpoutConfig<String, String> conf = 
KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setKey(StringDeserializer.class)
+            .setValue(StringDeserializer.class)
+            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class)
+            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class)
+            .build();
+        
+        assertThat("The last set key deserializer should be used, regardless 
of how it is set",
+            
conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), 
CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
+        assertThat("The last set value deserializer should be used, regardless 
of how it is set",
+            
conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), 
CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
+    }
+    
+    private static class SerializableStringDeserializer implements 
SerializableDeserializer {
+
+        private final StringDeserializer delegate = new StringDeserializer();
+
+        @Override
+        public void configure(Map configs, boolean isKey) {
+            delegate.configure(configs, isKey);
+        }
+
+        @Override
+        public Object deserialize(String topic, byte[] data) {
+            return delegate.deserialize(topic, data);
+        }
+
+        @Override
+        public void close() {
+            delegate.close();
+        }
+    }
+    
+    @Test
+    public void testCanOverrideDeprecatedDeserializerInstanceWithKafkaProps() {
+        KafkaSpoutConfig<String, String> conf = 
KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setKey(new SerializableStringDeserializer())
+            .setValue(new SerializableStringDeserializer())
+            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class)
+            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class)
+            .build();
+        
+        assertThat("The last set key deserializer should be used, regardless 
of how it is set",
+            
conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), 
CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
+        assertThat("The last set value deserializer should be used, regardless 
of how it is set",
+            
conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), 
CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
+    }
+    
+    @Test
+    public void testCanOverrideKafkaPropsWithDeprecatedDeserializerSetter() {
+        KafkaSpoutConfig<String, String> conf = 
KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class)
+            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class)
+            .setKey(new SerializableStringDeserializer())
+            .setValue(new SerializableStringDeserializer())
+            .build();
+        
+        assertThat("The last set key deserializer should be used, regardless 
of how it is set",
+            
conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), 
CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class));
+        assertThat("The last set value deserializer should be used, regardless 
of how it is set",
+            
conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), 
CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class));
+    }
+    
+    @Test
+    public void testCanMixOldAndNewDeserializerSetter() {
+        KafkaSpoutConfig<String, String> conf = 
KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class)
+            .setKey(new SerializableStringDeserializer())
+            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class)
+            .setValue(new SerializableStringDeserializer())
+            .build();
+        
+        assertThat("The last set key deserializer should be used, regardless 
of how it is set",
+            
conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), 
CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
+        assertThat("The last set value deserializer should be used, regardless 
of how it is set",
+            
conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), 
CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class));
+    }
 }

Reply via email to