Hi Priyank,

For question 1 I can think of a couple of reasons (I'm not sure how important they are, though):

Using FQCNs makes it impossible to check the generic type of the deserializer, so you could pass the wrong type of deserializer to the spout (e.g. a spout that otherwise expects String being handed an Integer deserializer). Passing instances instead of FQCNs also makes it possible to set up some configuration for the deserializer (e.g. call a constructor with a parameter). I would assume that's why the KafkaConsumer API also supports passing instances instead of FQCNs.

We have SerializableDeserializer as a bit of a service to the user. Kafka's Deserializer isn't inherently serializable, and if you configure your spout with a deserializer that isn't serializable, topology submission will fail when Nimbus tries to serialize the spout.
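To illustrate the type-checking point, here is a minimal sketch. TypedConfig is a made-up stand-in for a builder/config that takes typed deserializer instances; it is not the real KafkaSpoutConfig API, it just shows what the compiler can catch when instances are used instead of FQCN strings:

    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Made-up stand-in for a config/builder that takes typed deserializer instances.
    class TypedConfig<K, V> {
        TypedConfig(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
            // A real builder would hold on to the instances here.
        }
    }

    class DeserializerTypeCheckExample {
        void compiles() {
            // Fine: both instances match the declared generic types.
            new TypedConfig<String, String>(new StringDeserializer(), new StringDeserializer());
        }

        void rejectedByCompiler() {
            // Would not compile: IntegerDeserializer is not a Deserializer<String>.
            // new TypedConfig<String, String>(new StringDeserializer(), new IntegerDeserializer());
        }
    }

    // With FQCNs the same mistake only shows up at runtime, e.g.:
    // props.put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");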
For question 2, I think we didn't want to require people to subclass KafkaTuple. If they aren't emitting to multiple streams, there's no need for the tuple to be a KafkaTuple. I'm almost certain we don't want to change it to List<KafkaTuple>. Splitting a Kafka message into multiple tuples can already be done by adding a splitter bolt after the KafkaSpout in the topology (there's a small sketch of such a bolt below, after your quoted message). I don't really see a good reason for putting this functionality in the spout, especially since it complicates ack/commit management if the spout has to keep track of multiple tuples per Kafka message. Is there a reason you'd like the message split in the spout, rather than in a downstream bolt?

2017-05-10 2:47 GMT+02:00 Priyank Shah <[email protected]>:

> I was going through the new kafka spout code and had a couple of questions.
>
> 1. https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L98
> The instance variable at that line and the following 3 lines. Why do we need
> them? Because of that we have Builder constructors with different parameters
> for key and value deserializers. We even have
> https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
> Not sure if we really need it.
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L555
> already has a constructor that takes in Properties or a Map, and if the
> key.deserializer and value.deserializer keys are set to FQCNs then it will
> instantiate them and take care of them at
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L642 .
> And we already have a setProp method on the builder to set different Kafka
> configs that will be passed to the KafkaConsumer constructor. We can get rid
> of the SerializableDeserializer interface and a bunch of constructors and
> instance variables related to the key and value deserializers.
>
> 2. We have a RecordTranslator interface that is used to declareOutputFields at
> https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L486
> and then we have this special instanceof check here:
> https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L333
> Why is the return type of apply a List<Object>? Should we change it to
> List<KafkaTuple>? That way we can get rid of the instanceof check and support
> multiple tuples emitted for one Kafka message.
>
> Fixes for the above two might not be backward compatible, but if everyone is
> okay with the above changes then I can create a patch.
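Here's the splitter bolt sketch I mentioned above. It's a minimal, illustrative example only; it assumes the spout's output fields include a String field named "value" holding the message payload and that one message splits into parts on commas (both are just assumptions for the example, the field names depend on the configured RecordTranslator):

    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class MessageSplitterBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            // One Kafka message in, any number of tuples out.
            // Assumes the spout emits the message payload in a field named "value".
            String message = input.getStringByField("value");
            for (String part : message.split(",")) {
                collector.emit(new Values(part));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("part"));
        }
    }

Since BaseBasicBolt anchors the emitted tuples to the input and acks the input automatically, the spout still only has to track a single tuple per Kafka message, so ack/commit handling in the spout stays as simple as it is today.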
