I was thinking a little bit this morning about the subscription API and I
have a few ideas on how to address some of the concerns about intuitiveness
and exception handling.

1. Split the current notion of topic/partition subscription into
subscription of topics and assignment of partitions. These concepts are
pretty fundamentally different and I think at least some of the confusion
about when subscriptions() can be used is caused by the fact that we
overload the term. If instead that method is renamed to assignment(), then
we are communicating to users that it is possible to have a subscription
without an active assignment, which is not obvious with the current API.
The code in fact already separates these concepts internally, so this would
just expose it to the user.

2. Merge the rebalance callback into a subscription callback and add a way to
handle errors. The consumer's current rebalance callback is
basically invoked when a subscription "succeeds," so it seems a little
weird to also provide a callback on subscription. Perhaps we can just take
the rebalance callback out of configuration and have the user provide it on
subscribe(). We can add a method to the callback to handle errors (e.g. for
non-existing topics). Since the callback is provided at subscribe time, it
should be clearer to the user that the assignment will not be ready
immediately when subscribe returns. It's also arguably a little more
natural to set this callback at subscription time rather than when the
consumer is constructed.

3. Get rid of the additive subscribe methods and just use setSubscription
which would clear the old subscription. Once you start providing callbacks
to subscribe, the implementation starts to get tricky if each call to
subscribe provides a separate callback. Instead, as Jay suggested, we could
just provide a way to set the full list of subscriptions at once, and then
there is only one callback to maintain.

With these points, the API might look something like this:

void setSubscription(List<String> topics, RebalanceCallback callback);
void setAssignment(List<TopicPartition> partitions);
List<String> subscription();
List<TopicPartition> assignment();

interface RebalanceCallback {
  void onAssignment(List<TopicPartition> partitions);
  void onRevocation(List<TopicPartition> partitions);

  // handle non-existing topics, etc.
  void onError(Exception e);
}
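
For illustration, usage might look roughly like this (just a sketch: poll() and
the record types are the existing API, "running" and the processing are
placeholders, and I'm assuming the callback would still be invoked from within
poll() as the rebalance callback is today):

consumer.setSubscription(Arrays.asList("my-topic"), new RebalanceCallback() {
  public void onAssignment(List<TopicPartition> partitions) {
    // assignment() is now valid; safe to seek() from here if needed
  }
  public void onRevocation(List<TopicPartition> partitions) {
    // last chance to commit offsets for the partitions being taken away
  }
  public void onError(Exception e) {
    // e.g. the subscription included a non-existing topic
  }
});

while (running) {
  ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
  // process records
}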

Any thoughts?

-Jason



On Thu, Jul 30, 2015 at 11:59 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey Becket,
>
> Yeah the high-level belief here is that it is possible to give something
> as high level as the existing "high level" consumer, but this is not likely
> to be the end-all be-all of high-level interfaces for processing streams of
> messages. For example neither of these interfaces handles the threading
> model for the processing, which obviously is a fairly low-level
> implementation detail left to the user in your proposal, the current code,
> as well as the existing scala consumer.
>
> There will be many of these: the full-fledged stream processing frameworks
> like Storm/Spark, scalaz streams, the RxJava stuff, a more traditional
> message queue like "processor" interface, not to mention the stuff we're
> trying to do with KIP-28. For these frameworks it will be quite weird to
> add a bunch of new threads since they will want to dictate the threading
> model.
>
> What will be a major failure though is if this client isn't low-level
> enough and we need to introduce another layer underneath. This would happen
> if we dictate too much, making it unusable for various
> applications, frameworks, or use cases. This is the concern with dictating
> threading and processing models.
>
> So to summarize the goal is to subsume the existing APIs, which I think we
> all agree this does, and be a foundation on which to build other
> abstractions.
>
> WRT KIP-28, I think it is quite general and if we do that right it will
> subsume a lot of the higher level processing and will give a full threaded
> processing model to the user.
>
>
> -Jay
>
>
> On Wed, Jul 29, 2015 at 6:25 PM, Jiangjie Qin <j...@linkedin.com> wrote:
>
>> Thanks for the comments Jason and Jay.
>>
>> Jason, I had the same concern for producer's callback as well before, but
>> it seems to be fine from some callbacks I wrote - user can always pass in
>> object in the constructor if necessary for synchronization.
>>
>> Jay, I agree that the current API might be fine for people who want to
>> wrap it up. But I thought the new consumer was supposed to be a combination
>> of old high and low level consumer, which means it should be able to be
>> used as is, just like producer. If KafkaConsumer is designed to be wrapped
>> up for use, then the question becomes whether Kafka will provide a decent
>> wrapper or not? Neha mentioned that KIP-28 will address the users who only
>> care about data. Would that be the wrapper provided by Kafka? I am not sure
>> if that is sufficient though because the processor is highly abstracted,
>> and might only meet the static data stream requirement as I listed in the
>> grid. For users who need something from the other grids, are we going to
>> have another wrapper? Or are we expecting all the users to write their own
>> wrapper for KafkaConsumer? Some other comments are in line.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Wed, Jul 29, 2015 at 3:16 PM, Jay Kreps <j...@confluent.io> wrote:
>>
>>> Some comments on the proposal:
>>>
>>> I think we are conflating a number of things that should probably be
>>> addressed individually because they are unrelated. My past experience is
>>> that this always makes progress hard. The more we can pick apart these
>>> items the better:
>>>
>>>    1. threading model
>>>    2. blocking vs non-blocking semantics
>>>    3. missing apis
>>>    4. missing javadoc and other api surprises
>>>    5. Throwing exceptions.
>>>
>>> The missing APIs are getting added independently. Some like your
>>> proposed offsetByTime were things we agreed to hold off on for the first
>>> release and do when we'd thought it through. If there are uses for it now
>>> we can accelerate. I think each of these is really independent, we know
>>> there are things that need to be added but lumping them all into one
>>> discussion will be confusing.
>>>
>>> WRT throwing exceptions the policy is to throw exceptions that are
>>> unrecoverable and handle and log other exceptions that are transient. That
>>> policy makes sense if you go through the thought exercise of "what will the
>>> user do if I throw this exception to them": if they have no other rational
>>> response but to retry (and if failing to anticipate and retry with that
>>> exception will kill their program), then throwing doesn't help them. You can
>>> argue whether the topic not
>>> existing is transient or not, unfortunately the way we did auto-creation
>>> makes it transient if you are in "auto create mode" and non-transient
>>> otherwise (ick!). In any case this is an orthogonal discussion to
>>> everything else. I think the policy is right and if we don't conform to it
>>> in some way that is really an independent bug/discussion.
>>>
>> Agreed, we can discuss them separately.
>>
>>>
>>> I suggest we focus on threading and the current event-loop style of api
>>> design since I think that is really the crux.
>>>
>>> The analogy between the producer threading model and the consumer model
>>> actually doesn't work for me. The goal of the producer is actually to take
>>> requests from many many user threads and shove them into a single buffer
>>> for batching. So the threading model isn't the 1:1 threading you describe;
>>> it is N:1. The goal of the consumer is to support single-threaded processing.
>>> This is what drives the difference. Saying that the producer has N:1
>>> threads and therefore the consumer should have 1:1 threads instead of just
>>> 1 thread doesn't make sense any more than an analogy to the broker's
>>> threading model would--the problem we're solving is totally different.
>>>
>> I think the ultimate goal for the producer and consumer is still to allow
>> users to send/receive data in parallel. In the producer we picked the solution
>> of one-producer-serving-multiple-threads, and in the consumer we picked
>> multiple-single-threaded-consumers instead of
>> single-consumer-serving-multiple-threads. And we believe people can always
>> implement the latter with the former. I think this is a reasonable
>> decision. However, there are also reasonable concerns over the
>> multiple-single-threaded-consumers solution, which is that the single thread
>> might have to be a dedicated polling thread in many cases, which pushes users
>> towards the other solution - i.e. implementing a
>> single-thread-consumer-serving-multiple-threads wrapper. From what we hear,
>> it seems to be a quite common concern for most of the users we talked to.
>> Plus the adoption bar of the consumer will be much higher because users will
>> have to understand some of the details of the things they don't care about, as
>> listed in the grid.
>> The analogy between producer/consumer is intended to show that a separate
>> polling thread will solve the concerns we have.
>>
>>>
>>>
>>> I think ultimately though what you need to think about is, does an event
>>> loop style of API make sense? That is the source of all the issues you
>>> describe. This style of API is incredibly prevalent from unix select to
>>> GUIs to node.js. It's a great way to model multiple channels of messages
>>> coming in. It is a fantastic style for event processing. Programmers
>>> understand this style of api though I would agree it is unusual compared to
>>> blocking apis. But it is a single threaded processing model. The current
>>> approach is basically a pure event loop with some convenience methods that
>>> are effectively "poll until X is complete".
>>>
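>>> (To make the "poll until X is complete" point concrete, a sync convenience
>>> method boils down to something like the following -- purely an illustrative
>>> sketch with made-up internal helper names, not the actual code:)
>>>
>>> void commitSync(Map<TopicPartition, Long> offsets) {
>>>   RequestFuture done = sendOffsetCommitRequest(offsets); // hypothetical helper
>>>   while (!done.isDone())
>>>     pollOnce(); // the same event loop that drives fetches, heartbeats, callbacks
>>> }
>>>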
>>> I think basically all the confusion you are describing comes from not
>>> documenting/expecting an event loop. The "if you don't call poll nothing
>>> happens" point is basically this. It's an event loop. You have to loop. You
>>> can't not call poll. The docs don't cover this right now, perhaps. I think
>>> if they do it's not unreasonable behavior.
>>>
>> I'm not sure if I understand the event-loop correctly and honestly I did
>> not think about it clearly before. My understanding is that an event-loop
>> model means a single listener thread, but there can be multiple event
>> generator threads. The downside is that the listener thread has to be fast
>> and very careful about blocking. If we look at the consumer, the current
>> model is that the caller thread itself acts as both event generator and listener.
>> As a generator, it generates different tasks by calling the convenience
>> methods. As a listener, it listens to the messages on the broker and also the
>> tasks generated by itself. So in our proposal, we are not changing the
>> event-loop model here; we have just separated the event generator and the event
>> listener. It looks to me that the underlying execution thread follows the
>> event-loop model; the special thing might be that it is not only listening to
>> the messages from the broker, but also listening to the tasks from the user
>> thread. This is essentially the thing a consumer has to do - interact with
>> both server and user.
>>
>>>
>>> If we want to move away from an event loop I'm not sure *any* aspect of
>>> the current event loop style of api makes sense any more. I am not totally
>>> married to event loops, but i do think what we have gives an elegant way of
>>> implementing any higher level abstractions that would fully implement the
>>> user's parallelism model. I don't want to go rethink everything but I do
>>> think a half-way implementation that is event loop + background threads is
>>> likely going to be icky.
>>>
>> We brought this up before to change the consumer.poll() to
>> consumer.consume(), and did not do so simply because we wanted less
>> change in the API... I might be crazy, but can we think of the proposed model as
>> processing thread + event-loop instead, rather than event-loop + background
>> thread?
>>
>>>
>>> WRT making it configurable whether liveness means "actually consuming"
>>> or "background thread running" I would suggest that that is really the
>>> worst outcome. These type of "modes" that are functionally totally
>>> different are just awful from a documentation, testing, usability, etc pov.
>>> I would strongly prefer we pick either of these, document it, and make it
>>> work well rather than trying to do both.
>>>
>> Previously I thought this was the major benefit we wanted from a single
>> threaded model; personally I don't have a strong preference on this, so I
>> am OK with either way.
>>
>>>
>>> -Jay
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Jul 29, 2015 at 1:20 PM, Neha Narkhede <n...@confluent.io>
>>> wrote:
>>>
>>>> Works now. Thanks Becket!
>>>>
>>>> On Wed, Jul 29, 2015 at 1:19 PM, Jiangjie Qin <j...@linkedin.com>
>>>> wrote:
>>>>
>>>>> Ah... My bad, forgot to change the URL link for pictures.
>>>>> Thanks for the quick response, Neha. It should be fixed now, can you
>>>>> try again?
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Wed, Jul 29, 2015 at 1:10 PM, Neha Narkhede <n...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Thanks Becket. Quick comment - there seem to be a bunch of images
>>>>>> that the wiki refers to, but none loaded for me. Just making sure if it's
>>>>>> just me or can everyone not see the pictures?
>>>>>>
>>>>>> On Wed, Jul 29, 2015 at 12:00 PM, Jiangjie Qin <j...@linkedin.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I agree with Ewen that with a single-threaded model it will be tricky to
>>>>>>> implement the conventional semantics of async or Future. We just
>>>>>>> drafted the following wiki which explains our thoughts in LinkedIn on 
>>>>>>> the
>>>>>>> new consumer API and threading model.
>>>>>>>
>>>>>>>
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal
>>>>>>>
>>>>>>> We were trying to see:
>>>>>>> 1. If we can use some kind of methodology to help us think about
>>>>>>> what API we want to provide to users for different use cases.
>>>>>>> 2. What are the pros and cons of the current single-threaded model? Is
>>>>>>> there a way that we can maintain the benefits while solving the issues
>>>>>>> we are facing now with the single-threaded model.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
>>>>>>>
>>>>>>> On Tue, Jul 28, 2015 at 10:28 PM, Ewen Cheslack-Postava <
>>>>>>> e...@confluent.io> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jul 28, 2015 at 5:18 PM, Guozhang Wang <wangg...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I think Ewen has proposed these APIs for using callbacks along
>>>>>>>>> with returning future in the commit calls, i.e. something similar to:
>>>>>>>>>
>>>>>>>>> public Future<Void> commit(ConsumerCommitCallback callback);
>>>>>>>>>
>>>>>>>>> public Future<Void> commit(Map<TopicPartition, Long> offsets,
>>>>>>>>> ConsumerCommitCallback callback);
>>>>>>>>>
>>>>>>>>> At that time I was slightly intending not to include the Future
>>>>>>>>> besides adding the callback, mainly because of the implementation
>>>>>>>>> complexity I feel it could introduce along with the retry settings after
>>>>>>>>> looking through the code base. I would be happy to change my mind if we
>>>>>>>>> could propose a prototype implementation that is simple enough.
>>>>>>>>>
>>>>>>>>>
>>>>>>>> One of the reasons that interface ended up being difficult (or
>>>>>>>> maybe impossible) to make work reasonably is because the consumer was
>>>>>>>> thread-safe at the time. That made it impossible to know what should be
>>>>>>>> done when Future.get() is called -- should the implementation call 
>>>>>>>> poll()
>>>>>>>> itself, or would the fact that the user is calling get() imply that 
>>>>>>>> there's
>>>>>>>> a background thread running the poll() loop and we just need to wait 
>>>>>>>> for it?
>>>>>>>>
>>>>>>>> The consumer is no longer thread safe, but I think the same problem
>>>>>>>> remains because the expectation with Futures is that they are thread 
>>>>>>>> safe.
>>>>>>>> Which means that even if the consumer isn't thread safe, I would 
>>>>>>>> expect to
>>>>>>>> be able to hand that Future off to some other thread, have the second
>>>>>>>> thread call get(), and then continue driving the poll loop in my thread
>>>>>>>> (which in turn would eventually resolve the Future).
>>>>>>>>
>>>>>>>> I quite dislike the sync/async enum. While both operations commit
>>>>>>>> offsets, their semantics are so different that overloading a single 
>>>>>>>> method
>>>>>>>> with both is messy. That said, I don't think we should consider this an
>>>>>>>> inconsistency wrt the new producer API's use of Future because the two 
>>>>>>>> APIs
>>>>>>>> have a much more fundamental difference that justifies it: they have
>>>>>>>> completely different threading and execution models.
>>>>>>>>
>>>>>>>> I think a Future-based API only makes sense if you can guarantee
>>>>>>>> the operations that Futures are waiting on will continue to make 
>>>>>>>> progress
>>>>>>>> regardless of what the thread using the Future does. The producer API 
>>>>>>>> makes
>>>>>>>> that work by processing asynchronous requests in a background thread. 
>>>>>>>> The
>>>>>>>> new consumer does not, and so it becomes difficult/impossible to 
>>>>>>>> implement
>>>>>>>> the Future correctly. (Or, you have to make assumptions which break 
>>>>>>>> other
>>>>>>>> use cases; if you want to support the simple use case of just making a
>>>>>>>> commit() synchronous by calling get(), the Future has to call poll()
>>>>>>>> internally; but if you do that, then if any user ever wants to add
>>>>>>>> synchronization to the consumer via some external mechanism, then the
>>>>>>>> implementation of the Future's get() method will not be subject to that
>>>>>>>> synchronization and things will break).
>>>>>>>>
>>>>>>>> -Ewen
>>>>>>>>
>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Tue, Jul 28, 2015 at 4:03 PM, Neha Narkhede <n...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Adi,
>>>>>>>>>>
>>>>>>>>>> When we designed the initial version, the producer API was still
>>>>>>>>>> changing. I thought about adding the Future and then just didn't get 
>>>>>>>>>> to it.
>>>>>>>>>> I agree that we should look into adding it for consistency.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Neha
>>>>>>>>>>
>>>>>>>>>> On Tue, Jul 28, 2015 at 1:51 PM, Aditya Auradkar <
>>>>>>>>>> aaurad...@linkedin.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Great discussion everyone!
>>>>>>>>>>>
>>>>>>>>>>> One general comment on the sync/async API's on the new consumer.
>>>>>>>>>>>  I think the producer tackles sync vs async API's well. For
>>>>>>>>>>> API's that can either be sync or async, can we simply return a 
>>>>>>>>>>> future? That
>>>>>>>>>>> seems more elegant for the API's that make sense in both
>>>>>>>>>>> flavors.
>>>>>>>>>>> From the user's perspective, it is more consistent with the new
>>>>>>>>>>> producer.
>>>>>>>>>>> One easy example is the commit call with the CommitType enum: we
>>>>>>>>>>> can make
>>>>>>>>>>> that call always async and users can block on the future if they 
>>>>>>>>>>> want to
>>>>>>>>>>> make sure their offsets are committed.
>>>>>>>>>>>
>>>>>>>>>>> Aditya
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jul 27, 2015 at 2:06 PM, Onur Karaman <
>>>>>>>>>>> okara...@linkedin.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the great responses, everyone!
>>>>>>>>>>>>
>>>>>>>>>>>> To expand a tiny bit on my initial post: while I did bring up
>>>>>>>>>>>> old high level consumers, the teams we spoke to were actually not 
>>>>>>>>>>>> the types
>>>>>>>>>>>> of services that simply wanted an easy way to get ConsumerRecords. 
>>>>>>>>>>>> We spoke
>>>>>>>>>>>> to infrastructure teams that I would consider to be closer to the
>>>>>>>>>>>> "power-user" end of the spectrum and would want KafkaConsumer's 
>>>>>>>>>>>> level of
>>>>>>>>>>>> granularity. Some would use auto group management. Some would use 
>>>>>>>>>>>> explicit
>>>>>>>>>>>> group management. All of them would turn off auto offset commits. 
>>>>>>>>>>>> Yes, the
>>>>>>>>>>>> Samza team had prior experience with the old SimpleConsumer, but 
>>>>>>>>>>>> this is
>>>>>>>>>>>> the first kafka consumer being used by the Databus team. So I 
>>>>>>>>>>>> don't really
>>>>>>>>>>>> think the feedback received was about the simpler times or wanting
>>>>>>>>>>>> additional higher-level clients.
>>>>>>>>>>>>
>>>>>>>>>>>> - Onur
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:41 PM, Jason Gustafson <
>>>>>>>>>>>> ja...@confluent.io> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I think if we recommend a longer session timeout, then we
>>>>>>>>>>>>> should expose the heartbeat frequency in configuration since this 
>>>>>>>>>>>>> generally
>>>>>>>>>>>>> controls how long normal rebalances will take. I think it's 
>>>>>>>>>>>>> currently
>>>>>>>>>>>>> hard-coded to 3 heartbeats per session timeout. It could also be 
>>>>>>>>>>>>> nice to
>>>>>>>>>>>>> have an explicit LeaveGroup request to implement clean shutdown 
>>>>>>>>>>>>> of a
>>>>>>>>>>>>> consumer. Then the coordinator doesn't have to wait for the 
>>>>>>>>>>>>> timeout to
>>>>>>>>>>>>> reassign partitions.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Jason
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <j...@confluent.io>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hey Kartik,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Totally agree we don't want people tuning timeouts in the
>>>>>>>>>>>>>> common case.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> However there are two ways to avoid this:
>>>>>>>>>>>>>> 1. Default the timeout high
>>>>>>>>>>>>>> 2. Put the heartbeat in a separate thread
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When we were doing the consumer design we discussed this
>>>>>>>>>>>>>> tradeoff and I think the conclusion we came to was that 
>>>>>>>>>>>>>> defaulting to a
>>>>>>>>>>>>>> high timeout was actually better. This means it takes a little 
>>>>>>>>>>>>>> longer to
>>>>>>>>>>>>>> detect a failure, but usually that is not a big problem and 
>>>>>>>>>>>>>> people who want
>>>>>>>>>>>>>> faster failure detection can tune it down. This seemed better 
>>>>>>>>>>>>>> than having
>>>>>>>>>>>>>> the failure detection not really cover the consumption and just 
>>>>>>>>>>>>>> be a
>>>>>>>>>>>>>> background ping. The two reasons were (a) you still have the GC
>>>>>>>>>>>>>> problem
>>>>>>>>>>>>>> even for the background thread, (b) consumption is in some sense 
>>>>>>>>>>>>>> a better
>>>>>>>>>>>>>> definition of an active healthy consumer and a lot of problems 
>>>>>>>>>>>>>> crop up when
>>>>>>>>>>>>>> you have an inactive consumer with an active background thread 
>>>>>>>>>>>>>> (as today).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When we had the discussion I think what we realized was that
>>>>>>>>>>>>>> most people who were worried about the timeout were imagining a
>>>>>>>>>>>>>> very low
>>>>>>>>>>>>>> default (say, 500ms). But in fact just setting this to 60 seconds
>>>>>>>>>>>>>> or higher
>>>>>>>>>>>>>> as a default would be okay, this adds to the failure detection 
>>>>>>>>>>>>>> time but
>>>>>>>>>>>>>> only apps that care about this need to tune. This should largely 
>>>>>>>>>>>>>> eliminate
>>>>>>>>>>>>>> false positives since after all if you disappear for 60 seconds 
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>> actually starts to be more of a true positive, even if you come 
>>>>>>>>>>>>>> back... :-)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam <
>>>>>>>>>>>>>> kparamasi...@linkedin.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> adding the open source alias.  This email started off as a
>>>>>>>>>>>>>>> broader discussion around the new consumer.  I was zooming into 
>>>>>>>>>>>>>>> only the
>>>>>>>>>>>>>>> aspect of poll() being the only mechanism for driving the 
>>>>>>>>>>>>>>> heartbeats.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes the lag is the effect of the problem (not the problem).
>>>>>>>>>>>>>>> Monitoring the lag is important as it is the primary way to 
>>>>>>>>>>>>>>> tell if the
>>>>>>>>>>>>>>> application is wedged.  There might be other metrics which can 
>>>>>>>>>>>>>>> possibly
>>>>>>>>>>>>>>> capture the same essence. Yes the lag is at the consumer group 
>>>>>>>>>>>>>>> level, but
>>>>>>>>>>>>>>> you can tell that one of the consumers is messed up if one of 
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> partitions in the application start generating lag and others 
>>>>>>>>>>>>>>> are good for
>>>>>>>>>>>>>>> e.g.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Monitoring aside, I think the main point of concern is that
>>>>>>>>>>>>>>> in the old consumer most customers don't have to worry about 
>>>>>>>>>>>>>>> unnecessary
>>>>>>>>>>>>>>> rebalances and most of the things that they do in their app
>>>>>>>>>>>>>>> don't have an
>>>>>>>>>>>>>>> impact on the session timeout (i.e. the only thing that
>>>>>>>>>>>>>>> causes
>>>>>>>>>>>>>>> rebalances is when the GC is out of whack).    For the handful 
>>>>>>>>>>>>>>> of customers
>>>>>>>>>>>>>>> who are impacted by GC related rebalances, i would imagine that 
>>>>>>>>>>>>>>> all of them
>>>>>>>>>>>>>>> would really want us to make the system more resilient.    I 
>>>>>>>>>>>>>>> agree that the
>>>>>>>>>>>>>>> GC problem can't be solved easily in the java client, however 
>>>>>>>>>>>>>>> it appears
>>>>>>>>>>>>>>> that now we would be expecting the consuming applications to be 
>>>>>>>>>>>>>>> even more
>>>>>>>>>>>>>>> careful with ongoing tuning of the timeouts.  At LinkedIn, we 
>>>>>>>>>>>>>>> have seen
>>>>>>>>>>>>>>> that most kafka applications don't have much of a clue about 
>>>>>>>>>>>>>>> configuring
>>>>>>>>>>>>>>> the timeouts and just end up calling the Kafka team when their 
>>>>>>>>>>>>>>> application
>>>>>>>>>>>>>>> sees rebalances.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The other side effect of poll driving the heartbeats is that
>>>>>>>>>>>>>>> we have to make sure that people don't set a poll timeout that 
>>>>>>>>>>>>>>> is larger
>>>>>>>>>>>>>>> than the session timeout.   If we had a notion of implicit 
>>>>>>>>>>>>>>> heartbeats then
>>>>>>>>>>>>>>> we could also automatically make this work for consumers by 
>>>>>>>>>>>>>>> sending
>>>>>>>>>>>>>>> heartbeats at the appropriate interval even though the customers
>>>>>>>>>>>>>>> want to do
>>>>>>>>>>>>>>> a long poll.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We could surely work around this in LinkedIn if either we
>>>>>>>>>>>>>>> have the Pause() api or an explicit HeartBeat() api on the 
>>>>>>>>>>>>>>> consumer.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Would love to hear how other people think about this subject?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Kartik
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede <
>>>>>>>>>>>>>>> n...@confluent.io> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Agree with the dilemma you are pointing out, which is that
>>>>>>>>>>>>>>>> there are many ways the application's message processing could 
>>>>>>>>>>>>>>>> fail and we
>>>>>>>>>>>>>>>> wouldn't be able to model all of those in the consumer's 
>>>>>>>>>>>>>>>> failure detection
>>>>>>>>>>>>>>>> mechanism. So we should try to model as much of it as we can 
>>>>>>>>>>>>>>>> so the
>>>>>>>>>>>>>>>> consumer's failure detection is meaningful.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Point being that the only absolute way to really detect
>>>>>>>>>>>>>>>>> that an app is healthy is to monitor lag. If the lag 
>>>>>>>>>>>>>>>>> increases then for
>>>>>>>>>>>>>>>>> sure something is wrong.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The lag is merely the effect of the problem, not the
>>>>>>>>>>>>>>>> problem itself. Lag is also a consumer group level concept and 
>>>>>>>>>>>>>>>> the problem
>>>>>>>>>>>>>>>> we have is being able to detect failures at the level of 
>>>>>>>>>>>>>>>> individual
>>>>>>>>>>>>>>>> consumer instances.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> As you pointed out, a consumer that calls poll() is a stronger
>>>>>>>>>>>>>>>> indicator of whether the consumer is alive or not. The dilemma 
>>>>>>>>>>>>>>>> then is who
>>>>>>>>>>>>>>>> defines what a healthy poll() frequency is. No one else but 
>>>>>>>>>>>>>>>> the application
>>>>>>>>>>>>>>>> owner can define what a "normal" processing latency is for 
>>>>>>>>>>>>>>>> their
>>>>>>>>>>>>>>>> application. Now the question is what's the easiest way for 
>>>>>>>>>>>>>>>> the user to
>>>>>>>>>>>>>>>> define this without having to tune and fine tune this too 
>>>>>>>>>>>>>>>> often. The
>>>>>>>>>>>>>>>> heartbeat interval certainly does not have to be *exactly*
>>>>>>>>>>>>>>>> 99tile of processing latency but could be in the ballpark + an 
>>>>>>>>>>>>>>>> error delta.
>>>>>>>>>>>>>>>> The error delta is the application owner's acceptable risk 
>>>>>>>>>>>>>>>> threshold during
>>>>>>>>>>>>>>>> which they would be ok if the application remains part of the 
>>>>>>>>>>>>>>>> group despite
>>>>>>>>>>>>>>>> being dead. It is ultimately a tradeoff between operational 
>>>>>>>>>>>>>>>> ease and more
>>>>>>>>>>>>>>>> accurate failure detection.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> With quotas the write latencies to kafka could range from a
>>>>>>>>>>>>>>>>> few milliseconds all the way to a tens of seconds.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is actually no different from the GC problem. Most of the
>>>>>>>>>>>>>>>> time, the normal GC falls in the few ms range and
>>>>>>>>>>>>>>>> there are
>>>>>>>>>>>>>>>> many applications even at LinkedIn for which the max GC falls 
>>>>>>>>>>>>>>>> in the
>>>>>>>>>>>>>>>> multiple seconds range. Note that it also can't be predicted, 
>>>>>>>>>>>>>>>> so has to be
>>>>>>>>>>>>>>>> an observed value. One way or the other, you have to observe 
>>>>>>>>>>>>>>>> what this
>>>>>>>>>>>>>>>> acceptable "max" is for your application and then set the 
>>>>>>>>>>>>>>>> appropriate
>>>>>>>>>>>>>>>> timeouts.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Since this is not something that can be automated, this is
>>>>>>>>>>>>>>>> a config that the application owner has to set based on the 
>>>>>>>>>>>>>>>> expected
>>>>>>>>>>>>>>>> behavior of their application. Not wanting to do that leads to 
>>>>>>>>>>>>>>>> ending up
>>>>>>>>>>>>>>>> with bad consumption semantics where the application process 
>>>>>>>>>>>>>>>> continues to
>>>>>>>>>>>>>>>> be part of a group owning partitions but not consuming since 
>>>>>>>>>>>>>>>> it has halted
>>>>>>>>>>>>>>>> due to a problem. The fact that the design requires them to 
>>>>>>>>>>>>>>>> express that in
>>>>>>>>>>>>>>>> poll() frequency or not doesn't change the fact that the 
>>>>>>>>>>>>>>>> application owner
>>>>>>>>>>>>>>>> has to go through the process of measuring and then defining 
>>>>>>>>>>>>>>>> this "max".
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The reverse where they don't do this and the application
>>>>>>>>>>>>>>>> remains in the group despite being dead is super confusing and 
>>>>>>>>>>>>>>>> frustrating
>>>>>>>>>>>>>>>> too. So the due diligence up front is actually worth it. And as
>>>>>>>>>>>>>>>> long as the
>>>>>>>>>>>>>>>> poll() latency and processing latency can be monitored, it 
>>>>>>>>>>>>>>>> should be easy
>>>>>>>>>>>>>>>> to tell the reason for a rebalance, whether that is valid or 
>>>>>>>>>>>>>>>> not and how
>>>>>>>>>>>>>>>> that should be tuned.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> As for the wrapper, KIP-28 is the wrapper in open source
>>>>>>>>>>>>>>>> that will hide this complexity and I agree that LI is 
>>>>>>>>>>>>>>>> unblocked since you
>>>>>>>>>>>>>>>> can do this in TrackerConsumer in the meantime.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Neha
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam <
>>>>>>>>>>>>>>>> kparamasi...@linkedin.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> For commit(), I think it should hopefully be an easier
>>>>>>>>>>>>>>>>> discussion, so maybe we can follow up when we meet up next.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> As far as the heartbeat is concerned, I think the points
>>>>>>>>>>>>>>>>> you discuss are all very valid.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> GC pauses impacting the heartbeats is a real issue.
>>>>>>>>>>>>>>>>> However there is a smaller percentage of memory-hungry apps
>>>>>>>>>>>>>>>>> that get hit
>>>>>>>>>>>>>>>>> by it.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The broader issue whereby even if the heartbeats are
>>>>>>>>>>>>>>>>> healthy, the app might not be behaving correctly is also 
>>>>>>>>>>>>>>>>> real.  If the app
>>>>>>>>>>>>>>>>> is calling poll() then the probability that the app is 
>>>>>>>>>>>>>>>>> healthy is surely
>>>>>>>>>>>>>>>>> higher.  But this again isn't an absolute measure that the 
>>>>>>>>>>>>>>>>> app is
>>>>>>>>>>>>>>>>> processing correctly.
>>>>>>>>>>>>>>>>> In other cases the app might have even died in which case
>>>>>>>>>>>>>>>>> this discussion is moot.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Point being that the only absolute way to really detect
>>>>>>>>>>>>>>>>> that an app is healthy is to monitor lag. If the lag 
>>>>>>>>>>>>>>>>> increases then for
>>>>>>>>>>>>>>>>> sure something is wrong.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The proposal seems to be that the application needs to
>>>>>>>>>>>>>>>>> tune their session timeout based on the 99tile of the time 
>>>>>>>>>>>>>>>>> they take to
>>>>>>>>>>>>>>>>> process events after every poll.   This turns out to be a
>>>>>>>>>>>>>>>>> nontrivial thing for an
>>>>>>>>>>>>>>>>> application to do. To start with, when an application
>>>>>>>>>>>>>>>>> is new, their
>>>>>>>>>>>>>>>>> data is going to be based on tests that they have done on 
>>>>>>>>>>>>>>>>> synthetic data.
>>>>>>>>>>>>>>>>> This often times doesn't represent what they will see in 
>>>>>>>>>>>>>>>>> production.  Once
>>>>>>>>>>>>>>>>> the app is in production their processing latencies will 
>>>>>>>>>>>>>>>>> potentially vary
>>>>>>>>>>>>>>>>> over time.  It is extremely unlikely that the application 
>>>>>>>>>>>>>>>>> owner does a
>>>>>>>>>>>>>>>>> careful job of monitoring the 99tile of latencies over time 
>>>>>>>>>>>>>>>>> and readjust
>>>>>>>>>>>>>>>>> the settings.  Often times the latencies vary because of 
>>>>>>>>>>>>>>>>> variance in other
>>>>>>>>>>>>>>>>> services that are called by the consumer as part of 
>>>>>>>>>>>>>>>>> processing the events.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Case in point would be a simple app which reads events and
>>>>>>>>>>>>>>>>> writes to Kafka.  With quotas the write latencies to kafka 
>>>>>>>>>>>>>>>>> could range from
>>>>>>>>>>>>>>>>> a few milliseconds all the way to tens of seconds.  As the
>>>>>>>>>>>>>>>>> scale of
>>>>>>>>>>>>>>>>> processing for an app increases, the app or that 'user' could
>>>>>>>>>>>>>>>>> now get
>>>>>>>>>>>>>>>>> quotaed.  Instead of slowing down gracefully unless the 
>>>>>>>>>>>>>>>>> application owner
>>>>>>>>>>>>>>>>> has carefully tuned the timeout, now we are looking at a 
>>>>>>>>>>>>>>>>> potential outage
>>>>>>>>>>>>>>>>> where the app could get hit by constant rebalances.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If we expose the pause() Api then It is possible for us to
>>>>>>>>>>>>>>>>> take care of this in the linkedin wrapper.  Whereby we would 
>>>>>>>>>>>>>>>>> keep calling
>>>>>>>>>>>>>>>>> poll on a separate thread periodically and enqueue the 
>>>>>>>>>>>>>>>>> messages. When the
>>>>>>>>>>>>>>>>> queue is full we would call pause().
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In essence we can work around it in LinkedIn, however I
>>>>>>>>>>>>>>>>> think it is vastly better if we address this in the Api as 
>>>>>>>>>>>>>>>>> every major
>>>>>>>>>>>>>>>>> customer will eventually be pained by it.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Kartik
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <j...@confluent.io>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hey guys,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Happy to discuss. I agree there may be some rough edges
>>>>>>>>>>>>>>>>> and now is definitely the time to clean them up.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm pretty reluctant to change the threading model or
>>>>>>>>>>>>>>>>> undergo a big api redesign at this point beyond the group 
>>>>>>>>>>>>>>>>> management stuff
>>>>>>>>>>>>>>>>> we've discussed in the context of Samza/copycat which is 
>>>>>>>>>>>>>>>>> already a big
>>>>>>>>>>>>>>>>> effort.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Overall I agree that we have done a poor job of
>>>>>>>>>>>>>>>>> documenting which apis block and which don't, and when people
>>>>>>>>>>>>>>>>> are surprised it is
>>>>>>>>>>>>>>>>> because we haven't labeled something that will be
>>>>>>>>>>>>>>>>> unintuitive. But the
>>>>>>>>>>>>>>>>> overall style of poll/select-based apis is quite common in 
>>>>>>>>>>>>>>>>> programming
>>>>>>>>>>>>>>>>> going back to unix select so I don't think it's beyond people 
>>>>>>>>>>>>>>>>> if explained
>>>>>>>>>>>>>>>>> well (after all we need to mix sync and async apis and if we 
>>>>>>>>>>>>>>>>> don't say
>>>>>>>>>>>>>>>>> which is which any scheme will be confusing).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> For what it's worth the experience with this api has
>>>>>>>>>>>>>>>>> actually been about 1000x better than the issues people had 
>>>>>>>>>>>>>>>>> around
>>>>>>>>>>>>>>>>> intuitiveness with the high-level api. The crazy blocking 
>>>>>>>>>>>>>>>>> iterator,
>>>>>>>>>>>>>>>>> impossible internal queue sizing, baroque threading model, 
>>>>>>>>>>>>>>>>> etc  have all
>>>>>>>>>>>>>>>>> caused endless amounts of anger. Not to mention that that 
>>>>>>>>>>>>>>>>> client
>>>>>>>>>>>>>>>>> effectively disqualifies about 50% of the use cases people 
>>>>>>>>>>>>>>>>> want to try to
>>>>>>>>>>>>>>>>> use it for (plus I regularly hear people tell me they've 
>>>>>>>>>>>>>>>>> heard not to use
>>>>>>>>>>>>>>>>> it at all for various reasons ranging from data loss to lack 
>>>>>>>>>>>>>>>>> of features).
>>>>>>>>>>>>>>>>> It's important to have that context when people need to 
>>>>>>>>>>>>>>>>> switch and they say
>>>>>>>>>>>>>>>>> "oh the old way was so simple and the new way complex!" :-)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Let me give some context related to your points, based on
>>>>>>>>>>>>>>>>> our previous discussions:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> For commit, let's discuss, that is easy either way.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The motivation for avoiding additional threading was
>>>>>>>>>>>>>>>>> two-fold. First this client is really intended to be the 
>>>>>>>>>>>>>>>>> lowest level
>>>>>>>>>>>>>>>>> client. There are many, many possible higher level processing 
>>>>>>>>>>>>>>>>> abstractions.
>>>>>>>>>>>>>>>>> One thing we found to be a big problem with the high-level 
>>>>>>>>>>>>>>>>> client was that
>>>>>>>>>>>>>>>>> it coupled things everyone must have--failover, etc--with 
>>>>>>>>>>>>>>>>> things that are
>>>>>>>>>>>>>>>>> different in each use case like the appropriate threading 
>>>>>>>>>>>>>>>>> model. If you do
>>>>>>>>>>>>>>>>> this you need to also maintain a thread free low-level 
>>>>>>>>>>>>>>>>> consumer api for
>>>>>>>>>>>>>>>>> people to get around whatever you have done.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The second reason was that the internal threading in the
>>>>>>>>>>>>>>>>> client became quite complex. The answer with threading is 
>>>>>>>>>>>>>>>>> always that "it
>>>>>>>>>>>>>>>>> won't be complex this time", but it always is.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> For the heartbeat you correctly describe the downside to
>>>>>>>>>>>>>>>>> coupling heartbeat with poll--the contract is that the 
>>>>>>>>>>>>>>>>> application must
>>>>>>>>>>>>>>>>> regularly consume to be considered an active consumer. This 
>>>>>>>>>>>>>>>>> allows the
>>>>>>>>>>>>>>>>> possibility of false positive failure detections. However 
>>>>>>>>>>>>>>>>> it's important to
>>>>>>>>>>>>>>>>> understand the downside of the alternative. If you do 
>>>>>>>>>>>>>>>>> background polling a
>>>>>>>>>>>>>>>>> consumer is considered active as long as it isn't shutdown. 
>>>>>>>>>>>>>>>>> This leads to
>>>>>>>>>>>>>>>>> all kinds of active consumers that aren't consuming because 
>>>>>>>>>>>>>>>>> they have
>>>>>>>>>>>>>>>>> leaked or otherwise stopped but are still claiming partitions 
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> heart-beating. This failure mode is actually far far worse. 
>>>>>>>>>>>>>>>>> If you allow
>>>>>>>>>>>>>>>>> false positives the user sees the frequent rebalances and 
>>>>>>>>>>>>>>>>> knows they aren't
>>>>>>>>>>>>>>>>> consuming frequently enough to be considered active but if 
>>>>>>>>>>>>>>>>> you allow false
>>>>>>>>>>>>>>>>> negatives you end up having weeks go by before someone 
>>>>>>>>>>>>>>>>> notices that a
>>>>>>>>>>>>>>>>> partition has been unconsumed the whole time at which point 
>>>>>>>>>>>>>>>>> the data is
>>>>>>>>>>>>>>>>> gone. Plus of course even if you do this you still have 
>>>>>>>>>>>>>>>>> regular false
>>>>>>>>>>>>>>>>> positives anyway from GC pauses (as now). We discussed this 
>>>>>>>>>>>>>>>>> in some depth
>>>>>>>>>>>>>>>>> at the time and decided that it is better to have the 
>>>>>>>>>>>>>>>>> liveness notion tied
>>>>>>>>>>>>>>>>> to *actual* consumption which is the actual definition of
>>>>>>>>>>>>>>>>> liveness.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman <
>>>>>>>>>>>>>>>>> okara...@linkedin.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Confluent Team.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> There has recently been a lot of open source activity
>>>>>>>>>>>>>>>>>> regarding the new KafkaConsumer:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2123
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2350
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2359
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> We’ve explained the KafkaConsumer API to the Databus,
>>>>>>>>>>>>>>>>>> Samza, and some other teams and we got similar feedback.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> To summarize the feedback we received from other teams:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>    1.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>    The current behavior is not intuitive. For example,
>>>>>>>>>>>>>>>>>>    KafkaConsumer.poll drives everything. The other methods 
>>>>>>>>>>>>>>>>>> like subscribe,
>>>>>>>>>>>>>>>>>>    unsubscribe, seek, commit(async) don’t do anything 
>>>>>>>>>>>>>>>>>> without a
>>>>>>>>>>>>>>>>>>    KafkaConsumer.poll call.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>    2.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>    The semantics of a commit() call should be consistent
>>>>>>>>>>>>>>>>>>    between sync and async operations. Currently, sync commit 
>>>>>>>>>>>>>>>>>> is a blocking
>>>>>>>>>>>>>>>>>>    call which actually sends out an OffsetCommitRequest and 
>>>>>>>>>>>>>>>>>> waits for the
>>>>>>>>>>>>>>>>>>    response upon the user’s KafkaConsumer.commit call. 
>>>>>>>>>>>>>>>>>> However, the async
>>>>>>>>>>>>>>>>>>    commit is a nonblocking call which just queues up the 
>>>>>>>>>>>>>>>>>> OffsetCommitRequest.
>>>>>>>>>>>>>>>>>>    The request itself is later sent out in the next poll. 
>>>>>>>>>>>>>>>>>> The teams we talked
>>>>>>>>>>>>>>>>>>    to found this misleading.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>    3.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>    Heartbeats are dependent on user application behavior
>>>>>>>>>>>>>>>>>>    (i.e. user applications calling poll). This can be a big 
>>>>>>>>>>>>>>>>>> problem as we
>>>>>>>>>>>>>>>>>>    don’t control how different applications behave. For 
>>>>>>>>>>>>>>>>>> example, we might have
>>>>>>>>>>>>>>>>>>    an application which reads from Kafka and writes to 
>>>>>>>>>>>>>>>>>> Espresso. If Espresso
>>>>>>>>>>>>>>>>>>    is slow for whatever reason, then rebalances could
>>>>>>>>>>>>>>>>>> happen.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Generally speaking, we feel that the current
>>>>>>>>>>>>>>>>>> KafkaConsumer API design is more of a wrapping around the 
>>>>>>>>>>>>>>>>>> old simple
>>>>>>>>>>>>>>>>>> consumer, i.e. in the old consumer we ask users to deal with raw
>>>>>>>>>>>>>>>>>> protocols and
>>>>>>>>>>>>>>>>>> error handling while in KafkaConsumer we do that for users.
>>>>>>>>>>>>>>>>>> However, for
>>>>>>>>>>>>>>>>>> old high level consumer users (which are the majority of 
>>>>>>>>>>>>>>>>>> users), the
>>>>>>>>>>>>>>>>>> experience is a noticeable regression. The old high level 
>>>>>>>>>>>>>>>>>> consumer
>>>>>>>>>>>>>>>>>> interface is simple and easy to use for end user, while 
>>>>>>>>>>>>>>>>>> KafkaConsumer
>>>>>>>>>>>>>>>>>> requires users to be aware of many underlying details and is 
>>>>>>>>>>>>>>>>>> becoming
>>>>>>>>>>>>>>>>>> prohibitive for users to adopt. This is hinted by the 
>>>>>>>>>>>>>>>>>> javadoc growing
>>>>>>>>>>>>>>>>>> bigger and bigger.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> We think it's getting to the point where we should take a
>>>>>>>>>>>>>>>>>> step back and look at the big picture.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The current state of KafkaConsumer is that it's
>>>>>>>>>>>>>>>>>> single-threaded. There's one big KafkaConsumer.poll called 
>>>>>>>>>>>>>>>>>> by the user
>>>>>>>>>>>>>>>>>> which pretty much drives everything:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - data fetches
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - join groups (new consumer joining a group, topic
>>>>>>>>>>>>>>>>>> subscription changes, reacting to group rebalance)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - async offset commits
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Given that the selector's poll is being driven by the end
>>>>>>>>>>>>>>>>>> user, this ends up making us educate users on NIO and the 
>>>>>>>>>>>>>>>>>> consequences of
>>>>>>>>>>>>>>>>>> not calling KafkaConsumer.poll frequently enough:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - Coordinator will mark the consumer dead
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - async commits won't send
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - callbacks won't fire
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> More generally speaking, there are many surprises with
>>>>>>>>>>>>>>>>>> the current KafkaConsumer implementation.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Here's what we consider to be the goals of KafkaConsumer:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - NIO
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - ability to commit, manipulate offsets, and consume
>>>>>>>>>>>>>>>>>> messages
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - a way to subscribe to topics(auto group management) or
>>>>>>>>>>>>>>>>>> partitions(explicit group management)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - no surprises in the user experience
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The last point is the big one that we think we aren't
>>>>>>>>>>>>>>>>>> hitting. We think the most important example is that there 
>>>>>>>>>>>>>>>>>> should be no
>>>>>>>>>>>>>>>>>> requirement from the end user to consistently 
>>>>>>>>>>>>>>>>>> call KafkaConsumer.poll in order
>>>>>>>>>>>>>>>>>> for all of the above tasks to happen. We think it would be 
>>>>>>>>>>>>>>>>>> better to split
>>>>>>>>>>>>>>>>>> those tasks into tasks that should not rely on 
>>>>>>>>>>>>>>>>>> KafkaConsumer.poll and tasks
>>>>>>>>>>>>>>>>>> that should rely on KafkaConsumer.poll.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Tasks that should not rely on KafkaConsumer.poll:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - join groups
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - commits
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Only data fetches should rely on KafkaConsumer.poll
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This would help reduce the amount of surprises to the end
>>>>>>>>>>>>>>>>>> user.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> We’ve sketched out a proposal and we’ll send it out to
>>>>>>>>>>>>>>>>>> you guys early next week. We’d like to meet up with you at 
>>>>>>>>>>>>>>>>>> LinkedIn on *July
>>>>>>>>>>>>>>>>>> 31, 2015* so we can talk about it before proposing it to
>>>>>>>>>>>>>>>>>> open source.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>> LinkedIn Kafka Dev Team
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Neha
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Thanks,
>>>>>>>>>> Neha
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Thanks,
>>>>>>>> Ewen
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks,
>>>>>> Neha
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> Neha
>>>>
>>>
>>>
>>
>
