Great ideas Jason!

On Fri, Jul 31, 2015 at 12:19 PM, Jay Kreps <j...@confluent.io> wrote:

> I like all these ideas.
>
> Our convention is to keep method names declarative so it should probably be
>   subscribe(List<String> topics, Callback c)
>   assign(List<TopicPartition> partitions)
>
> The javadoc would obviously have to clarify the relationship between a
> subscribed topic and assigned partitions. Presumably unsubscribe/unassign
> are unnecessary since this is just a matter of subscribing to the empty
> list.
>
> -Jay
>
> On Fri, Jul 31, 2015 at 11:29 AM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
>> I was thinking a little bit this morning about the subscription API and I
>> have a few ideas on how to address some of the concerns about intuitiveness
>> and exception handling.
>>
>> 1. Split the current notion of topic/partition subscription into
>> subscription of topics and assignment of partitions. These concepts are
>> pretty fundamentally different and I think at least some of the confusion
>> about when subscriptions() can be used is caused by the fact that we
>> overload the term. If instead that method is renamed to assignment(), then
>> we are communicating to users that it is possible to have a subscription
>> without an active assignment, which is not obvious with the current API.
>> The code in fact already separates these concepts internally, so this would
>> just expose it to the user.
>>
>> 2. Merge rebalance callback into a subscription callback and add a
>> way to handle errors. The consumer's current rebalance callback is
>> basically invoked when a subscription "succeeds," so it seems a little
>> weird to also provide a callback on subscription. Perhaps we can just take
>> the rebalance callback out of configuration and have the user provide it on
>> subscribe(). We can add a method to the callback to handle errors (e.g. for
>> non-existing topics). Since the callback is provided at subscribe time, it
>> should be clearer to the user that the assignment will not be ready
>> immediately when subscribe returns. It's also arguably a little more
>> natural to set this callback at subscription time rather than when the
>> consumer is constructed.
>>
>> 3. Get rid of the additive subscribe methods and just use setSubscription
>> which would clear the old subscription. After you start providing callbacks
>> to subscribe, then the implementation starts to get tricky if each call to
>> subscribe provides a separate callback. Instead, as Jay suggested, we could
>> just provide a way to set the full list of subscriptions at once, and then
>> there is only one callback to maintain.
>>
>> With these points, the API might look something like this:
>>
>> void setSubscription(List<String> topics, RebalanceCallback callback);
>> void setAssignment(List<TopicPartition> partitions);
>> List<String> subscription();
>> List<TopicPartition> assignment();
>>
>> interface RebalanceCallback {
>>   void onAssignment(List<TopicPartition> partitions);
>>   void onRevocation(List<TopicPartition> partitions);
>>
>>   // handle non-existing topics, etc.
>>   void onError(Exception e);
>> }
>>
>> Any thoughts?
>>
>> -Jason
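To make the proposal above concrete, here is a minimal, self-contained sketch of how the split between subscription and assignment might behave. `SketchConsumer` and its `completeRebalance` hook are hypothetical stand-ins invented for illustration (nothing like them exists in Kafka), and `TopicPartition` is redefined locally so the example compiles on its own. Only the `RebalanceCallback` shape and the four method names come from the message.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local stand-in for Kafka's TopicPartition so the sketch is self-contained.
final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    @Override public String toString() { return topic + "-" + partition; }
}

// Callback shape taken from the proposal.
interface RebalanceCallback {
    void onAssignment(List<TopicPartition> partitions);
    void onRevocation(List<TopicPartition> partitions);
    void onError(Exception e); // e.g. non-existing topics
}

// Hypothetical consumer that tracks subscription and assignment separately.
class SketchConsumer {
    private List<String> subscription = Collections.emptyList();
    private List<TopicPartition> assignment = Collections.emptyList();
    private RebalanceCallback callback;

    // Replaces (does not add to) the previous subscription and its callback.
    void setSubscription(List<String> topics, RebalanceCallback callback) {
        this.subscription = new ArrayList<>(topics);
        this.callback = callback;
    }

    List<String> subscription() { return subscription; }

    // Stays empty until a rebalance completes, even with an active subscription.
    List<TopicPartition> assignment() { return assignment; }

    // Hypothetical hook standing in for the coordinator finishing a rebalance.
    void completeRebalance(List<TopicPartition> assigned) {
        if (callback != null && !assignment.isEmpty()) {
            callback.onRevocation(assignment);
        }
        assignment = new ArrayList<>(assigned);
        if (callback != null) {
            callback.onAssignment(assignment);
        }
    }
}
```

The point of the sketch is that `subscription()` is non-empty as soon as `setSubscription` returns, while `assignment()` only changes once the (simulated) rebalance runs and the callback fires.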
>>
>>
>>
>> On Thu, Jul 30, 2015 at 11:59 AM, Jay Kreps <j...@confluent.io> wrote:
>>
>>> Hey Becket,
>>>
>>> Yeah the high-level belief here is that it is possible to give something
>>> as high level as the existing "high level" consumer, but this is not likely
>>> to be the end-all be-all of high-level interfaces for processing streams of
>>> messages. For example neither of these interfaces handles the threading
>>> model for the processing, which obviously is a fairly low-level
>>> implementation detail left to the user in your proposal, the current code,
>>> as well as the existing scala consumer.
>>>
>>> There will be many of these: the full-fledged stream processing
>>> frameworks like Storm/Spark, scalaz streams, the RxJava stuff, a more
>>> traditional message queue like "processor" interface, not to mention the
>>> stuff we're trying to do with KIP-28. For these frameworks it will be quite
>>> weird to add a bunch of new threads since they will want to dictate the
>>> threading model.
>>>
>>> What will be a major failure though is if this client isn't low-level
>>> enough and we need to introduce another layer underneath. This would happen
>>> if we dictate so much that it becomes unusable for various
>>> applications, frameworks, or use cases. This is the concern with dictating
>>> threading and processing models.
>>>
>>> So to summarize the goal is to subsume the existing APIs, which I think
>>> we all agree this does, and be a foundation on which to build other
>>> abstractions.
>>>
>>> WRT KIP-28, I think it is quite general and if we do that right it will
>>> subsume a lot of the higher level processing and will give a full threaded
>>> processing model to the user.
>>>
>>>
>>> -Jay
>>>
>>>
>>> On Wed, Jul 29, 2015 at 6:25 PM, Jiangjie Qin <j...@linkedin.com> wrote:
>>>
>>>> Thanks for the comments Jason and Jay.
>>>>
>>>> Jason, I had the same concern for producer's callback as well before,
>>>> but it seems to be fine from some callbacks I wrote - a user can always pass
>>>> an object into the constructor if necessary for synchronization.
>>>>
>>>> Jay, I agree that the current API might be fine for people who want to
>>>> wrap it up. But I thought the new consumer was supposed to be a combination
>>>> of old high and low level consumer, which means it should be able to be
>>>> used as is, just like producer. If KafkaConsumer is designed to be wrapped
>>>> up for use, then the question becomes whether Kafka will provide a decent
>>>> wrapper or not? Neha mentioned that KIP-28 will address the users who only
>>>> care about data. Would that be the wrapper provided by Kafka? I am not sure
>>>> if that is sufficient though because the processor is highly abstracted,
>>>> and might only meet the static data stream requirement as I listed in the
>>>> grid. For users who need something from the other grids, are we going to
>>>> have another wrapper? Or are we expecting all users to write their own
>>>> wrapper for KafkaConsumer? Some other comments are in line.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Wed, Jul 29, 2015 at 3:16 PM, Jay Kreps <j...@confluent.io> wrote:
>>>>
>>>>> Some comments on the proposal:
>>>>>
>>>>> I think we are conflating a number of things that should probably be
>>>>> addressed individually because they are unrelated. My past experience is
>>>>> that this always makes progress hard. The more we can pick apart these
>>>>> items the better:
>>>>>
>>>>>    1. threading model
>>>>>    2. blocking vs non-blocking semantics
>>>>>    3. missing apis
>>>>>    4. missing javadoc and other api surprises
>>>>>    5. Throwing exceptions.
>>>>>
>>>>> The missing APIs are getting added independently. Some like your
>>>>> proposed offsetByTime were things we agreed to hold off on for the first
>>>>> release and do when we'd thought it through. If there are uses for it now
>>>>> we can accelerate. I think each of these is really independent, we know
>>>>> there are things that need to be added but lumping them all into one
>>>>> discussion will be confusing.
>>>>>
>>>>> WRT throwing exceptions the policy is to throw exceptions that are
>>>>> unrecoverable and handle and log other exceptions that are transient. That
>>>>> policy makes sense if you go through the thought exercise of "what will
>>>>> the user do if I throw this exception to them" if they have no other rational
>>>>> response but to retry (and if failing to anticipate and retry with that
>>>>> exception will kill their program). You can argue whether the topic not
>>>>> existing is transient or not, unfortunately the way we did auto-creation
>>>>> makes it transient if you are in "auto create mode" and non-transient
>>>>> otherwise (ick!). In any case this is an orthogonal discussion to
>>>>> everything else. I think the policy is right and if we don't conform to it
>>>>> in some way that is really an independent bug/discussion.
>>>>>
>>>> Agreed, we can discuss them separately.
>>>>
>>>>>
>>>>> I suggest we focus on threading and the current event-loop style of
>>>>> api design since I think that is really the crux.
>>>>>
>>>>> The analogy between the producer threading model and the consumer
>>>>> model actually doesn't work for me. The goal of the producer is actually
>>>>> to take requests from many many user threads and shove them into a single
>>>>> buffer for batching. So the threading model isn't the 1:1 threads you
>>>>> describe, it is N:1. The goal of the consumer is to support single-threaded
>>>>> processing. This is what drives the difference. Saying that the producer
>>>>> has N:1 threads, therefore the consumer should have 1:1 threads instead
>>>>> of just 1 thread, doesn't make sense any more than an analogy to the
>>>>> broker's threading model would--the problem we're solving is totally different.
>>>>>
>>>> I think the ultimate goal for both producer and consumer is still to allow
>>>> users to send/receive data in parallel. In producer we picked the solution
>>>> of one-producer-serving-multiple-threads, and in consumer we picked
>>>> multiple-single-threaded-consumers instead of
>>>> single-consumer-serving-multiple-threads. And we believe people can always
>>>> implement the latter with the former. I think this is a reasonable
>>>> decision. However, there are also reasonable concerns over the
>>>> multiple-single-threaded-consumers solution which is that the single-thread
>>>> might have to be a dedicated polling thread in many cases which pushes users
>>>> towards the other solution - i.e. implementing a
>>>> single-thread-consumer-serving-multiple-threads wrapper. From what we hear,
>>>> it seems to be a quite common concern for most of the users we talked to.
>>>> Plus the adoption bar of the consumer will be much higher because users will
>>>> have to understand some of the details of things they don't care about, as
>>>> listed in the grid.
>>>> The analogy between producer/consumer is intended to show that a
>>>> separate polling thread will solve the concerns we have.
>>>>
>>>>>
>>>>>
>>>>> I think ultimately though what you need to think about is, does an
>>>>> event loop style of API make sense? That is the source of all the issues
>>>>> you describe. This style of API is incredibly prevalent from unix select
>>>>> to GUIs to node.js. It's a great way to model multiple channels of messages
>>>>> coming in. It is a fantastic style for event processing. Programmers
>>>>> understand this style of api though I would agree it is unusual compared
>>>>> to blocking apis. But it is a single threaded processing model. The
>>>>> current approach is basically a pure event loop with some convenience methods
>>>>> that are effectively "poll until X is complete".
>>>>>
>>>>> I think basically all the confusion you are describing comes from not
>>>>> documenting/expecting an event loop. The "if you don't call poll nothing
>>>>> happens" point is basically this. It's an event loop. You have to loop.
>>>>> You can't not call poll. The docs don't cover this right now, perhaps. I think
>>>>> if they do it's not unreasonable behavior.
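The "if you don't call poll nothing happens" point can be shown with a small sketch. `LoopConsumer` below is a hypothetical stand-in, not the real KafkaConsumer: it only models the idea that all I/O, including heartbeats, happens inside `poll()` on the caller's thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a consumer: all work happens inside poll().
class LoopConsumer {
    private final Queue<String> broker = new ArrayDeque<>(); // pretend broker-side data
    int heartbeatsSent = 0;

    void publish(String record) { broker.add(record); }

    // Each call does the "I/O": counts a heartbeat and returns whatever is available.
    List<String> poll() {
        heartbeatsSent++;
        List<String> batch = new ArrayList<>(broker);
        broker.clear();
        return batch;
    }
}

class EventLoopDemo {
    // The caller's thread is the event loop: poll, process, repeat.
    static List<String> run(LoopConsumer consumer, int iterations) {
        List<String> processed = new ArrayList<>();
        for (int i = 0; i < iterations; i++) {
            for (String record : consumer.poll()) {
                processed.add(record.toUpperCase()); // stand-in for user processing
            }
        }
        return processed;
    }
}
```

Until `run` is called, `heartbeatsSent` stays at zero no matter how many records are waiting, which is the behaviour the docs would need to spell out.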
>>>>>
>>>> I'm not sure if I understand the event-loop correctly and honestly I
>>>> did not think about it clearly before. My understanding is that an
>>>> event-loop model means a single listener thread, but there can be multiple
>>>> event generator threads. The downside is that the listener thread has to be
>>>> fast and very careful about blocking. If we look at the consumer, in the
>>>> current model the caller thread itself acts as both event generator and
>>>> listener. As a generator, it generates different tasks by calling the
>>>> convenience methods. As a listener, it listens to the messages on broker
>>>> and also the tasks generated by itself. So in our proposal, we are not
>>>> changing the event-loop model here, just separating the event generator and
>>>> event listener. It looks to me that the underlying execution thread follows
>>>> the event-loop model, the special thing might be it is not only listening
>>>> to the messages from broker, but also listening to the tasks from the user
>>>> thread. This is essentially the thing a consumer has to do - interact with
>>>> both server and user.
>>>>
>>>>>
>>>>> If we want to move away from an event loop I'm not sure *any* aspect
>>>>> of the current event loop style of api makes sense any more. I am not
>>>>> totally married to event loops, but i do think what we have gives an
>>>>> elegant way of implementing any higher level abstractions that would fully
>>>>> implement the user's parallelism model. I don't want to go rethink
>>>>> everything but I do think a half-way implementation that is event loop +
>>>>> background threads is likely going to be icky.
>>>>>
>>>> We brought this up before to change the consumer.poll() to
>>>> consumer.consume(). And did not do so simply because we wanted fewer
>>>> changes in the API... I might be crazy but can we think of the proposed model as
>>>> processing thread + event-loop, rather than event-loop + background
>>>> thread?
>>>>
>>>>>
>>>>> WRT making it configurable whether liveness means "actually consuming"
>>>>> or "background thread running" I would suggest that that is really the
>>>>> worst outcome. These types of "modes" that are functionally totally
>>>>> different are just awful from a documentation, testing, usability, etc.
>>>>> pov. I would strongly prefer we pick either of these, document it, and
>>>>> make it work well rather than trying to do both.
>>>>>
>>>> Previously I thought this was the major benefit we wanted from a single
>>>> threaded model, personally I don't have a strong preference on this. So I
>>>> am OK with either way.
>>>>
>>>>>
>>>>> -Jay
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jul 29, 2015 at 1:20 PM, Neha Narkhede <n...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Works now. Thanks Becket!
>>>>>>
>>>>>> On Wed, Jul 29, 2015 at 1:19 PM, Jiangjie Qin <j...@linkedin.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Ah... My bad, forgot to change the URL link for pictures.
>>>>>>> Thanks for the quick response, Neha. It should be fixed now, can you
>>>>>>> try again?
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
>>>>>>>
>>>>>>> On Wed, Jul 29, 2015 at 1:10 PM, Neha Narkhede <n...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Becket. Quick comment - there seem to be a bunch of images
>>>>>>>> that the wiki refers to, but none loaded for me. Just making sure if
>>>>>>>> it's just me or can everyone not see the pictures?
>>>>>>>>
>>>>>>>> On Wed, Jul 29, 2015 at 12:00 PM, Jiangjie Qin <j...@linkedin.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I agree with Ewen that with a single threaded model it will be tricky to
>>>>>>>>> implement the conventional semantics of async or Future. We just
>>>>>>>>> drafted the following wiki which explains our thoughts at LinkedIn on
>>>>>>>>> the new consumer API and threading model.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal
>>>>>>>>>
>>>>>>>>> We were trying to see:
>>>>>>>>> 1. If we can use some kind of methodology to help us think about
>>>>>>>>> what API we want to provide to users for different use cases.
>>>>>>>>> 2. What are the pros and cons of the current single threaded model. Is
>>>>>>>>> there a way that we can maintain the benefits while solving the issues
>>>>>>>>> we are facing now with the single threaded model.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>
>>>>>>>>> On Tue, Jul 28, 2015 at 10:28 PM, Ewen Cheslack-Postava <
>>>>>>>>> e...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Jul 28, 2015 at 5:18 PM, Guozhang Wang <
>>>>>>>>>> wangg...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think Ewen has proposed these APIs for using callbacks along
>>>>>>>>>>> with returning a future in the commit calls, i.e. something similar
>>>>>>>>>>> to:
>>>>>>>>>>>
>>>>>>>>>>> public Future<Void> commit(ConsumerCommitCallback callback);
>>>>>>>>>>>
>>>>>>>>>>> public Future<Void> commit(Map<TopicPartition, Long> offsets,
>>>>>>>>>>> ConsumerCommitCallback callback);
>>>>>>>>>>>
>>>>>>>>>>> At that time I was slightly inclined not to include the Future
>>>>>>>>>>> besides adding the callback, mainly because of the implementation
>>>>>>>>>>> complexity I feel it could introduce along with the retry settings after
>>>>>>>>>>> looking through the code base. I would be happy to change my mind if we could
>>>>>>>>>>> propose a prototype implementation that is simple enough.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> One of the reasons that interface ended up being difficult (or
>>>>>>>>>> maybe impossible) to make work reasonably is because the consumer was
>>>>>>>>>> thread-safe at the time. That made it impossible to know what should
>>>>>>>>>> be done when Future.get() is called -- should the implementation call
>>>>>>>>>> poll() itself, or would the fact that the user is calling get() imply that
>>>>>>>>>> there's a background thread running the poll() loop and we just need to wait
>>>>>>>>>> for it?
>>>>>>>>>>
>>>>>>>>>> The consumer is no longer thread safe, but I think the same
>>>>>>>>>> problem remains because the expectation with Futures is that they are
>>>>>>>>>> thread safe. Which means that even if the consumer isn't thread
>>>>>>>>>> safe, I would expect to be able to hand that Future off to some other
>>>>>>>>>> thread, have the second thread call get(), and then continue driving the poll
>>>>>>>>>> loop in my thread (which in turn would eventually resolve the Future).
>>>>>>>>>>
>>>>>>>>>> I quite dislike the sync/async enum. While both operations commit
>>>>>>>>>> offsets, their semantics are so different that overloading a single
>>>>>>>>>> method with both is messy. That said, I don't think we should consider this
>>>>>>>>>> an inconsistency wrt the new producer API's use of Future because the
>>>>>>>>>> two APIs have a much more fundamental difference that justifies it: they have
>>>>>>>>>> completely different threading and execution models.
>>>>>>>>>>
>>>>>>>>>> I think a Future-based API only makes sense if you can guarantee
>>>>>>>>>> the operations that Futures are waiting on will continue to make
>>>>>>>>>> progress regardless of what the thread using the Future does. The producer
>>>>>>>>>> API makes that work by processing asynchronous requests in a background
>>>>>>>>>> thread. The new consumer does not, and so it becomes difficult/impossible to
>>>>>>>>>> implement the Future correctly. (Or, you have to make assumptions which break
>>>>>>>>>> other use cases; if you want to support the simple use case of just making
>>>>>>>>>> a commit() synchronous by calling get(), the Future has to call poll()
>>>>>>>>>> internally; but if you do that, then if any user ever wants to add
>>>>>>>>>> synchronization to the consumer via some external mechanism, then the
>>>>>>>>>> implementation of the Future's get() method will not be subject to
>>>>>>>>>> that synchronization and things will break).
>>>>>>>>>>
>>>>>>>>>> -Ewen
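The stall described above can be reproduced in a few lines. This is only a sketch under stated assumptions: `PollDrivenConsumer` and `commitAsync` are hypothetical names, and the "commit" is just a queued `CompletableFuture` that nothing completes unless someone calls `poll()`. It is not how the real consumer is implemented.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical single-threaded consumer: commits only make progress inside poll().
// Like the consumer under discussion, it is deliberately not thread safe.
class PollDrivenConsumer {
    private final Queue<CompletableFuture<Void>> pendingCommits = new ArrayDeque<>();

    // Queues the commit; there is no background thread to send it.
    CompletableFuture<Void> commitAsync() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pendingCommits.add(future);
        return future;
    }

    // Stand-in for network I/O: completes every queued commit.
    void poll() {
        CompletableFuture<Void> future;
        while ((future = pendingCommits.poll()) != null) {
            future.complete(null);
        }
    }
}

class FutureStallDemo {
    // Returns true if get() finished within the timeout, false if it stalled.
    static boolean completesWithin(CompletableFuture<Void> future, long millis) {
        try {
            future.get(millis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling `completesWithin` before `poll()` times out, and the same call succeeds afterwards. A producer-style background thread would make the first call succeed too, which is exactly the difference in execution models being pointed out.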
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jul 28, 2015 at 4:03 PM, Neha Narkhede <
>>>>>>>>>>> n...@confluent.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey Adi,
>>>>>>>>>>>>
>>>>>>>>>>>> When we designed the initial version, the producer API was
>>>>>>>>>>>> still changing. I thought about adding the Future and then just
>>>>>>>>>>>> didn't get to it. I agree that we should look into adding it for consistency.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Neha
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jul 28, 2015 at 1:51 PM, Aditya Auradkar <
>>>>>>>>>>>> aaurad...@linkedin.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Great discussion everyone!
>>>>>>>>>>>>>
>>>>>>>>>>>>> One general comment on the sync/async APIs on the new
>>>>>>>>>>>>> consumer. I think the producer tackles sync vs async APIs
>>>>>>>>>>>>> well. For APIs that can either be sync or async, can we simply
>>>>>>>>>>>>> return a future? That seems more elegant for the APIs that make sense
>>>>>>>>>>>>> in both flavors. From the user's perspective, it is more consistent
>>>>>>>>>>>>> with the new producer. One easy example is the commit call with the
>>>>>>>>>>>>> CommitType enum: we can make that call always async and users can block on
>>>>>>>>>>>>> the future if they want to make sure their offsets are committed.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Aditya
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 2:06 PM, Onur Karaman <
>>>>>>>>>>>>> okara...@linkedin.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the great responses, everyone!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> To expand a tiny bit on my initial post: while I did bring up
>>>>>>>>>>>>>> old high level consumers, the teams we spoke to were actually not the types
>>>>>>>>>>>>>> of services that simply wanted an easy way to get ConsumerRecords. We spoke
>>>>>>>>>>>>>> to infrastructure teams that I would consider to be closer to the
>>>>>>>>>>>>>> "power-user" end of the spectrum and would want KafkaConsumer's level of
>>>>>>>>>>>>>> granularity. Some would use auto group management. Some would use explicit
>>>>>>>>>>>>>> group management. All of them would turn off auto offset commits. Yes, the
>>>>>>>>>>>>>> Samza team had prior experience with the old SimpleConsumer, but this is
>>>>>>>>>>>>>> the first kafka consumer being used by the Databus team. So I don't really
>>>>>>>>>>>>>> think the feedback received was about the simpler times or wanting
>>>>>>>>>>>>>> additional higher-level clients.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - Onur
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:41 PM, Jason Gustafson <
>>>>>>>>>>>>>> ja...@confluent.io> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think if we recommend a longer session timeout, then we
>>>>>>>>>>>>>>> should expose the heartbeat frequency in configuration since this generally
>>>>>>>>>>>>>>> controls how long normal rebalances will take. I think it's currently
>>>>>>>>>>>>>>> hard-coded to 3 heartbeats per session timeout. It could also be nice to
>>>>>>>>>>>>>>> have an explicit LeaveGroup request to implement clean shutdown of a
>>>>>>>>>>>>>>> consumer. Then the coordinator doesn't have to wait for the timeout to
>>>>>>>>>>>>>>> reassign partitions.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Jason
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <j...@confluent.io>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hey Kartik,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Totally agree we don't want people tuning timeouts in the
>>>>>>>>>>>>>>>> common case.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> However there are two ways to avoid this:
>>>>>>>>>>>>>>>> 1. Default the timeout high
>>>>>>>>>>>>>>>> 2. Put the heartbeat in a separate thread
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> When we were doing the consumer design we discussed this
>>>>>>>>>>>>>>>> tradeoff and I think the conclusion we came to was that defaulting to a
>>>>>>>>>>>>>>>> high timeout was actually better. This means it takes a little longer to
>>>>>>>>>>>>>>>> detect a failure, but usually that is not a big problem and people who want
>>>>>>>>>>>>>>>> faster failure detection can tune it down. This seemed better than having
>>>>>>>>>>>>>>>> the failure detection not really cover the consumption and just be a
>>>>>>>>>>>>>>>> background ping. The two reasons were (a) you still have the GC problem
>>>>>>>>>>>>>>>> even for the background thread, (b) consumption is in some sense a better
>>>>>>>>>>>>>>>> definition of an active healthy consumer and a lot of problems crop up when
>>>>>>>>>>>>>>>> you have an inactive consumer with an active background thread (as today).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> When we had the discussion I think what we realized was
>>>>>>>>>>>>>>>> that most people who were worried about the timeout were imagining a very
>>>>>>>>>>>>>>>> low default (say, 500ms). But in fact just setting this to 60 seconds or
>>>>>>>>>>>>>>>> higher as a default would be okay; this adds to the failure detection time
>>>>>>>>>>>>>>>> but only apps that care about this need to tune. This should largely
>>>>>>>>>>>>>>>> eliminate false positives since after all if you disappear for 60 seconds
>>>>>>>>>>>>>>>> that actually starts to be more of a true positive, even if you come
>>>>>>>>>>>>>>>> back... :-)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam <
>>>>>>>>>>>>>>>> kparamasi...@linkedin.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> adding the open source alias.  This email started off as a
>>>>>>>>>>>>>>>>> broader discussion around the new consumer.  I was zooming into only the
>>>>>>>>>>>>>>>>> aspect of poll() being the only mechanism for driving the heartbeats.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Yes the lag is the effect of the problem (not the
>>>>>>>>>>>>>>>>> problem).  Monitoring the lag is important as it is the primary way to tell
>>>>>>>>>>>>>>>>> if the application is wedged.  There might be other metrics which can
>>>>>>>>>>>>>>>>> possibly capture the same essence. Yes the lag is at the consumer group
>>>>>>>>>>>>>>>>> level, but you can tell that one of the consumers is messed up if, for
>>>>>>>>>>>>>>>>> example, one of the partitions in the application starts generating lag
>>>>>>>>>>>>>>>>> and the others are good.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Monitoring aside, I think the main point of concern is
>>>>>>>>>>>>>>>>> that in the old consumer most customers don't have to worry about
>>>>>>>>>>>>>>>>> unnecessary rebalances and most of the things that they do in their app
>>>>>>>>>>>>>>>>> don't have an impact on the session timeout (i.e. the only thing that
>>>>>>>>>>>>>>>>> causes rebalances is when the GC is out of whack). For the handful of
>>>>>>>>>>>>>>>>> customers who are impacted by GC related rebalances, I would imagine that
>>>>>>>>>>>>>>>>> all of them would really want us to make the system more resilient. I
>>>>>>>>>>>>>>>>> agree that the GC problem can't be solved easily in the java client,
>>>>>>>>>>>>>>>>> however it appears that now we would be expecting the consuming
>>>>>>>>>>>>>>>>> applications to be even more careful with ongoing tuning of the timeouts.
>>>>>>>>>>>>>>>>> At LinkedIn, we have seen that most kafka applications don't have much of a
>>>>>>>>>>>>>>>>> clue about configuring the timeouts and just end up calling the Kafka team
>>>>>>>>>>>>>>>>> when their application sees rebalances.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The other side effect of poll driving the heartbeats is
>>>>>>>>>>>>>>>>> that we have to make sure that people don't set a poll timeout that is
>>>>>>>>>>>>>>>>> larger than the session timeout. If we had a notion of implicit
>>>>>>>>>>>>>>>>> heartbeats then we could also automatically make this work for consumers by
>>>>>>>>>>>>>>>>> sending heartbeats at the appropriate interval even though the customers
>>>>>>>>>>>>>>>>> want to do a long poll.
>>>>>>>>>>>>>>>>>
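One way to picture the "implicit heartbeats" idea is to split a long poll into slices no longer than the heartbeat interval. The sketch below only does the arithmetic. `LongPollSketch` and both of its methods are hypothetical, and the divide-by-three rule simply mirrors the "3 heartbeats per session timeout" figure mentioned elsewhere in this thread, which is treated here as an assumption rather than a verified constant.

```java
// Hypothetical helper; models timing only, performs no real I/O or sleeping.
class LongPollSketch {
    // Assumption from the thread: roughly 3 heartbeats per session timeout.
    static long heartbeatIntervalMs(long sessionTimeoutMs) {
        if (sessionTimeoutMs <= 0) {
            throw new IllegalArgumentException("session timeout must be positive");
        }
        return sessionTimeoutMs / 3;
    }

    // Counts the heartbeats sent if one long poll is split into slices,
    // with a heartbeat sent before each slice of at most heartbeatIntervalMs.
    static int heartbeatsDuringPoll(long pollTimeoutMs, long heartbeatIntervalMs) {
        if (pollTimeoutMs < 0 || heartbeatIntervalMs <= 0) {
            throw new IllegalArgumentException("invalid timeout or interval");
        }
        int heartbeats = 0;
        long remaining = pollTimeoutMs;
        while (remaining > 0) {
            heartbeats++;                                          // stand-in for sending a heartbeat
            remaining -= Math.min(remaining, heartbeatIntervalMs); // stand-in for a bounded wait
        }
        return heartbeats;
    }
}
```

With a 30 second session timeout the interval comes out at 10 seconds, so a 60 second poll would be served as six slices and the session would never lapse, even though the user asked for a poll longer than the session timeout.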
>>>>>>>>>>>>>>>>> We could surely work around this in LinkedIn if either we
>>>>>>>>>>>>>>>>> have the Pause() api or an explicit HeartBeat() api on the consumer.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Would love to hear how other people think about this
>>>>>>>>>>>>>>>>> subject?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Kartik
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede <
>>>>>>>>>>>>>>>>> n...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Agree with the dilemma you are pointing out, which is
>>>>>>>>>>>>>>>>>> that there are many ways the application's message processing could fail
>>>>>>>>>>>>>>>>>> and we wouldn't be able to model all of those in the consumer's failure
>>>>>>>>>>>>>>>>>> detection mechanism. So we should try to model as much of it as we can so
>>>>>>>>>>>>>>>>>> the consumer's failure detection is meaningful.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Point being that the only absolute way to really detect
>>>>>>>>>>>>>>>>>>> that an app is healthy is to monitor lag. If the lag increases then for
>>>>>>>>>>>>>>>>>>> sure something is wrong.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The lag is merely the effect of the problem, not the
>>>>>>>>>>>>>>>>>> problem itself. Lag is also a consumer group level concept and the problem
>>>>>>>>>>>>>>>>>> we have is being able to detect failures at the level of individual
>>>>>>>>>>>>>>>>>> consumer instances.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> As you pointed out, a consumer that calls poll() is a stronger
>>>>>>>>>>>>>>>>>> indicator of whether the consumer is alive or not. The dilemma then is who
>>>>>>>>>>>>>>>>>> defines what a healthy poll() frequency is. No one else but the application
>>>>>>>>>>>>>>>>>> owner can define what a "normal" processing latency is for their
>>>>>>>>>>>>>>>>>> application. Now the question is what's the easiest way for the user to
>>>>>>>>>>>>>>>>>> define this without having to tune and fine tune this too often. The
>>>>>>>>>>>>>>>>>> heartbeat interval certainly does not have to be
>>>>>>>>>>>>>>>>>> *exactly* the 99th percentile of processing latency but could be in
>>>>>>>>>>>>>>>>>> the ballpark + an error delta. The error delta is the application owner's
>>>>>>>>>>>>>>>>>> acceptable risk threshold during which they would be ok if the application
>>>>>>>>>>>>>>>>>> remains part of the group despite being dead. It is ultimately a tradeoff
>>>>>>>>>>>>>>>>>> between operational ease and more accurate failure detection.
>>>>>>>>>>>>>>>>>>
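As a worked illustration of "99th percentile of processing latency + an error delta", the sketch below derives a timeout from observed latencies. `TimeoutEstimate` and its method names are hypothetical, and the nearest-rank percentile is just one reasonable choice of estimator; the thread does not prescribe one.

```java
import java.util.Arrays;

// Hypothetical helper for picking a timeout from observed processing latencies.
class TimeoutEstimate {
    // Nearest-rank percentile: the smallest sample with at least pct% of samples at or below it.
    static long percentile(long[] samples, int pct) {
        if (samples.length == 0 || pct < 1 || pct > 100) {
            throw new IllegalArgumentException("need samples and 1 <= pct <= 100");
        }
        long[] sorted = samples.clone();
        Arrays.sort(sorted);
        int rank = (pct * sorted.length + 99) / 100; // integer ceil(pct * n / 100)
        return sorted[rank - 1];
    }

    // Observed p99 plus the owner's acceptable risk window (the "error delta").
    static long suggestedTimeoutMs(long[] processingLatenciesMs, long errorDeltaMs) {
        return percentile(processingLatenciesMs, 99) + errorDeltaMs;
    }
}
```

For example, if the observed latencies are 1 ms through 100 ms, the p99 is 99 ms, and a 500 ms error delta gives a suggested timeout of 599 ms. The latencies have to be measured, not guessed, which is the same due diligence argued for in this message.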
>>>>>>>>>>>>>>>>>>> With quotas the write latencies to kafka could range from
>>>>>>>>>>>>>>>>>>> a few milliseconds all the way to tens of seconds.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This is actually no different from the GC problem. Most
>>>>>>>>>>>>>>>>>> of the time, the normal GC falls in the few ms range and there are
>>>>>>>>>>>>>>>>>> many applications even at LinkedIn for which the max GC falls in the
>>>>>>>>>>>>>>>>>> multiple seconds range. Note that it also can't be predicted, so has to be
>>>>>>>>>>>>>>>>>> an observed value. One way or the other, you have to observe what this
>>>>>>>>>>>>>>>>>> acceptable "max" is for your application and then set the appropriate
>>>>>>>>>>>>>>>>>> timeouts.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Since this is not something that can be automated, this
>>>>>>>>>>>>>>>>>> is a config that the application owner has to set based on 
>>>>>>>>>>>>>>>>>> the expected
>>>>>>>>>>>>>>>>>> behavior of their application. Not wanting to do that leads 
>>>>>>>>>>>>>>>>>> to ending up
>>>>>>>>>>>>>>>>>> with bad consumption semantics where the application process 
>>>>>>>>>>>>>>>>>> continues to
>>>>>>>>>>>>>>>>>> be part of a group owning partitions but not consuming since 
>>>>>>>>>>>>>>>>>> it has halted
>>>>>>>>>>>>>>>>>> due to a problem. The fact that the design requires them to 
>>>>>>>>>>>>>>>>>> express that in
>>>>>>>>>>>>>>>>>> poll() frequency or not doesn't change the fact that the 
>>>>>>>>>>>>>>>>>> application owner
>>>>>>>>>>>>>>>>>> has to go through the process of measuring and then defining 
>>>>>>>>>>>>>>>>>> this "max".
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The reverse where they don't do this and the application
>>>>>>>>>>>>>>>>>> remains in the group despite being dead is super confusing 
>>>>>>>>>>>>>>>>>> and frustrating
>>>>>>>>>>>>>>>>>> too. So the due diligence up front is actually worth it. And as
>>>>>>>>>>>>>>>>>> long as the
>>>>>>>>>>>>>>>>>> poll() latency and processing latency can be monitored, it 
>>>>>>>>>>>>>>>>>> should be easy
>>>>>>>>>>>>>>>>>> to tell the reason for a rebalance, whether that is valid or 
>>>>>>>>>>>>>>>>>> not and how
>>>>>>>>>>>>>>>>>> that should be tuned.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> As for the wrapper, KIP-28 is the wrapper in open source
>>>>>>>>>>>>>>>>>> that will hide this complexity and I agree that LI is 
>>>>>>>>>>>>>>>>>> unblocked since you
>>>>>>>>>>>>>>>>>> can do this in TrackerConsumer in the meantime.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Neha
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam <
>>>>>>>>>>>>>>>>>> kparamasi...@linkedin.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For commit(), I think it should hopefully be an easier
>>>>>>>>>>>>>>>>>>> discussion, so maybe we can follow up when we meet up next.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> As far as the heartbeat is concerned, I think the points
>>>>>>>>>>>>>>>>>>> you discuss are all very valid.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> GC pauses impacting the heartbeats is a real issue.
>>>>>>>>>>>>>>>>>>> However, only a smaller percentage of memory-hungry apps get hit
>>>>>>>>>>>>>>>>>>> by it.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The broader issue whereby even if the heartbeats are
>>>>>>>>>>>>>>>>>>> healthy, the app might not be behaving correctly is also 
>>>>>>>>>>>>>>>>>>> real.  If the app
>>>>>>>>>>>>>>>>>>> is calling poll() then the probability that the app is 
>>>>>>>>>>>>>>>>>>> healthy is surely
>>>>>>>>>>>>>>>>>>> higher.  But this again isn't an absolute measure that the 
>>>>>>>>>>>>>>>>>>> app is
>>>>>>>>>>>>>>>>>>> processing correctly.
>>>>>>>>>>>>>>>>>>> In other cases the app might have even died in which
>>>>>>>>>>>>>>>>>>> case this discussion is moot.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Point being that the only absolute way to really detect
>>>>>>>>>>>>>>>>>>> that an app is healthy is to monitor lag. If the lag 
>>>>>>>>>>>>>>>>>>> increases then for
>>>>>>>>>>>>>>>>>>> sure something is wrong.
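
A minimal sketch of the lag check described above, assuming lag is measured per partition as the log end offset minus the committed offset. How those two offsets are obtained is left out, and the names and the "strictly growing across samples" rule are illustrative assumptions.

```java
final class LagCheck {
    // Lag for one partition: how far the committed position trails the log end.
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0L, logEndOffset - committedOffset);
    }

    // True when every sample is larger than the one before it.
    static boolean steadilyGrowing(long[] lagSamples) {
        if (lagSamples.length < 2) {
            return false;
        }
        for (int i = 1; i < lagSamples.length; i++) {
            if (lagSamples[i] <= lagSamples[i - 1]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical (log end, committed) offsets sampled at three points in time.
        long[] samples = { lag(1000, 990), lag(1500, 1100), lag(2100, 1150) };
        System.out.println("lag growing: " + steadilyGrowing(samples));
    }
}
```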
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The proposal seems to be that the application needs to
>>>>>>>>>>>>>>>>>>> tune its session timeout based on the 99th percentile of the time
>>>>>>>>>>>>>>>>>>> it takes to process events after every poll. This turns out to be a
>>>>>>>>>>>>>>>>>>> nontrivial thing for an application to do. To start with, when an
>>>>>>>>>>>>>>>>>>> application is new its data is going to be based on tests done on
>>>>>>>>>>>>>>>>>>> synthetic data. This often doesn't represent what it will see in
>>>>>>>>>>>>>>>>>>> production. Once the app is in production its processing latencies
>>>>>>>>>>>>>>>>>>> will potentially vary over time. It is extremely unlikely that the
>>>>>>>>>>>>>>>>>>> application owner does a careful job of monitoring the 99th
>>>>>>>>>>>>>>>>>>> percentile of latencies over time and readjusting the settings.
>>>>>>>>>>>>>>>>>>> Often the latencies vary because of variance in other services
>>>>>>>>>>>>>>>>>>> that are called by the consumer as part of processing the events.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Case in point would be a simple app which reads events
>>>>>>>>>>>>>>>>>>> and writes to Kafka. With quotas the write latencies to kafka could
>>>>>>>>>>>>>>>>>>> range from a few milliseconds all the way to tens of seconds. As the
>>>>>>>>>>>>>>>>>>> scale of processing for an app increases, the app or that 'user'
>>>>>>>>>>>>>>>>>>> could now get throttled by quotas. Unless the application owner has
>>>>>>>>>>>>>>>>>>> carefully tuned the timeout, instead of slowing down gracefully we
>>>>>>>>>>>>>>>>>>> are now looking at a potential outage where the app could get hit
>>>>>>>>>>>>>>>>>>> by constant rebalances.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If we expose the pause() API then it is possible for us
>>>>>>>>>>>>>>>>>>> to take care of this in the LinkedIn wrapper: we would keep
>>>>>>>>>>>>>>>>>>> calling poll on a separate thread periodically and enqueue
>>>>>>>>>>>>>>>>>>> the messages. When the queue is full we would call pause().
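
The wrapper idea above could look roughly like the sketch below. It is only an illustration: `Source` is a hypothetical stand-in for the consumer (not the real KafkaConsumer API), records are plain strings, and the sketch assumes poll() never returns more than `maxBatch` records and returns nothing while paused. `pollOnce()` is the body a dedicated polling thread would run in a loop; resuming once there is room again is also an assumption, since the text only mentions pause().

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BufferingPoller {
    /** Hypothetical stand-in for the consumer; not the real KafkaConsumer API. */
    interface Source {
        List<String> poll();   // assumed: at most maxBatch records, none while paused
        void pause();
        void resume();
    }

    private final Source source;
    private final BlockingQueue<String> queue;
    private final int maxBatch;
    private boolean paused = false;   // touched only by the polling thread

    BufferingPoller(Source source, int capacity, int maxBatch) {
        this.source = source;
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.maxBatch = maxBatch;
    }

    /** One iteration of the background loop; poll() is called every time. */
    void pollOnce() {
        boolean roomForBatch = queue.remainingCapacity() >= maxBatch;
        if (!roomForBatch && !paused) { source.pause(); paused = true; }
        if (roomForBatch && paused) { source.resume(); paused = false; }
        // While paused this poll() yields no records but still counts as activity.
        queue.addAll(source.poll());
    }

    /** Called by the processing thread; blocks until a record is available. */
    String take() throws InterruptedException {
        return queue.take();
    }

    int buffered() {
        return queue.size();
    }

    public static void main(String[] args) {
        final boolean[] sourcePaused = { false };
        Source fake = new Source() {
            public List<String> poll() {
                return sourcePaused[0]
                        ? Collections.<String>emptyList()
                        : Arrays.asList("a", "b");
            }
            public void pause() { sourcePaused[0] = true; }
            public void resume() { sourcePaused[0] = false; }
        };
        BufferingPoller poller = new BufferingPoller(fake, 4, 2);
        for (int i = 0; i < 3; i++) {
            poller.pollOnce();   // the third call finds the queue full and pauses
        }
        System.out.println("buffered=" + poller.buffered()
                + " paused=" + sourcePaused[0]);
    }
}
```

Because the processing thread only ever removes from the queue, the capacity check in pollOnce() cannot be invalidated before the records are added.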
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In essence we can work around it in LinkedIn, however I
>>>>>>>>>>>>>>>>>>> think it is vastly better if we address this in the Api as 
>>>>>>>>>>>>>>>>>>> every major
>>>>>>>>>>>>>>>>>>> customer will eventually be pained by it.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Kartik
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <
>>>>>>>>>>>>>>>>>>> j...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hey guys,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Happy to discuss. I agree there may be some rough edges
>>>>>>>>>>>>>>>>>>> and now is definitely the time to clean them up.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I'm pretty reluctant to change the threading model or
>>>>>>>>>>>>>>>>>>> undergo a big api redesign at this point beyond the group 
>>>>>>>>>>>>>>>>>>> management stuff
>>>>>>>>>>>>>>>>>>> we've discussed in the context of Samza/copycat which is 
>>>>>>>>>>>>>>>>>>> already a big
>>>>>>>>>>>>>>>>>>> effort.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Overall I agree that we have done a poor job of
>>>>>>>>>>>>>>>>>>> documenting which apis block and which don't and when 
>>>>>>>>>>>>>>>>>>> people are surprised
>>>>>>>>>>>>>>>>>>> because we haven't labeled something that will be 
>>>>>>>>>>>>>>>>>>> unintuitive. But the
>>>>>>>>>>>>>>>>>>> overall style of poll/select-based apis is quite common in 
>>>>>>>>>>>>>>>>>>> programming
>>>>>>>>>>>>>>>>>>> going back to unix select so I don't think it's beyond 
>>>>>>>>>>>>>>>>>>> people if explained
>>>>>>>>>>>>>>>>>>> well (after all we need to mix sync and async apis and if 
>>>>>>>>>>>>>>>>>>> we don't say
>>>>>>>>>>>>>>>>>>> which is which any scheme will be confusing).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For what it's worth the experience with this api has
>>>>>>>>>>>>>>>>>>> actually been about 1000x better than the issues people had 
>>>>>>>>>>>>>>>>>>> around
>>>>>>>>>>>>>>>>>>> intuitiveness with the high-level api. The crazy blocking 
>>>>>>>>>>>>>>>>>>> iterator,
>>>>>>>>>>>>>>>>>>> impossible internal queue sizing, baroque threading model, 
>>>>>>>>>>>>>>>>>>> etc  have all
>>>>>>>>>>>>>>>>>>> caused endless amounts of anger. Not to mention that that 
>>>>>>>>>>>>>>>>>>> client
>>>>>>>>>>>>>>>>>>> effectively disqualifies about 50% of the use cases people 
>>>>>>>>>>>>>>>>>>> want to try to
>>>>>>>>>>>>>>>>>>> use it for (plus I regularly hear people tell me they've 
>>>>>>>>>>>>>>>>>>> heard not to use
>>>>>>>>>>>>>>>>>>> it at all for various reasons ranging from data loss to 
>>>>>>>>>>>>>>>>>>> lack of features).
>>>>>>>>>>>>>>>>>>> It's important to have that context when people need to 
>>>>>>>>>>>>>>>>>>> switch and they say
>>>>>>>>>>>>>>>>>>> "oh the old way was so simple and the new way complex!" :-)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Let me give some context related to your points, based
>>>>>>>>>>>>>>>>>>> on our previous discussions:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For commit, let's discuss, that is easy either way.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The motivation for avoiding additional threading was
>>>>>>>>>>>>>>>>>>> two-fold. First this client is really intended to be the 
>>>>>>>>>>>>>>>>>>> lowest level
>>>>>>>>>>>>>>>>>>> client. There are many, many possible higher level 
>>>>>>>>>>>>>>>>>>> processing abstractions.
>>>>>>>>>>>>>>>>>>> One thing we found to be a big problem with the high-level 
>>>>>>>>>>>>>>>>>>> client was that
>>>>>>>>>>>>>>>>>>> it coupled things everyone must have--failover, etc--with 
>>>>>>>>>>>>>>>>>>> things that are
>>>>>>>>>>>>>>>>>>> different in each use case like the appropriate threading 
>>>>>>>>>>>>>>>>>>> model. If you do
>>>>>>>>>>>>>>>>>>> this you need to also maintain a thread free low-level 
>>>>>>>>>>>>>>>>>>> consumer api for
>>>>>>>>>>>>>>>>>>> people to get around whatever you have done.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The second reason was that the internal threading in the
>>>>>>>>>>>>>>>>>>> client became quite complex. The answer with threading is 
>>>>>>>>>>>>>>>>>>> always that "it
>>>>>>>>>>>>>>>>>>> won't be complex this time", but it always is.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For the heartbeat you correctly describe the downside to
>>>>>>>>>>>>>>>>>>> coupling heartbeat with poll--the contract is that the 
>>>>>>>>>>>>>>>>>>> application must
>>>>>>>>>>>>>>>>>>> regularly consume to be considered an active consumer. This 
>>>>>>>>>>>>>>>>>>> allows the
>>>>>>>>>>>>>>>>>>> possibility of false positive failure detections. However 
>>>>>>>>>>>>>>>>>>> it's important to
>>>>>>>>>>>>>>>>>>> understand the downside of the alternative. If you do 
>>>>>>>>>>>>>>>>>>> background polling a
>>>>>>>>>>>>>>>>>>> consumer is considered active as long as it isn't shut down.
>>>>>>>>>>>>>>>>>>> This leads to
>>>>>>>>>>>>>>>>>>> all kinds of active consumers that aren't consuming because 
>>>>>>>>>>>>>>>>>>> they have
>>>>>>>>>>>>>>>>>>> leaked or otherwise stopped but are still claiming 
>>>>>>>>>>>>>>>>>>> partitions and
>>>>>>>>>>>>>>>>>>> heart-beating. This failure mode is actually far far worse. 
>>>>>>>>>>>>>>>>>>> If you allow
>>>>>>>>>>>>>>>>>>> false positives the user sees the frequent rebalances and 
>>>>>>>>>>>>>>>>>>> knows they aren't
>>>>>>>>>>>>>>>>>>> consuming frequently enough to be considered active but if
>>>>>>>>>>>>>>>>>>> you allow false
>>>>>>>>>>>>>>>>>>> negatives you end up having weeks go by before someone 
>>>>>>>>>>>>>>>>>>> notices that a
>>>>>>>>>>>>>>>>>>> partition has been unconsumed the whole time at which point 
>>>>>>>>>>>>>>>>>>> the data is
>>>>>>>>>>>>>>>>>>> gone. Plus of course even if you do this you still have 
>>>>>>>>>>>>>>>>>>> regular false
>>>>>>>>>>>>>>>>>>> positives anyway from GC pauses (as now). We discussed this 
>>>>>>>>>>>>>>>>>>> in some depth
>>>>>>>>>>>>>>>>>>> at the time and decided that it is better to have the 
>>>>>>>>>>>>>>>>>>> liveness notion tied
>>>>>>>>>>>>>>>>>>> to *actual* consumption which is the actual definition
>>>>>>>>>>>>>>>>>>> of liveness.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman <
>>>>>>>>>>>>>>>>>>> okara...@linkedin.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Confluent Team.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> There has recently been a lot of open source activity
>>>>>>>>>>>>>>>>>>>> regarding the new KafkaConsumer:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2123
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2350
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2359
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> We’ve explained the KafkaConsumer API to the Databus,
>>>>>>>>>>>>>>>>>>>> Samza, and some other teams and we got similar feedback.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> To summarize the feedback we received from other teams:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>    1. The current behavior is not intuitive. For example,
>>>>>>>>>>>>>>>>>>>>    KafkaConsumer.poll drives everything. The other methods like
>>>>>>>>>>>>>>>>>>>>    subscribe, unsubscribe, seek, commit(async) don’t do anything
>>>>>>>>>>>>>>>>>>>>    without a KafkaConsumer.poll call.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>    2. The semantics of a commit() call should be consistent
>>>>>>>>>>>>>>>>>>>>    between sync and async operations. Currently, sync commit is a
>>>>>>>>>>>>>>>>>>>>    blocking call which actually sends out an OffsetCommitRequest
>>>>>>>>>>>>>>>>>>>>    and waits for the response upon the user’s KafkaConsumer.commit
>>>>>>>>>>>>>>>>>>>>    call. However, the async commit is a nonblocking call which just
>>>>>>>>>>>>>>>>>>>>    queues up the OffsetCommitRequest. The request itself is later
>>>>>>>>>>>>>>>>>>>>    sent out in the next poll. The teams we talked to found this
>>>>>>>>>>>>>>>>>>>>    misleading.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>    3. Heartbeats are dependent on user application behavior
>>>>>>>>>>>>>>>>>>>>    (i.e. user applications calling poll). This can be a big problem
>>>>>>>>>>>>>>>>>>>>    as we don’t control how different applications behave. For
>>>>>>>>>>>>>>>>>>>>    example, we might have an application which reads from Kafka and
>>>>>>>>>>>>>>>>>>>>    writes to Espresso. If Espresso is slow for whatever reason, then
>>>>>>>>>>>>>>>>>>>>    rebalances could happen.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Generally speaking, we feel that the current
>>>>>>>>>>>>>>>>>>>> KafkaConsumer API design is more of a wrapper around the old
>>>>>>>>>>>>>>>>>>>> simple consumer, i.e. in the old consumer we ask users to deal with
>>>>>>>>>>>>>>>>>>>> raw protocols and error handling while in KafkaConsumer we do that
>>>>>>>>>>>>>>>>>>>> for users. However, for old high level consumer users (which are
>>>>>>>>>>>>>>>>>>>> the majority of users), the experience is a noticeable regression.
>>>>>>>>>>>>>>>>>>>> The old high level consumer interface is simple and easy to use for
>>>>>>>>>>>>>>>>>>>> end users, while KafkaConsumer requires users to be aware of many
>>>>>>>>>>>>>>>>>>>> underlying details and is becoming prohibitive for users to adopt.
>>>>>>>>>>>>>>>>>>>> This is hinted at by the javadoc growing bigger and bigger.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> We think it's getting to the point where we should take
>>>>>>>>>>>>>>>>>>>> a step back and look at the big picture.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The current state of KafkaConsumer is that it's
>>>>>>>>>>>>>>>>>>>> single-threaded. There's one big KafkaConsumer.poll called 
>>>>>>>>>>>>>>>>>>>> by the user
>>>>>>>>>>>>>>>>>>>> which pretty much drives everything:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - data fetches
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - join groups (new consumer joining a group, topic
>>>>>>>>>>>>>>>>>>>> subscription changes, reacting to group rebalance)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - async offset commits
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Given that the selector's poll is being driven by the
>>>>>>>>>>>>>>>>>>>> end user, this ends up making us educate users on NIO and 
>>>>>>>>>>>>>>>>>>>> the consequences
>>>>>>>>>>>>>>>>>>>> of not calling KafkaConsumer.poll frequently enough:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - Coordinator will mark the consumer dead
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - async commits won't send
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - callbacks won't fire
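
To make the "poll drives everything" behavior concrete, here is a toy single-threaded model of it. It is not the KafkaConsumer implementation; the class, its methods, and the use of a plain offset number are all invented for illustration. The only thing it demonstrates is the pattern described above: an async commit is merely queued, and both the send and the callback happen inside the next poll().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.LongConsumer;

final class PollDrivenClient {
    // A queued commit: the offset to send and the callback to run afterwards.
    private static final class PendingCommit {
        final long offset;
        final LongConsumer callback;

        PendingCommit(long offset, LongConsumer callback) {
            this.offset = offset;
            this.callback = callback;
        }
    }

    private final Queue<PendingCommit> pending = new ArrayDeque<>();
    private final List<Long> sent = new ArrayList<>();

    // Non-blocking: only records the intent, nothing is sent here.
    void commitAsync(long offset, LongConsumer onComplete) {
        pending.add(new PendingCommit(offset, onComplete));
    }

    // All queued work happens here, on the caller's thread.
    void poll() {
        PendingCommit commit;
        while ((commit = pending.poll()) != null) {
            sent.add(commit.offset);               // stands in for the network send
            commit.callback.accept(commit.offset); // callback fires only now
        }
    }

    List<Long> sentOffsets() {
        return sent;
    }

    public static void main(String[] args) {
        PollDrivenClient client = new PollDrivenClient();
        client.commitAsync(42L, o -> System.out.println("committed " + o));
        System.out.println("sent before poll: " + client.sentOffsets().size());
        client.poll();
        System.out.println("sent after poll: " + client.sentOffsets().size());
    }
}
```

If the application never calls poll() again, the queued commit in this model is never sent and its callback never runs, which matches the list of consequences above.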
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> More generally speaking, there are many surprises with
>>>>>>>>>>>>>>>>>>>> the current KafkaConsumer implementation.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Here's what we consider to be the goals of
>>>>>>>>>>>>>>>>>>>> KafkaConsumer:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - NIO
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - ability to commit, manipulate offsets, and consume
>>>>>>>>>>>>>>>>>>>> messages
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - a way to subscribe to topics(auto group management)
>>>>>>>>>>>>>>>>>>>> or partitions(explicit group management)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - no surprises in the user experience
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The last point is the big one that we think we aren't
>>>>>>>>>>>>>>>>>>>> hitting. We think the most important example is that there should
>>>>>>>>>>>>>>>>>>>> be no requirement for the end user to consistently call
>>>>>>>>>>>>>>>>>>>> KafkaConsumer.poll in order for all of the above tasks to happen.
>>>>>>>>>>>>>>>>>>>> We think it would be better to split those tasks into tasks that
>>>>>>>>>>>>>>>>>>>> should not rely on KafkaConsumer.poll and tasks that should rely
>>>>>>>>>>>>>>>>>>>> on KafkaConsumer.poll.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Tasks that should not rely on KafkaConsumer.poll:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - join groups
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - commits
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Only data fetches should rely on KafkaConsumer.poll
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> This would help reduce the number of surprises to the
>>>>>>>>>>>>>>>>>>>> end user.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> We’ve sketched out a proposal and we’ll send it out to
>>>>>>>>>>>>>>>>>>>> you guys early next week. We’d like to meet up with you at 
>>>>>>>>>>>>>>>>>>>> LinkedIn on *July
>>>>>>>>>>>>>>>>>>>> 31, 2015* so we can talk about it before proposing it
>>>>>>>>>>>>>>>>>>>> to open source.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>> LinkedIn Kafka Dev Team
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Neha
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Neha
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Thanks,
>>>>>>>>>> Ewen
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Thanks,
>>>>>>>> Neha
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Neha
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
