Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2428#discussion_r153035015
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -320,20 +320,17 @@ private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class
                 // when they change the key/value types.
                 this.translator = (RecordTranslator<K, V>) builder.translator;
                 this.retryService = builder.retryService;
    -
    -            if (keyDesClazz != null) {
    -                this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz);
    -            }
    +
                 if (keyDes != null) {
                     this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDes.getClass());
    -            }
    -            if (valueDesClazz != null) {
    -                this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDesClazz);
    -            }
    -            if (valueDes != null) {
    +            } else if (keyDesClazz != null) {
    --- End diff --
    
    One of the changes in https://github.com/apache/storm/pull/2215 is to make 
users set most Kafka properties through setProp instead of duplicating 
configuration parameters in KafkaSpoutConfig. These properties include key and 
value deserializers.
    
    In order to provide backward compatibility, that PR tries to ensure that if 
the keyDes/keyDesClazz field is set, then the corresponding kafkaProps property 
is also set. The spout doesn't use the fields anymore; it only reads from the 
kafkaProps map. The setKey/setValue methods were also deprecated, and users 
were directed to use setProp instead. Additionally, the convenience builder 
methods were changed so that they no longer set the keyDes/keyDesClazz fields 
and only set properties in kafkaProps.
    
    The issue Alexandre hit during testing was that he was doing something like 
`kafkaSpoutConfig.getKeyDeserializer().getClass()` on a KafkaSpoutConfig built 
from one of the convenience builders. Since keyDes/keyDesClazz aren't being set 
by the default builders anymore, this causes an NPE.
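
    A minimal sketch of that failure mode, using a hypothetical `ConfigSketch` 
class rather than the real KafkaSpoutConfig. It only assumes that the getter 
returns the field as-is, so an unset field comes back as null:

```java
// Hypothetical stand-in for KafkaSpoutConfig; only the key deserializer field is modeled.
final class ConfigSketch {
    // Stays null when the builder never sets the field.
    private final Object keyDes;

    ConfigSketch(Object keyDes) {
        this.keyDes = keyDes;
    }

    Object getKeyDeserializer() {
        return keyDes;
    }

    // Returns true if the call pattern from the comment throws an NPE.
    static boolean throwsNpe(ConfigSketch config) {
        try {
            config.getKeyDeserializer().getClass();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }
}
```

    With a null field, `throwsNpe` returns true; once the field is populated, 
the same call succeeds.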
    
    I think we still want to be able to provide the simplified KafkaSpoutConfig 
API to 1.x, and we still want to encourage users to use setProp instead of 
setKey/setValue. In order to fix the NPE, we also have to set keyDes/keyDesClazz 
in the convenience builder methods again.
    
    Without this change the code behaves in a very confusing way when mixing 
use of setKey/setValue and setProp. The added unit tests demonstrate cases 
where the current code would act counterintuitively. Try pasting the current 
code into the modified Builder constructor, and you'll see test failures.
    
    The modified constructor is used when setKey/setValue is called. With the 
current code, calling e.g. setKey will actually overwrite both the key and 
value properties in kafkaProps if keyDesClazz and valueDesClazz were set in the 
old Builder. 
    
    For example, if you did 
`KafkaSpoutConfig.builder(bootstrapServers, topic).setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyValueDeserializer.class).setKey(MyKeyDeserializer.class)`,
 you end up with a KafkaSpoutConfig where the value deserializer is 
`StringDeserializer`. This is because the `builder` method now sets both 
keyDesClazz and valueDesClazz by default. When setKey calls the copy 
constructor, it overwrites both the key and value deserializer properties in 
kafkaProps with the values of the key/valueDesClazz fields. The updated code 
makes sure that if you call setKey, then only the key deserializer property 
will be set. 
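
    The difference can be sketched with a dependency-free model. 
`BuilderSketch` and the `overwriteBoth` flag are hypothetical, deserializer 
classes are represented as plain strings, and the way the sketch restricts the 
write to the key property is an assumption for illustration, not necessarily 
how the PR implements it:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for KafkaSpoutConfig.Builder, not the real Storm class.
final class BuilderSketch {
    static final String KEY_PROP = "key.deserializer";
    static final String VALUE_PROP = "value.deserializer";

    final Map<String, Object> kafkaProps = new HashMap<>();
    final String keyDesClazz;
    final String valueDesClazz;
    // true models the behavior described above as "current", false the updated one.
    final boolean overwriteBoth;

    // Plays the role of builder(...): defaults both fields and both properties.
    BuilderSketch(boolean overwriteBoth) {
        this.overwriteBoth = overwriteBoth;
        this.keyDesClazz = "StringDeserializer";
        this.valueDesClazz = "StringDeserializer";
        kafkaProps.put(KEY_PROP, keyDesClazz);
        kafkaProps.put(VALUE_PROP, valueDesClazz);
    }

    // Plays the role of the copy constructor that setKey goes through.
    private BuilderSketch(BuilderSketch old, String newKeyDesClazz) {
        this.overwriteBoth = old.overwriteBoth;
        this.kafkaProps.putAll(old.kafkaProps);
        this.keyDesClazz = newKeyDesClazz;
        this.valueDesClazz = old.valueDesClazz;
        kafkaProps.put(KEY_PROP, keyDesClazz);
        if (overwriteBoth) {
            // Rewrites the value property from the field, clobbering
            // whatever was placed in kafkaProps through setProp.
            kafkaProps.put(VALUE_PROP, valueDesClazz);
        }
    }

    BuilderSketch setProp(String key, Object value) {
        kafkaProps.put(key, value);
        return this;
    }

    BuilderSketch setKey(String clazz) {
        return new BuilderSketch(this, clazz);
    }

    // Runs the setProp-then-setKey sequence and reports the value deserializer property.
    static Object valueDeserializerAfterMixing(boolean overwriteBoth) {
        return new BuilderSketch(overwriteBoth)
            .setProp(VALUE_PROP, "MyValueDeserializer")
            .setKey("MyKeyDeserializer")
            .kafkaProps.get(VALUE_PROP);
    }
}
```

    In this model, `valueDeserializerAfterMixing(true)` yields 
`StringDeserializer`, while `valueDeserializerAfterMixing(false)` keeps 
`MyValueDeserializer`.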
    
    The change isn't necessary for the constructor further up, because it isn't 
a copy constructor. The problem with the copy constructor is that it takes in a 
kafkaProps that might already contain settings for key/value deserializers and 
overwrites them. The unmodified constructor starts with an empty kafkaProps, so 
there's no issue.

