Kartik, on your second point about timeouts with poll() and heartbeats, the
consumer now handles this properly. KAFKA-2123 introduced a
DelayedTaskQueue and that is used internally to handle processing events at
the right time even if poll() is called with a large timeout. The same
mechanism is used to handle auto commit, which should also occur in a
timely fashion even if poll() is called with a large timeout.
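
To make the idea concrete, here is a minimal Python sketch of a delayed task queue servicing heartbeats inside one long poll(). It is not the actual Kafka code: only the DelayedTaskQueue name comes from KAFKA-2123, and `DelayedTasks`, `poll`, `simulate` and the simulated clock are hypothetical names chosen for illustration. The point it shows is that poll() never sleeps past the next due task, so a large timeout does not starve heartbeats.

```python
import heapq
import itertools


class DelayedTasks:
    """Min-heap of (due_ms, seq, task); a hypothetical stand-in, not Kafka's class."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # tie-breaker so tasks are never compared

    def add(self, due_ms, task):
        heapq.heappush(self._heap, (due_ms, next(self._seq), task))

    def next_due(self):
        return self._heap[0][0] if self._heap else None

    def run_due(self, now_ms):
        """Run every task whose due time has been reached."""
        while self._heap and self._heap[0][0] <= now_ms:
            _, _, task = heapq.heappop(self._heap)
            task(now_ms)


def poll(tasks, now_ms, timeout_ms):
    """Simulate one poll() that receives no records.

    Rather than blocking for the whole timeout, block only until the next
    delayed task is due, run it, and repeat until the timeout expires.
    Returns the simulated time at which poll() returns.
    """
    deadline = now_ms + timeout_ms
    while now_ms < deadline:
        tasks.run_due(now_ms)
        due = tasks.next_due()
        # Wake up at the next task or at the deadline, whichever comes first.
        now_ms = deadline if due is None else min(deadline, max(due, now_ms))
    tasks.run_due(now_ms)
    return now_ms


def simulate(heartbeat_interval_ms, poll_timeout_ms):
    """Return the simulated times at which heartbeats fire during one poll()."""
    tasks = DelayedTasks()
    sent = []

    def heartbeat(now_ms):
        sent.append(now_ms)
        tasks.add(now_ms + heartbeat_interval_ms, heartbeat)  # reschedule itself

    tasks.add(heartbeat_interval_ms, heartbeat)
    poll(tasks, 0, poll_timeout_ms)
    return sent
```

With a 3 second heartbeat interval (an assumed value) and a single 10 second poll(), `simulate(3000, 10000)` records heartbeats at 3000, 6000 and 9000 ms. Auto commit could be scheduled on the same queue in the same way.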

On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <j...@confluent.io> wrote:

> Hey Kartik,
> Totally agree we don't want people tuning timeouts in the common case.
> However there are two ways to avoid this:
> 1. Default the timeout high
> 2. Put the heartbeat in a separate thread
> When we were doing the consumer design we discussed this tradeoff and I
> think the conclusion we came to was that defaulting to a high timeout was
> actually better. This means it takes a little longer to detect a failure,
> but usually that is not a big problem and people who want faster failure
> detection can tune it down. This seemed better than having the failure
> detection not really cover the consumption and just be a background ping.
> The two reasons were (a) you still have the GC problem even for the
> background thread, (b) consumption is in some sense a better definition of
> an active healthy consumer and a lot of problems crop up when you have an
> inactive consumer with an active background thread (as today).
> When we had the discussion I think what we realized was that most people
> who were worried about the timeout were imagining a very low default
> (say, 500ms). But in fact just setting this to 60 seconds or higher as a
> default would be okay, this adds to the failure detection time but only
> apps that care about this need to tune. This should largely eliminate false
> positives since after all if you disappear for 60 seconds that actually
> starts to be more of a true positive, even if you come back... :-)
> -Jay
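
As a rough illustration of the tradeoff described above, the sketch below checks a series of heartbeat timestamps against a session timeout. It is a toy model in Python, not the coordinator's actual logic; `expired_sessions` and the sample timestamps are invented for this example. Only the 500ms and 60 second figures come from the message.

```python
def expired_sessions(heartbeat_times_ms, session_timeout_ms):
    """Return the (previous, next) heartbeat pairs whose gap exceeds the timeout.

    Each returned pair is a point where a coordinator using this timeout
    would have declared the consumer dead (toy model, hypothetical helper).
    """
    expired = []
    for prev, cur in zip(heartbeat_times_ms, heartbeat_times_ms[1:]):
        if cur - prev > session_timeout_ms:
            expired.append((prev, cur))
    return expired


# Invented timeline: steady heartbeats, a 5 second pause (say, a GC),
# then a two minute stall where the consumer really has gone away.
HEARTBEATS_MS = [0, 200, 400, 5400, 5600, 125600]
```

With a 500ms timeout both the 5 second pause and the two minute stall expire the session; with a 60 second timeout only the two minute stall does, which is the case that "starts to be more of a true positive".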
> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam <
> kparamasi...@linkedin.com> wrote:
>> Adding the open source alias.  This email started off as a broader
>> discussion around the new consumer.  I was zooming into only the aspect of
>> poll() being the only mechanism for driving the heartbeats.
>> Yes the lag is the effect of the problem (not the problem).  Monitoring
>> the lag is important as it is the primary way to tell if the application is
>> wedged.  There might be other metrics which can possibly capture the same
>> essence. Yes the lag is at the consumer group level, but you can tell that
>> one of the consumers is messed up if, for example, one of the partitions in
>> the application starts generating lag while the others are fine.
>> Monitoring aside, I think the main point of concern is that in the old
>> consumer most customers don't have to worry about unnecessary rebalances
>> and most of the things that they do in their app don't have an impact on
>> the session timeout (i.e. the only thing that causes rebalances is when
>> the GC is out of whack).  For the handful of customers who are impacted
>> by GC related rebalances, I would imagine that all of them would really
>> want us to make the system more resilient.  I agree that the GC problem
>> can't be solved easily in the java client, however it appears that now we
>> would be expecting the consuming applications to be even more careful with
>> ongoing tuning of the timeouts.  At LinkedIn, we have seen that most kafka
>> applications don't have much of a clue about configuring the timeouts and
>> just end up calling the Kafka team when their application sees rebalances.
>> The other side effect of poll driving the heartbeats is that we have to
>> make sure that people don't set a poll timeout that is larger than the
>> session timeout.  If we had a notion of implicit heartbeats then we could
>> also automatically make this work for consumers by sending heartbeats at the
>> appropriate interval even though the customers want to do a long poll.
>> We could surely work around this in LinkedIn if either we have the
>> Pause() api or an explicit HeartBeat() api on the consumer.
>> Would love to hear how other people think about this subject.
>> Kartik
>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede <n...@confluent.io> wrote:
>>> Agree with the dilemma you are pointing out, which is that there are
>>> many ways the application's message processing could fail and we wouldn't
>>> be able to model all of those in the consumer's failure detection
>>> mechanism. So we should try to model as much of it as we can so the
>>> consumer's failure detection is meaningful.
>>>> Point being that the only absolute way to really detect that an app is
>>>> healthy is to monitor lag. If the lag increases then for sure something is
>>>> wrong.
>>> The lag is merely the effect of the problem, not the problem itself. Lag
>>> is also a consumer group level concept and the problem we have is being
>>> able to detect failures at the level of individual consumer instances.
>>> As you pointed out, a consumer that calls poll() is a stronger indicator of
>>> whether the consumer is alive or not. The dilemma then is who defines what
>>> a healthy poll() frequency is. No one else but the application owner can
>>> define what a "normal" processing latency is for their application. Now the
>>> question is what's the easiest way for the user to define this without
>>> having to tune and fine tune this too often. The heartbeat interval
>>> certainly does not have to be *exactly* the 99th percentile of processing latency
>>> but could be in the ballpark + an error delta. The error delta is the
>>> application owner's acceptable risk threshold during which they would be ok
>>> if the application remains part of the group despite being dead. It is
>>> ultimately a tradeoff between operational ease and more accurate failure
>>> detection.
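
One way to read "in the ballpark + an error delta" is sketched below: take the 99th percentile of observed processing latencies and add the delta the application owner is willing to tolerate. This is a minimal Python sketch under that reading; `percentile`, `suggested_timeout_ms` and the nearest-rank method are assumptions made for illustration, not something the thread or Kafka prescribes.

```python
def percentile(samples, pct):
    """Nearest-rank percentile of a non-empty list, for an integer pct in 1..100."""
    ordered = sorted(samples)
    # Integer ceiling of pct/100 * n, to avoid floating point rounding surprises.
    rank = max(1, (pct * len(ordered) + 99) // 100)
    return ordered[rank - 1]


def suggested_timeout_ms(processing_latencies_ms, error_delta_ms):
    """Observed 99th percentile processing latency plus the accepted risk window."""
    return percentile(processing_latencies_ms, 99) + error_delta_ms
```

For example, if the observed latencies were 1 through 100 ms and the owner accepts a 5 second window of a dead consumer staying in the group, `suggested_timeout_ms(list(range(1, 101)), 5000)` yields 5099 ms. The latencies have to be observed in production and revisited over time, which is the operational cost being debated here.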
>>>> With quotas the write latencies to kafka could range from a few
>>>> milliseconds all the way to tens of seconds.
>>> This is actually no different from the GC problem. Most of the
>>> time, the normal GC falls in the few ms range and there are many
>>> applications even at LinkedIn for which the max GC falls in the multiple
>>> seconds range. Note that it also can't be predicted, so has to be an
>>> observed value. One way or the other, you have to observe what this
>>> acceptable "max" is for your application and then set the appropriate
>>> timeouts.
>>> Since this is not something that can be automated, this is a config that
>>> the application owner has to set based on the expected behavior of their
>>> application. Not wanting to do that leads to ending up with bad consumption
>>> semantics where the application process continues to be part of a group
>>> owning partitions but not consuming since it has halted due to a problem.
>>> The fact that the design requires them to express that in poll() frequency
>>> or not doesn't change the fact that the application owner has to go through
>>> the process of measuring and then defining this "max".
>>> The reverse where they don't do this and the application remains in the
>>> group despite being dead is super confusing and frustrating too. So the due
>>> diligence up front is actually worth it. And as long as the poll() latency and
>>> processing latency can be monitored, it should be easy to tell the reason
>>> for a rebalance, whether that is valid or not and how that should be tuned.
>>> As for the wrapper, KIP-28 is the wrapper in open source that will hide
>>> this complexity and I agree that LI is unblocked since you can do this in
>>> TrackerConsumer in the meantime.
>>> Neha
>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam <
>>> kparamasi...@linkedin.com> wrote:
>>>> For commit(), I think it should hopefully be an easier discussion, so
>>>> maybe we can follow up when we meet up next.
>>>> As far as the heartbeat is concerned, I think the points you discuss
>>>> are all very valid.
>>>> GC pauses impacting the heartbeats is a real issue. However, only a
>>>> small percentage of memory hungry apps get hit by it.
>>>> The broader issue whereby even if the heartbeats are healthy, the app
>>>> might not be behaving correctly is also real.  If the app is calling poll()
>>>> then the probability that the app is healthy is surely higher.  But this
>>>> again isn't an absolute measure that the app is processing correctly.
>>>> In other cases the app might have even died in which case this
>>>> discussion is moot.
>>>> Point being that the only absolute way to really detect that an app is
>>>> healthy is to monitor lag. If the lag increases then for sure something is
>>>> wrong.
>>>> The proposal seems to be that the application needs to tune their
>>>> session timeout based on the 99th percentile of the time they take to
>>>> process events after every poll.  This turns out to be a nontrivial thing
>>>> for an application to do. To start with, when an application is new their
>>>> data is going to be based on tests that they have done on synthetic data.
>>>> This often doesn't represent what they will see in production.  Once the
>>>> app is in production their processing latencies will potentially vary over
>>>> time.  It is extremely unlikely that the application owner does a careful
>>>> job of monitoring the 99th percentile of latencies over time and readjusts
>>>> the settings.  Often the latencies vary because of variance in other
>>>> services that are called by the consumer as part of processing the events.
>>>> Case in point would be a simple app which reads events and writes to
>>>> Kafka.  With quotas the write latencies to Kafka could range from a few
>>>> milliseconds all the way to tens of seconds.  As the scale of processing
>>>> for an app increases, the app or that 'user' could now get quotaed.
>>>> Instead of slowing down gracefully, unless the application owner has
>>>> carefully tuned the timeout, we are now looking at a potential outage where
>>>> the app could get hit by constant rebalances.
>>>> If we expose the pause() API then it is possible for us to take care of
>>>> this in the LinkedIn wrapper, whereby we would keep calling poll on a
>>>> separate thread periodically and enqueue the messages. When the queue is
>>>> full we would call pause().
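
The wrapper idea above can be sketched roughly as follows. This is a single-threaded Python simulation for readability, whereas the message describes polling on a separate thread. `FakeConsumer`, `BufferingWrapper`, `poll_once`, `take` and `resume()` are all hypothetical names; pause() is the API being proposed in the thread, and none of this is the real client or the LinkedIn wrapper.

```python
from collections import deque


class FakeConsumer:
    """Hypothetical consumer with pause()/resume(); not the Kafka API."""

    def __init__(self, records):
        self._records = deque(records)
        self.paused = False
        self.polls = 0  # each poll() is also a chance to heartbeat

    def pause(self):
        self.paused = True

    def resume(self):
        self.paused = False

    def poll(self, max_records):
        self.polls += 1
        if self.paused:
            return []  # keeps the session alive but fetches nothing
        count = min(max_records, len(self._records))
        return [self._records.popleft() for _ in range(count)]


class BufferingWrapper:
    """Keeps polling on the application's behalf and buffers the messages."""

    def __init__(self, consumer, capacity):
        self.consumer = consumer
        self.capacity = capacity
        self.buffer = deque()

    def poll_once(self):
        """One iteration of what would be the background polling loop."""
        room = self.capacity - len(self.buffer)
        if room <= 0:
            self.consumer.pause()   # queue is full: stop fetching
        else:
            self.consumer.resume()  # queue has room again
        self.buffer.extend(self.consumer.poll(max(room, 0)))

    def take(self):
        """Called by the (possibly slow) application to get the next message."""
        return self.buffer.popleft() if self.buffer else None
```

In this model the wrapper keeps calling poll() while the application is slow, so heartbeats continue even though no new data is fetched. That is also the weakness raised elsewhere in the thread: the consumer looks alive whether or not the application is making progress.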
>>>> In essence we can work around it in LinkedIn, however I think it is
>>>> vastly better if we address this in the Api as every major customer will
>>>> eventually be pained by it.
>>>> Kartik
>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <j...@confluent.io> wrote:
>>>> Hey guys,
>>>> Happy to discuss. I agree there may be some rough edges and now is
>>>> definitely the time to clean them up.
>>>> I'm pretty reluctant to change the threading model or undergo a big api
>>>> redesign at this point beyond the group management stuff we've discussed in
>>>> the context of Samza/copycat which is already a big effort.
>>>> Overall I agree that we have done a poor job of documenting which apis
>>>> block and which don't and when people are surprised because we haven't
>>>> labeled something that will be unintuitive. But the overall style of
>>>> poll/select-based apis is quite common in programming going back to unix
>>>> select so I don't think it's beyond people if explained well (after all we
>>>> need to mix sync and async apis and if we don't say which is which any
>>>> scheme will be confusing).
>>>> For what it's worth the experience with this api has actually been
>>>> about 1000x better than the issues people had around intuitiveness with the
>>>> high-level api. The crazy blocking iterator, impossible internal queue
>>>> sizing, baroque threading model, etc  have all caused endless amounts of
>>>> anger. Not to mention that that client effectively disqualifies about 50%
>>>> of the use cases people want to try to use it for (plus I regularly hear
>>>> people tell me they've heard not to use it at all for various reasons
>>>> ranging from data loss to lack of features). It's important to have that
>>>> context when people need to switch and they say "oh the old way was so
>>>> simple and the new way complex!" :-)
>>>> Let me give some context related to your points, based on our previous
>>>> discussions:
>>>> For commit, let's discuss, that is easy either way.
>>>> The motivation for avoiding additional threading was two-fold. First
>>>> this client is really intended to be the lowest level client. There are
>>>> many, many possible higher level processing abstractions. One thing we
>>>> found to be a big problem with the high-level client was that it coupled
>>>> things everyone must have--failover, etc--with things that are different in
>>>> each use case like the appropriate threading model. If you do this you need
>>>> to also maintain a thread free low-level consumer api for people to get
>>>> around whatever you have done.
>>>> The second reason was that the internal threading in the client became
>>>> quite complex. The answer with threading is always that "it won't be
>>>> complex this time", but it always is.
>>>> For the heartbeat you correctly describe the downside to coupling
>>>> heartbeat with poll--the contract is that the application must regularly
>>>> consume to be considered an active consumer. This allows the possibility of
>>>> false positive failure detections. However it's important to understand the
>>>> downside of the alternative. If you do background polling a consumer is
>>>> considered active as long as it isn't shut down. This leads to all kinds of
>>>> active consumers that aren't consuming because they have leaked or
>>>> otherwise stopped but are still claiming partitions and heart-beating. This
>>>> failure mode is actually far far worse. If you allow false positives the
>>>> user sees the frequent rebalances and knows they aren't consuming
>>>> frequently enough to be considered active but if you allow false negatives
>>>> you end up having weeks go by before someone notices that a partition has
>>>> been unconsumed the whole time at which point the data is gone. Plus of
>>>> course even if you do this you still have regular false positives anyway
>>>> from GC pauses (as now). We discussed this in some depth at the time and
>>>> decided that it is better to have the liveness notion tied to *actual*
>>>> consumption which is the actual definition of liveness.
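
The difference between the two liveness definitions can be shown with a toy model in Python. `declared_dead_at` and its parameters are hypothetical and only encode the argument above: with poll-coupled heartbeats the last heartbeat is the last poll(), while with a background heartbeat it is whenever the process itself stops.

```python
def declared_dead_at(last_poll_ms, process_exit_ms, session_timeout_ms,
                     background_heartbeat):
    """Time at which a coordinator would give up on the consumer, or None if never.

    last_poll_ms:     when the application last called poll() (it then leaked or hung)
    process_exit_ms:  when the process actually stopped, or None if it keeps running
    """
    if background_heartbeat:
        last_heartbeat_ms = process_exit_ms  # heartbeats continue until shutdown
    else:
        last_heartbeat_ms = last_poll_ms     # heartbeats stop when polling stops
    if last_heartbeat_ms is None:
        return None  # still heartbeating: never detected
    return last_heartbeat_ms + session_timeout_ms
```

For a consumer that stops polling at 10 seconds but whose process never exits, a 60 second timeout detects it at 70 seconds when heartbeats are tied to poll(), and never when they run in the background. That second case is the "weeks go by" false negative.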
>>>> Cheers,
>>>> -Jay
>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman <okara...@linkedin.com>
>>>> wrote:
>>>>> Hi Confluent Team.
>>>>> There has recently been a lot of open source activity regarding the
>>>>> new KafkaConsumer:
>>>>> https://issues.apache.org/jira/browse/KAFKA-2123
>>>>> https://issues.apache.org/jira/browse/KAFKA-2350
>>>>> https://issues.apache.org/jira/browse/KAFKA-2359
>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E
>>>>> We’ve explained the KafkaConsumer API to the Databus, Samza, and some
>>>>> other teams and we got similar feedback.
>>>>> To summarize the feedback we received from other teams:
>>>>>    1. The current behavior is not intuitive. For example,
>>>>>    KafkaConsumer.poll drives everything. The other methods like subscribe,
>>>>>    unsubscribe, seek, commit(async) don’t do anything without a
>>>>>    KafkaConsumer.poll call.
>>>>>    2. The semantics of a commit() call should be consistent between sync
>>>>>    and async operations. Currently, sync commit is a blocking call which
>>>>>    actually sends out an OffsetCommitRequest and waits for the response
>>>>>    upon the user’s KafkaConsumer.commit call. However, the async commit is
>>>>>    a nonblocking call which just queues up the OffsetCommitRequest. The
>>>>>    request itself is later sent out in the next poll. The teams we talked
>>>>>    to found this misleading.
>>>>>    3. Heartbeats are dependent on user application behavior (i.e. user
>>>>>    applications calling poll). This can be a big problem as we don’t
>>>>>    control how different applications behave. For example, we might have
>>>>>    an application which reads from Kafka and writes to Espresso. If
>>>>>    Espresso is slow for whatever reason, then rebalances could happen.
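
The commit behavior that the teams found misleading can be modeled with a few lines of Python. This is a toy model, not the KafkaConsumer implementation: `SketchConsumer`, `commit_sync`, `commit_async` and the `sent` list are hypothetical, and only the OffsetCommitRequest name comes from the description above.

```python
class SketchConsumer:
    """Toy model of the sync/async commit semantics described in the feedback."""

    def __init__(self):
        self.sent = []      # requests that have actually gone out
        self._pending = []  # async commits waiting for the next poll()

    def commit_sync(self, offset):
        # Blocking path: the request is sent as part of this call.
        self.sent.append(("OffsetCommitRequest", offset))

    def commit_async(self, offset):
        # Non-blocking path: the request is only queued here.
        self._pending.append(("OffsetCommitRequest", offset))

    def poll(self):
        # Queued requests are sent only once poll() is called.
        self.sent.extend(self._pending)
        self._pending.clear()
```

In this model, an application that calls `commit_async(5)` and then never calls `poll()` has an empty `sent` list: nothing was committed even though the commit call returned without error.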
>>>>> Generally speaking, we feel that the current KafkaConsumer API design
>>>>> is more of a wrapping around the old simple consumer, i.e. in old consumer
>>>>> we ask users to deal with raw protocols and error handling while in
>>>>> KafkaConsumer we do that for users. However, for old high level consumer
>>>>> users (which are the majority of users), the experience is a noticeable
>>>>> regression. The old high level consumer interface is simple and easy to
>>>>> use for end users, while KafkaConsumer requires users to be aware of many
>>>>> underlying details and is becoming prohibitive for users to adopt. This is
>>>>> hinted at by the javadoc growing bigger and bigger.
>>>>> We think it's getting to the point where we should take a step back
>>>>> and look at the big picture.
>>>>> The current state of KafkaConsumer is that it's single-threaded.
>>>>> There's one big KafkaConsumer.poll called by the user which pretty much
>>>>> drives everything:
>>>>> - data fetches
>>>>> - heartbeats
>>>>> - join groups (new consumer joining a group, topic subscription
>>>>> changes, reacting to group rebalance)
>>>>> - async offset commits
>>>>> - executing callbacks
>>>>> Given that the selector's poll is being driven by the end user, this
>>>>> ends up making us educate users on NIO and the consequences of not calling
>>>>> KafkaConsumer.poll frequently enough:
>>>>> - Coordinator will mark the consumer dead
>>>>> - async commits won't send
>>>>> - callbacks won't fire
>>>>> More generally speaking, there are many surprises with the current
>>>>> KafkaConsumer implementation.
>>>>> Here's what we consider to be the goals of KafkaConsumer:
>>>>> - NIO
>>>>> - ability to commit, manipulate offsets, and consume messages
>>>>> - a way to subscribe to topics (auto group management) or
>>>>> partitions (explicit group management)
>>>>> - no surprises in the user experience
>>>>> The last point is the big one that we think we aren't hitting. We
>>>>> think the most important example is that there should be no requirement
>>>>> for the end user to consistently call KafkaConsumer.poll in order for all
>>>>> of the above tasks to happen. We think it would be better to split those
>>>>> tasks into tasks that should not rely on KafkaConsumer.poll and tasks that
>>>>> should rely on KafkaConsumer.poll.
>>>>> Tasks that should not rely on KafkaConsumer.poll:
>>>>> - heartbeats
>>>>> - join groups
>>>>> - commits
>>>>> - executing callbacks
>>>>> Only data fetches should rely on KafkaConsumer.poll.
>>>>> This would help reduce the number of surprises for the end user.
>>>>> We’ve sketched out a proposal and we’ll send it out to you guys early
>>>>> next week. We’d like to meet up with you at LinkedIn on *July 31,
>>>>> 2015* so we can talk about it before proposing it to open source.
>>>>> Regards,
>>>>> LinkedIn Kafka Dev Team
>>> Thanks,
>>> Neha


