Repository: storm Updated Branches: refs/heads/1.x-branch d76370a28 -> 315aa9e63
STORM-2826: Set key/value deserializer fields when using the convenience builder methods in KafkaSpoutConfig Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1811351f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1811351f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1811351f Branch: refs/heads/1.x-branch Commit: 1811351fdf199d5ddfe8037a445715239b2311fd Parents: 2181fcd Author: Stig Rohde Døssing <[email protected]> Authored: Sun Nov 19 16:42:33 2017 +0100 Committer: Stig Rohde Døssing <[email protected]> Committed: Wed Dec 13 16:36:08 2017 +0100 ---------------------------------------------------------------------- .../storm/kafka/spout/KafkaSpoutConfig.java | 92 ++++++++--------- .../storm/kafka/spout/KafkaSpoutConfigTest.java | 102 +++++++++++++++++++ 2 files changed, 148 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/1811351f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index d89b674..6d4bd44 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -280,64 +280,64 @@ public class KafkaSpoutConfig<K, V> implements Serializable { private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz, SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) { - kafkaProps = new HashMap<>(); + + this(keyDes, keyDesClazz, valDes, valDesClazz, subscription, + new DefaultRecordTranslator<K, V>(), new HashMap<String, Object>()); + if (bootstrapServers == null || bootstrapServers.isEmpty()) { throw new IllegalArgumentException("bootstrap servers cannot be null"); } kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - this.keyDes = keyDes; - this.keyDesClazz = keyDesClazz; - this.valueDes = valDes; - this.valueDesClazz = valDesClazz; - this.subscription = subscription; - this.translator = new DefaultRecordTranslator<>(); - - if (keyDesClazz != null) { - this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz); - } - if (keyDes != null) { - this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDes.getClass()); - } - if (valueDesClazz != null) { - this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDesClazz); - } - if (valueDes != null) { - this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDes.getClass()); - } + + setNonNullSerDeKafkaProp(keyDes, keyDesClazz, valueDes, valueDesClazz); } - private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz, - SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) { - this.kafkaProps = new HashMap<>(builder.kafkaProps); - this.subscription = builder.subscription; + /** + * This constructor will always be called by one of the methods {@code setKey} or {@code setVal}, which implies + * that only one of its SerDe parameters will be non null, for which the corresponding Kafka property will be set + */ + @SuppressWarnings("unchecked") + private Builder(final Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz, + SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) { + + this(keyDes, keyDesClazz, valueDes, valueDesClazz, builder.subscription, + (RecordTranslator<K, V>) builder.translator, new HashMap<>(builder.kafkaProps)); + this.pollTimeoutMs = builder.pollTimeoutMs; this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs; this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy; this.maxUncommittedOffsets = builder.maxUncommittedOffsets; - //this could result in a lot of class case exceptions at runtime, - // but because some translators will work no matter what the generics - // are I thought it best not to force someone to reset the translator - // when they change the key/value types. - this.translator = (RecordTranslator<K, V>) builder.translator; this.retryService = builder.retryService; - + + setNonNullSerDeKafkaProp(keyDes, keyDesClazz, valueDes, valueDesClazz); + } + + private Builder(SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz, + SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz, + Subscription subscription, RecordTranslator<K, V> translator, Map<String, Object> kafkaProps) { + this.keyDes = keyDes; + this.keyDesClazz = keyDesClazz; + this.valueDes = valueDes; + this.valueDesClazz = valueDesClazz; + this.subscription = subscription; + this.translator = translator; + this.kafkaProps = kafkaProps; + } + + private void setNonNullSerDeKafkaProp(SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz, + SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) { if (keyDesClazz != null) { - this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz); + kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz); } if (keyDes != null) { - this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDes.getClass()); + kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDes.getClass()); } if (valueDesClazz != null) { - this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDesClazz); + kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDesClazz); } if (valueDes != null) { - this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDes.getClass()); + kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDes.getClass()); } - - this.keyDes = keyDes; - this.keyDesClazz = keyDesClazz; - this.valueDes = valueDes; - this.valueDesClazz = valueDesClazz; } /** @@ -348,7 +348,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { */ @Deprecated public <NK> Builder<NK, V> setKey(SerializableDeserializer<NK> keyDeserializer) { - return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz); + return new Builder<>(this, keyDeserializer, null, null, null); } /** @@ -359,7 +359,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { */ @Deprecated public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> clazz) { - return new Builder<>(this, null, clazz, valueDes, valueDesClazz); + return new Builder<>(this, null, clazz, null, null); } /** @@ -370,7 +370,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { */ @Deprecated public <NV> Builder<K, NV> setValue(SerializableDeserializer<NV> valueDeserializer) { - return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, null); + return new Builder<>(this, null, null, valueDeserializer, null); } /** @@ -381,7 +381,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { */ @Deprecated public <NV> Builder<K, NV> setValue(Class<? extends Deserializer<NV>> clazz) { - return new Builder<>(this, keyDes, keyDesClazz, null, clazz); + return new Builder<>(this, null, null, null, clazz); } /** @@ -680,7 +680,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { * @return The new builder */ public static Builder<String, String> builder(String bootstrapServers, String... topics) { - return setStringDeserializers(new Builder<String, String>(bootstrapServers, topics)); + return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics)); } /** @@ -691,7 +691,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { * @return The new builder */ public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) { - return setStringDeserializers(new Builder<String, String>(bootstrapServers, topics)); + return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics)); } /** @@ -702,7 +702,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { * @return The new builder */ public static Builder<String, String> builder(String bootstrapServers, Pattern topics) { - return setStringDeserializers(new Builder<String, String>(bootstrapServers, topics)); + return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics)); } private static Builder<String, String> setStringDeserializers(Builder<String, String> builder) { http://git-wip-us.apache.org/repos/asf/storm/blob/1811351f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java index 2e2d7ff..2ceb7b9 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java @@ -17,6 +17,7 @@ */ package org.apache.storm.kafka.spout; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertEquals; @@ -25,9 +26,12 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import java.util.HashMap; +import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; +import org.hamcrest.CoreMatchers; import org.junit.Test; public class KafkaSpoutConfigTest { @@ -121,4 +125,102 @@ public class KafkaSpoutConfigTest { assertThat("When setting enable auto commit explicitly to false the spout should use the 'at-least-once' processing guarantee", conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)); } + + @Test + public void testCanGetKeyDeserializerWhenUsingDefaultBuilder() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .build(); + + assertThat("When using the default builder methods, the key deserializer should default to StringDeserializer", + conf.getKeyDeserializer(), instanceOf(StringDeserializer.class)); + } + + @Test + public void testCanGetValueDeserializerWhenUsingDefaultBuilder() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .build(); + + assertThat("When using the default builder methods, the value deserializer should default to StringDeserializer", + conf.getValueDeserializer(), instanceOf(StringDeserializer.class)); + } + + @Test + public void testCanOverrideDeprecatedDeserializerClassWithKafkaProps() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setKey(StringDeserializer.class) + .setValue(StringDeserializer.class) + .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .build(); + + assertThat("The last set key deserializer should be used, regardless of how it is set", + conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class)); + assertThat("The last set value deserializer should be used, regardless of how it is set", + conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class)); + } + + private static class SerializableStringDeserializer implements SerializableDeserializer { + + private final StringDeserializer delegate = new StringDeserializer(); + + @Override + public void configure(Map configs, boolean isKey) { + delegate.configure(configs, isKey); + } + + @Override + public Object deserialize(String topic, byte[] data) { + return delegate.deserialize(topic, data); + } + + @Override + public void close() { + delegate.close(); + } + } + + @Test + public void testCanOverrideDeprecatedDeserializerInstanceWithKafkaProps() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setKey(new SerializableStringDeserializer()) + .setValue(new SerializableStringDeserializer()) + .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .build(); + + assertThat("The last set key deserializer should be used, regardless of how it is set", + conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class)); + assertThat("The last set value deserializer should be used, regardless of how it is set", + conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class)); + } + + @Test + public void testCanOverrideKafkaPropsWithDeprecatedDeserializerSetter() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .setKey(new SerializableStringDeserializer()) + .setValue(new SerializableStringDeserializer()) + .build(); + + assertThat("The last set key deserializer should be used, regardless of how it is set", + conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class)); + assertThat("The last set value deserializer should be used, regardless of how it is set", + conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class)); + } + + @Test + public void testCanMixOldAndNewDeserializerSetter() { + KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic") + .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .setKey(new SerializableStringDeserializer()) + .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class) + .setValue(new SerializableStringDeserializer()) + .build(); + + assertThat("The last set key deserializer should be used, regardless of how it is set", + conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class)); + assertThat("The last set value deserializer should be used, regardless of how it is set", + conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class)); + } }
