I want to add a few things other than the issues raised by Sriharsha and Hugo. 
I am pasting one of the other emails that I sent sometime back about cleaning 
up KafkaSpoutConfig. Stig responsed to that email. Trying to answer that in 
this email so that it is all in one place. Answers in line.



Hi Priyank,

For question 1 I can think of a couple of reasons (not sure how important
they are though): Using FQCN makes it impossible to check the generic type
of the deserializer, so you'd be able to pass the wrong type of
deserializer to the spout (e.g. a spout that otherwise expects String but
is passed an Integer deserializer). Passing class instances instead of
using FQCNs makes it possible to set up some configuration for the
serializer (e.g. call a constructor with a parameter). I would assume
that's why the KafkaConsumer API also supports passing instances instead of
FQCNs. We have SerializableDeserializer as a bit of a service to the user.
Kafka's Deserializer isn't inherently serializable, and if you configure
your spout to use a deserializer that isn't serializable, the topology
submission will fail when Nimbus tries to serialize the spout.

Not sure where and how do we check the type of Deserializer to make sure that 
it’s a String and not Integer by error. Can you elaborate on that? It will be 
anyway throwing a RuntimeException in the worker. Developers are already aware 
that message needs to be deserialized with correct deserializer. And if that’s 
not the case then it should be fixed. As far as configuration state for 
deserializer object is concerned, api already has configure method here 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L37
 Do you see any strong reason why one would use an instance of Deserializer 
with some constructor args and setter methods to create the state and not just 
use the string value for FQCN. The state can be passed through consumer 
properties in configure method as mentioned before. Getting rid of 4 instance 
variables keyDes, keyDesClass, valueDes, valueDesClazz and the related 
interface SerializableDeserializer and the different combinations of overloaded 
constructors because of that will be a big cleanup. Instead of doing a service 
to the user we are making the interface of KafkaSpoutConfig and hence 
KafkaSpout more complicated. On the same lines, I still don’t see a reason for 
creating a deserializer object on client side and serialize it to send it over 
to Nimbus. Let me know if you have a concrete example of where one would need 
this.

For question 2, I think we didn't want to require people to subclass
KafkaTuple. If they aren't emitting to multiple streams, it's unnecessary
for the tuple to subclass KafkaTuple. I'm almost certain we don't want to
change it to List<KafkaTuple>. Splitting a Kafka message into multiple
tuples can already be done by adding a splitter bolt that does that after
the KafkaSpout in the topology. I don't really see a good reason for
putting this functionality in the spout, especially since it complicates
ack/commit management a bit more if we have to keep track of multiple
tuples per Kafka message. Is there a reason you'd like the message split in
the spout, rather than in a downstream bolt?

I agree we don’t want return type to be List<KafkaTuple> because of offset 
management you mentioned. I think making it return KafkaTuple still makes 
sense. I would rather prefer forcing user to subclass KafkaTuple. Any strong 
reasons for not requiring users to subclass it? The way it is currently, it can 
misguide the user. List<Object> can be anything, including List<KafkaTuple>. I 
think having the instanceof check in KafkaSpout for object returned in apply 
method is unnecessary. We should change the apply method of the interface to 
return KafkaTuple and have DefaultRecordTranslator handle that i.e. return a 
KafkaTuple with getStream returning the default stream. The only contract user 
needs to bind to is that the stream in KafkaTuple has to be one of the streams 
returned by RecordTranslator that is used in declareOutputFields.

Talking about manual assignment, I remember already doing something like that 
in KinesisSpout. If everyone is okay with the above changes I want to take up 
the task of making the changes as discussed in this email thread(whatever 
conclusion we reach) and even switch the spout to manual assignments. I know 
it’s going to be backward incompatible but I prefer that since we will be 
cleaning up lot of stuff. We can decide which release to pick if at all we vote 
for these backward incompatible changes. 

2017-05-10 2:47 GMT+02:00 Priyank Shah <ps...@hortonworks.com>:

I was going through new kafka spout code and had a couple of questions.


1.       https://github.com/apache/storm/blob/master/external/
storm-kafka-client/src/main/java/org/apache/storm/kafka/
spout/KafkaSpoutConfig.java#L98 The instance variable at that line and
following 3 lines. Why do we need them? Because of that we have Builder
constructors with different parameters for key and value deserializers. We
even have https://github.com/apache/storm/blob/master/external/
storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/
SerializableDeserializer.java Not sure if we really need it.
https://github.com/apache/kafka/blob/trunk/clients/src/
main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L555
already has a constructor that takes in Properties or a Map and if
key.deserializer and value.deserialzer keys are set to fqcns then it will
instantiate them and take care of them at https://github.com/apache/
kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/
KafkaConsumer.java#L642 . And we already have setProp method on builder
to set different kafka configs that will be passed to KafkaConsumer
constructor. We can get rid of the SerializableDeserializer interface and a
bunch of constructors and instance variables related to Key and Value
Deserializable.

2.       We have a RecordTranslator interface that is used to
declareOutputFields at https://github.com/apache/
storm/blob/master/external/storm-kafka-client/src/main/
java/org/apache/storm/kafka/spout/KafkaSpout.java#L486 and then we have
this special instanceof check here https://github.com/apache/
storm/blob/master/external/storm-kafka-client/src/main/
java/org/apache/storm/kafka/spout/KafkaSpout.java#L333 Why is return type
of apply a List<Object> ? Should we change it to List<KafkaTuple>? That way
we can get rid of instanceof check and support multiple tuples emitted for
one kafka message.


Fixes for above two might not be backward compatible but if everyone is
okay with above changes then I can create a patch.


On 6/9/17, 4:09 PM, "Hugo Da Cruz Louro" <hlo...@hortonworks.com> wrote:

    +1 for simplifying KafkaSpoutConfig. Too many constructors and too many 
methods.. I am not sure it’s justifiable to have any methods that simply set 
KafkaConsumer properties. All of these properties should just go in a 
Map<String, Object>, which is what KafkaConsumer receives, and what was 
supported in the initial implementation. The names of the properties can be 
retrieved from org.apache.kafka.clients.consumer.ConsumerConfig. At this point 
we may have to keep in mind backwards compatibility.
    
    Not sure we should completely discontinue dynamic partition assignment, as 
it is one of primary features of the new Storm Kafka Client API. With this 
said, manual partition assignment should be supported and would solve a lot of 
potential problems arising from dynamic partition assignment.
    
    Hugo
    
    > On Jun 9, 2017, at 3:33 PM, Harsha <st...@harsha.io> wrote:
    > 
    > I think question why we need all those settings when a user can pass it
    > via Properties with consumer properties defined or via Map conf object.
    > Having the methods on top of consumer config means every time Kafka
    > consumer property added or changed one needs add a builder method.  We
    > need to get out of the way and let the user configure it like they do it
    > for typical Kafka Consumer instead we've 10s of methods that sets
    > properties for ConsumerConfig. 
    > 
    > Examples:
    > 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L317
    > 
    > 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L309
    > etc.. all of these are specific to KafkaConsumer  config, users should
    > be able to pass it via Properties all of these.
    > 
    > 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L327
    > 
    > whats the benefit of adding that method? and we are forcing that to set
    > the protocol to "SSL" in this method
    > 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L318
    > 
    > Users can set the ssl properties and then can select the
    > securityProtocol "SASL_SSL" which requires both kerberos and ssl configs
    > to be set. In above case making a call setSSLTruststore changes the
    > security.protocol to "SSL". This could easily run into issues if the
    > users sets securityProtocol first with "SASL_SSL" then later calls
    > setSSLTruststore which changes it to "SSL".
    > 
    > We are over-taking these settings instead of letting user to figure out
    > from Kafka consumer config page.
    > 
    > In contrast we've KafkaProducer which does this
    > 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java#L121
    > . I would add Properties object instead of deriving it from topologyConf
    > but this is much more easier to understand for the users. The contract
    > here is put whatever the producer configs that users wants in the conf
    > object and we create producer out of that config. 
    > 
    > Honestly these interfaces needs to be simple and let the user have
    > control instead of adding our interpretation. 
    > 
    > 
    > 
    > Thanks,
    > Harsha
    > On Jun 9, 2017, 2:08 PM -0700, Stig Døssing <generalbas....@gmail.com>,
    > wrote:
    > I'd be happy with a simpler KafkaSpoutConfig, but I think most of the
    > configuration parameters have good reasons for being there. Any examples
    > of
    > parameters you think we should remove?
    > 
    > 2017-06-09 21:34 GMT+02:00 Harsha <st...@harsha.io>:
    > 
    > +1 on using the manual assignment for the reasons specified below. We
    > will see duplicates even in stable conditions which
    > is not good. I don’t see any reason not to switch to manual assignment.
    > While we are at it we should refactor the KafkaConfig part.
    > It should be as simple as accepting the kafka consumer config or
    > properties file and forwarding it to KafkaConsumer. We made
    > it overly complex and unnecessary.
    > 
    > Thanks,
    > Harsha
    > 
    
    

Reply via email to