Repository: storm Updated Branches: refs/heads/1.x-branch 5890333c7 -> fca692da3
STORM-2756: Make KafkaSpoutConfig.Builder constructors set key/value deserializers in kafkaProps, so they are used when building a consumer Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/386ab20c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/386ab20c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/386ab20c Branch: refs/heads/1.x-branch Commit: 386ab20c124e41d9b7660d2a9eacd0a3d30a6541 Parents: 892bf5c Author: Stig Rohde Døssing <[email protected]> Authored: Sun Sep 24 15:38:19 2017 +0200 Committer: Stig Rohde Døssing <[email protected]> Committed: Sun Sep 24 15:38:19 2017 +0200 ---------------------------------------------------------------------- .../storm/kafka/spout/KafkaSpoutConfig.java | 27 ++++++++++++++++++++ 1 file changed, 27 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/386ab20c/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 79e8189..5cad0f4 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -250,6 +250,19 @@ public class KafkaSpoutConfig<K, V> implements Serializable { this.valueDesClazz = valDesClazz; this.subscription = subscription; this.translator = new DefaultRecordTranslator<>(); + + if (keyDesClazz != null) { + this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz); + } + if (keyDes != null) { + this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDes.getClass()); + } + if (valueDesClazz != null) { + this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDesClazz); + } + if (valueDes != null) { + this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDes.getClass()); + } } private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz, @@ -266,6 +279,20 @@ public class KafkaSpoutConfig<K, V> implements Serializable { // when they change the key/value types. this.translator = (RecordTranslator<K, V>) builder.translator; this.retryService = builder.retryService; + + if (keyDesClazz != null) { + this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz); + } + if (keyDes != null) { + this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDes.getClass()); + } + if (valueDesClazz != null) { + this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDesClazz); + } + if (valueDes != null) { + this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDes.getClass()); + } + this.keyDes = keyDes; this.keyDesClazz = keyDesClazz; this.valueDes = valueDes;
