I want to add a few things other than the issues raised by Sriharsha and Hugo. I am pasting one of the other emails that I sent sometime back about cleaning up KafkaSpoutConfig. Stig responsed to that email. Trying to answer that in this email so that it is all in one place. Answers in line.
Hi Priyank, For question 1 I can think of a couple of reasons (not sure how important they are though): Using FQCN makes it impossible to check the generic type of the deserializer, so you'd be able to pass the wrong type of deserializer to the spout (e.g. a spout that otherwise expects String but is passed an Integer deserializer). Passing class instances instead of using FQCNs makes it possible to set up some configuration for the serializer (e.g. call a constructor with a parameter). I would assume that's why the KafkaConsumer API also supports passing instances instead of FQCNs. We have SerializableDeserializer as a bit of a service to the user. Kafka's Deserializer isn't inherently serializable, and if you configure your spout to use a deserializer that isn't serializable, the topology submission will fail when Nimbus tries to serialize the spout. Not sure where and how do we check the type of Deserializer to make sure that it’s a String and not Integer by error. Can you elaborate on that? It will be anyway throwing a RuntimeException in the worker. Developers are already aware that message needs to be deserialized with correct deserializer. And if that’s not the case then it should be fixed. As far as configuration state for deserializer object is concerned, api already has configure method here https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L37 Do you see any strong reason why one would use an instance of Deserializer with some constructor args and setter methods to create the state and not just use the string value for FQCN. The state can be passed through consumer properties in configure method as mentioned before. Getting rid of 4 instance variables keyDes, keyDesClass, valueDes, valueDesClazz and the related interface SerializableDeserializer and the different combinations of overloaded constructors because of that will be a big cleanup. Instead of doing a service to the user we are making the interface of KafkaSpoutConfig and hence KafkaSpout more complicated. On the same lines, I still don’t see a reason for creating a deserializer object on client side and serialize it to send it over to Nimbus. Let me know if you have a concrete example of where one would need this. For question 2, I think we didn't want to require people to subclass KafkaTuple. If they aren't emitting to multiple streams, it's unnecessary for the tuple to subclass KafkaTuple. I'm almost certain we don't want to change it to List<KafkaTuple>. Splitting a Kafka message into multiple tuples can already be done by adding a splitter bolt that does that after the KafkaSpout in the topology. I don't really see a good reason for putting this functionality in the spout, especially since it complicates ack/commit management a bit more if we have to keep track of multiple tuples per Kafka message. Is there a reason you'd like the message split in the spout, rather than in a downstream bolt? I agree we don’t want return type to be List<KafkaTuple> because of offset management you mentioned. I think making it return KafkaTuple still makes sense. I would rather prefer forcing user to subclass KafkaTuple. Any strong reasons for not requiring users to subclass it? The way it is currently, it can misguide the user. List<Object> can be anything, including List<KafkaTuple>. I think having the instanceof check in KafkaSpout for object returned in apply method is unnecessary. We should change the apply method of the interface to return KafkaTuple and have DefaultRecordTranslator handle that i.e. return a KafkaTuple with getStream returning the default stream. The only contract user needs to bind to is that the stream in KafkaTuple has to be one of the streams returned by RecordTranslator that is used in declareOutputFields. Talking about manual assignment, I remember already doing something like that in KinesisSpout. If everyone is okay with the above changes I want to take up the task of making the changes as discussed in this email thread(whatever conclusion we reach) and even switch the spout to manual assignments. I know it’s going to be backward incompatible but I prefer that since we will be cleaning up lot of stuff. We can decide which release to pick if at all we vote for these backward incompatible changes. 2017-05-10 2:47 GMT+02:00 Priyank Shah <ps...@hortonworks.com>: I was going through new kafka spout code and had a couple of questions. 1. https://github.com/apache/storm/blob/master/external/ storm-kafka-client/src/main/java/org/apache/storm/kafka/ spout/KafkaSpoutConfig.java#L98 The instance variable at that line and following 3 lines. Why do we need them? Because of that we have Builder constructors with different parameters for key and value deserializers. We even have https://github.com/apache/storm/blob/master/external/ storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ SerializableDeserializer.java Not sure if we really need it. https://github.com/apache/kafka/blob/trunk/clients/src/ main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L555 already has a constructor that takes in Properties or a Map and if key.deserializer and value.deserialzer keys are set to fqcns then it will instantiate them and take care of them at https://github.com/apache/ kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ KafkaConsumer.java#L642 . And we already have setProp method on builder to set different kafka configs that will be passed to KafkaConsumer constructor. We can get rid of the SerializableDeserializer interface and a bunch of constructors and instance variables related to Key and Value Deserializable. 2. We have a RecordTranslator interface that is used to declareOutputFields at https://github.com/apache/ storm/blob/master/external/storm-kafka-client/src/main/ java/org/apache/storm/kafka/spout/KafkaSpout.java#L486 and then we have this special instanceof check here https://github.com/apache/ storm/blob/master/external/storm-kafka-client/src/main/ java/org/apache/storm/kafka/spout/KafkaSpout.java#L333 Why is return type of apply a List<Object> ? Should we change it to List<KafkaTuple>? That way we can get rid of instanceof check and support multiple tuples emitted for one kafka message. Fixes for above two might not be backward compatible but if everyone is okay with above changes then I can create a patch. On 6/9/17, 4:09 PM, "Hugo Da Cruz Louro" <hlo...@hortonworks.com> wrote: +1 for simplifying KafkaSpoutConfig. Too many constructors and too many methods.. I am not sure it’s justifiable to have any methods that simply set KafkaConsumer properties. All of these properties should just go in a Map<String, Object>, which is what KafkaConsumer receives, and what was supported in the initial implementation. The names of the properties can be retrieved from org.apache.kafka.clients.consumer.ConsumerConfig. At this point we may have to keep in mind backwards compatibility. Not sure we should completely discontinue dynamic partition assignment, as it is one of primary features of the new Storm Kafka Client API. With this said, manual partition assignment should be supported and would solve a lot of potential problems arising from dynamic partition assignment. Hugo > On Jun 9, 2017, at 3:33 PM, Harsha <st...@harsha.io> wrote: > > I think question why we need all those settings when a user can pass it > via Properties with consumer properties defined or via Map conf object. > Having the methods on top of consumer config means every time Kafka > consumer property added or changed one needs add a builder method. We > need to get out of the way and let the user configure it like they do it > for typical Kafka Consumer instead we've 10s of methods that sets > properties for ConsumerConfig. > > Examples: > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L317 > > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L309 > etc.. all of these are specific to KafkaConsumer config, users should > be able to pass it via Properties all of these. > > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L327 > > whats the benefit of adding that method? and we are forcing that to set > the protocol to "SSL" in this method > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L318 > > Users can set the ssl properties and then can select the > securityProtocol "SASL_SSL" which requires both kerberos and ssl configs > to be set. In above case making a call setSSLTruststore changes the > security.protocol to "SSL". This could easily run into issues if the > users sets securityProtocol first with "SASL_SSL" then later calls > setSSLTruststore which changes it to "SSL". > > We are over-taking these settings instead of letting user to figure out > from Kafka consumer config page. > > In contrast we've KafkaProducer which does this > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java#L121 > . I would add Properties object instead of deriving it from topologyConf > but this is much more easier to understand for the users. The contract > here is put whatever the producer configs that users wants in the conf > object and we create producer out of that config. > > Honestly these interfaces needs to be simple and let the user have > control instead of adding our interpretation. > > > > Thanks, > Harsha > On Jun 9, 2017, 2:08 PM -0700, Stig Døssing <generalbas....@gmail.com>, > wrote: > I'd be happy with a simpler KafkaSpoutConfig, but I think most of the > configuration parameters have good reasons for being there. Any examples > of > parameters you think we should remove? > > 2017-06-09 21:34 GMT+02:00 Harsha <st...@harsha.io>: > > +1 on using the manual assignment for the reasons specified below. We > will see duplicates even in stable conditions which > is not good. I don’t see any reason not to switch to manual assignment. > While we are at it we should refactor the KafkaConfig part. > It should be as simple as accepting the kafka consumer config or > properties file and forwarding it to KafkaConsumer. We made > it overly complex and unnecessary. > > Thanks, > Harsha >