> same time I would prefer additional churn to the API. I meant to say prefer _less_ churn to the API.
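For concreteness, here is a rough sketch of what carrying metadata alongside a committed offset, as suggested in the message quoted below, might look like. Every name in it (`MetadataCommitSketch`, `OffsetWithMetadata`, `InMemoryOffsetStore`) is hypothetical and not part of the current consumer API, partitions are plain strings for brevity, and the in-memory map only stands in for the broker-side storage.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only: these types are illustrative, not the consumer API.
class MetadataCommitSketch {

    // Value returned by "committed": the offset plus free-form metadata.
    static final class OffsetWithMetadata {
        final long offset;
        final String metadata; // application-defined string

        OffsetWithMetadata(long offset, String metadata) {
            this.offset = offset;
            this.metadata = metadata;
        }
    }

    // Toy in-memory stand-in for broker-side offset storage.
    static final class InMemoryOffsetStore {
        private final Map<String, OffsetWithMetadata> committed = new HashMap<>();

        // Commit an offset together with its metadata for one partition.
        void commit(String topicPartition, long offset, String metadata) {
            committed.put(topicPartition, new OffsetWithMetadata(offset, metadata));
        }

        // Returns offset + metadata, or null if nothing was committed.
        OffsetWithMetadata committed(String topicPartition) {
            return committed.get(topicPartition);
        }
    }

    public static void main(String[] args) {
        InMemoryOffsetStore store = new InMemoryOffsetStore();
        store.commit("events-0", 42L, "checkpoint=abc");
        OffsetWithMetadata result = store.committed("events-0");
        System.out.println(result.offset + " " + result.metadata); // prints "42 checkpoint=abc"
    }
}
```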
On Tue, Jun 09, 2015 at 08:35:35AM -0700, Joel Koshy wrote:
> Ewen,
>
> Sorry for the late comment, but while we are discussing this API I think we should also consider the addition of metadata to the offset commit. This is supported on the broker-side but there is no means in the current API to include metadata or retention time in the offset commit. Ideally, this would also mean changing the "committed" API to return the offset + metadata. I realize this is orthogonal to what you are trying to achieve, so I'm okay with breaking this out - at the same time I would prefer additional churn to the API.
>
> Thanks,
>
> Joel
>
> On Mon, Apr 27, 2015 at 06:03:05PM -0700, Guozhang Wang wrote:
> > Thanks Ewen,
> >
> > 1. I agree with you as for the pros of Future; my argument for pure Callback, as I mentioned, is that it sounds to me unlikely users would usually want to explicitly set timeout. That said, if people think this scenario is actually common like in graceful shutdown, then I am OK with Future against just the callback. My concern against Future is that after thinking through the code myself I feel the implementation would be quite complicated.
> >
> > 2. As for infinite retries, for now we have three "coordinator-blocking" calls, one for join-group, one for sync-commit-offsets, one for fetch-offsets; I think for these three it is appropriate to use blocking calls, since the consumer cannot proceed anyways without getting the response from these calls according to their semantics.
> >
> > 3. With the priority mechanism I think we can then get rid of the muting mechanism. Not sure if it is possible to implement in java.nio.channels.Selector?
> >
> > Guozhang
> >
> > On Thu, Apr 23, 2015 at 2:25 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
> > > Thanks, great feedback everyone.
> > >
> > > Jiangjie -- I was worried about interleaving as well.
> > > For commits using the consumer's own current set of offsets, I agree we could easily limit to 1 outstanding request so the older one gets cancelled. For commits that specify offsets manually, we might not need to do anything special, just note in the docs that bad things can happen if you submit two separate offset commit requests (i.e. don't wait for callbacks) and have retries enabled. Alternatively, we could just serialize them, i.e. always have at most 1 outstanding and retries always take priority.
> > >
> > > Guozhang -- That breakdown of use cases looks right to me. I agree that I don't think users of this API would be trying to use the futures for promise pipelining or anything, the only use is to allow them to block on the operation. That said, I think there are some tradeoffs to consider between the two options:
> > >
> > > Pros of Future/Cons of only having callback:
> > > * Gives control over timeouts. With only callbacks users have to manage this themselves if they care about timing. I think there's at least one very common case for this: graceful shutdown where I want to try to commit offsets, but after some time shutdown whether successful or not.
> > > * Consistency. This matches the Producer interface and Futures are a standard pattern.
> > >
> > > Pros of only callback/cons of Future
> > > * Know up front if it's sync/async. This might simplify the implementation, as Guozhang points out. (However, it also means the patch needs to add a timeout mechanism, which doesn't exist yet. That's probably not a huge patch, but non-trivial. Maybe it needs to be added regardless.)
> > >
> > > Regardless of the interface we settle on, I'd argue that we should get rid of the infinite retry version, at least limiting it to a max # of retries, each of which is bounded by a timeout.
> > > It's really problematic having it run indefinitely long since it locks up the consumer, which means you can't, e.g., shut down properly. More generally, I think anytime we have an API where a TimeoutException is *not* a possibility, we're almost definitely trying to hide the network from the user in a way that makes it difficult for them to write applications that behave correctly.
> > >
> > > On the muting implementation, I'm not sure I'm convinced it's *required* to mute the others entirely. Couldn't Selector.poll have a mechanism for prioritizing reads rather than completely muting the other nodes? For example, what if poll() did one select for just the key we're currently keeping unmuted with the timeout, then if there are no keys ready to read, reregister interest and select(0) to only get the ones that are immediately ready.
> > >
> > > Jay -- the polling thing isn't an issue if you just poll from within the Future. That's how the WIP patch works. My only concern with that is that it's unintuitive because most future implementations are just waiting for another thread to finish some operation; the only potentially bad side effect I could think of is that user callbacks might then run in the thread calling Future.get(), which might be unexpected if they think they are doing all the work of polling in a different thread.
> > >
> > > -Ewen
> > >
> > > On Wed, Apr 22, 2015 at 12:29 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > > > Hi Ewen,
> > > >
> > > > The only time I can think of where the application needs to know whether the offset was committed is during graceful shutdown and/or Runtime.addShutdownHook(), so that the consumer application does not get duplicate records upon restart and does not have to deal with eliminating already processed offsets.
> > > > The only thing the consumer application will have to handle is a failure to commit the offset after XX retries. Or would you prefer that the application manage this last offset commit when the offset cannot be committed due to a failure, connection timeout, or any other failure case?
> > > >
> > > > Thanks,
> > > > Bhavesh
> > > >
> > > > On Wed, Apr 22, 2015 at 11:20 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > I second Guozhang's proposal. I do think we need the callback. The current state is that for async commits you actually don't know if it succeeded. However there is a totally valid case where you do need to know if it succeeded but don't need to block, and without the callback you are stuck. I think the futures will likely cause problems since blocking on the future precludes polling which would allow it to complete.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Wed, Apr 22, 2015 at 11:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > Hi Ewen,
> > > > > >
> > > > > > I share the same concern you have about 2), that with the new API sync commit implementation is a bit awkward since we have a single-threaded design in new consumer. The reason that we need to mute other nodes for doing coordinator sync operations like join-group / offset commits / offset fetches is to avoid long blocking due to possible "starvation" on network selector, so I think they need to be done still.
> > > > > >
> > > > > > On the other hand, I think users using the commit API will usually fall into three categories:
> > > > > >
> > > > > > 1) I really care that the offsets to be committed before moving on to fetch more data, so I will wait FOREVER for it to complete.
> > > > > >
> > > > > > 2) I do not really care about whether it succeeds or not, so just fire "commit" and let's move on; if it fails it fails (and it will be logged).
> > > > > >
> > > > > > 3) I care if it succeeds or not, but I do not want to wait indefinitely; so let me know if it does not finish within some timeout or failed (i.e. give me the exceptions / error codes) and I will handle it.
> > > > > >
> > > > > > The current API does not handle case 3) above, which sits between BLOCK FOREVER and DO NOT CARE AT ALL, but most times people would not be very explicit about the exact "timeout", but just knowing it is definite and reasonably short is good enough. I think for this we probably do not need extra timeout / retry settings, but rely on the general request retry settings; similarly we probably do not need "cancel".
> > > > > >
> > > > > > So I wonder if we can do a slightly different modification to API like this:
> > > > > >
> > > > > > void commit(Map<TopicPartition, Long> offsets, CommitType type, ConsumerCommitCallback callback);
> > > > > >
> > > > > > For case 1) people call "commit(offsets)" which will block forever until it succeeds;
> > > > > >
> > > > > > For case 2) people call "commit(offsets, async)" which will return immediately, with no callback when it finishes;
> > > > > >
> > > > > > For case 3) people call "commit(offsets, async, callback)", and the callback will be executed when it finishes or the number of request retries has been exhausted.
> > > > > >
> > > > > > This API will make much smaller changes to the current implementations as well.
> > > > > > Of course if we have a common scenario where users would really care about the exact timeout for async commits, then Future may be a good approach.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Thu, Apr 16, 2015 at 1:00 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > > > > > > Hey Ewen,
> > > > > > >
> > > > > > > This makes sense. People usually do not want to stop consuming when committing offsets.
> > > > > > >
> > > > > > > One corner case about async commit with retries I am thinking is that it is possible that two offset commits interleave with each other and that might create problem. Like you said maybe we can cancel the previous one.
> > > > > > >
> > > > > > > Another thing is that whether the future mechanism will only be applied to auto commit or it will also be used in manual commit? Because in new consumer we allow user to provide an offset map for offset commit. Simply canceling a previous pending offset commit does not seem to be ideal in this case because the two commits could be for different partitions.
> > > > > > >
> > > > > > > Thanks.
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > > On 4/14/15, 4:31 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
> > > > > > > >I'd like to get some feedback on changing the offset commit API in the new consumer. Since this is user-facing API I wanted to make sure this gets better visibility than the JIRA (https://issues.apache.org/jira/browse/KAFKA-2123) might.
> > > > > > > >
> > > > > > > >The motivation is to make it possible to do async commits but be able to tell when the commit completes/fails. I'm suggesting changing the API from
> > > > > > > >
> > > > > > > >void commit(Map offsets, CommitType)
> > > > > > > >
> > > > > > > >to
> > > > > > > >
> > > > > > > >Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback);
> > > > > > > >
> > > > > > > >which matches the approach used for the producer. The ConsumerCommitCallback only has one method:
> > > > > > > >
> > > > > > > >public void onCompletion(Exception exception);
> > > > > > > >
> > > > > > > >This enables a few different use cases:
> > > > > > > >
> > > > > > > >* Blocking commit via Future.get(), and blocking with timeouts via Future.get(long, TimeUnit)
> > > > > > > >* See exceptions via the future (see discussion of retries below)
> > > > > > > >* Callback-based notification so you can keep processing messages and only take action if something goes wrong, takes too long, etc. This is the use case that motivated
> > > > > > > >* Fire and forget commits via a shorthand commit() API and ignoring the resulting future.
> > > > > > > >
> > > > > > > >One big difference between this and the producer API is that there isn't any result (except maybe an exception) from commitOffsets. This leads to the somewhat awkward Future<Void> signature. I personally prefer that to the sync/async flag, especially since it also provides a non-blocking interface for checking whether the commit is complete.
> > > > > > > >
> > > > > > > >I posted a WIP patch to the JIRA.
> > > > > > > >In the process of making it I found a few issues that might be worth discussing:
> > > > > > > >
> > > > > > > >1. Retries. In the old approach, this was trivial since it only applied to synchronous calls, so we could just loop until the request was successful. Do we want to start introducing a retries mechanism here, and should it apply to all types of requests or are we going to end up with a couple of different retry settings for specific cases, like offset commit? The WIP patch allows errors to bubble up through the Future on the first failure, which right now can cause some tests to fail transiently (e.g. consumer bounce test).
> > > > > > > >
> > > > > > > >I think some sort of retry mechanism, even if it's an internal constant rather than configurable, is probably the right solution, but I want to figure out how broadly they should apply. I think adding them only for offset commits isn't hard.
> > > > > > > >
> > > > > > > >2. The Future implementation is a bit weird because the consumer doesn't have a dedicated IO thread. My only concern is that this could lead to some unintuitive results based on the current implementation because the way this works is to just run poll() in the thread calling Future.get(), but it mutes all non-coordinator nodes which means other processing is effectively paused.
> > > > > > > >If you're processing offset commits in a separate thread from your main consumer thread that's calling poll(), you might just end up blocking the main thread while waiting on the Future. Then again, I'm not sure the other nodes really even need to be muted -- maybe Jay or Guozhang have ideas on this?
> > > > > > > >
> > > > > > > >3. Should the future be cancellable? This probably isn't hard to implement, but I'm not sure we should even bother. On the one hand it could be nice, especially if you have an old commit request that you want superseded by a new one with updated offsets. On the other hand, if the request has already been sent out, cancelling it won't accomplish anything. I think the only case this is useful is when there are retries.
> > > > > > > >
> > > > > > > >Thoughts?
> > > > > > > >
> > > > > > > >--
> > > > > > > >Thanks,
> > > > > > > >Ewen
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > >
> > > --
> > > Thanks,
> > > Ewen
> >
> > --
> > -- Guozhang
>
> --
> Joel
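
The commit styles debated in this thread (block until done, fire and forget, callback notification, and a bounded wait for graceful shutdown) can be sketched roughly as follows. This is only an illustration under stated assumptions: `CommitApiSketch`, `FakeConsumer`, and `commitWithDeadline` are hypothetical names, partitions are plain strings rather than `TopicPartition`, and the fake consumer completes its future directly instead of talking to a coordinator. Only the `ConsumerCommitCallback` shape and the `Future<Void>` return type follow the proposal quoted above.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CommitApiSketch {

    // Callback shape from the proposal: one method, null exception on success.
    interface ConsumerCommitCallback {
        void onCompletion(Exception exception);
    }

    // Hypothetical stand-in for a consumer; a real one would send a request
    // to the coordinator and complete the future when the response arrives.
    static class FakeConsumer {
        private final boolean neverCompletes;

        FakeConsumer(boolean neverCompletes) {
            this.neverCompletes = neverCompletes;
        }

        Future<Void> commit(Map<String, Long> offsets, ConsumerCommitCallback callback) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            if (!neverCompletes) {
                // Pretend the commit was acknowledged immediately.
                future.complete(null);
                if (callback != null) callback.onCompletion(null);
            }
            return future;
        }
    }

    // Graceful-shutdown style: wait a bounded time, then give up either way.
    static boolean commitWithDeadline(FakeConsumer consumer, Map<String, Long> offsets, long timeoutMs) {
        Future<Void> future = consumer.commit(offsets, null);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // commit did not finish in time
        } catch (ExecutionException e) {
            return false; // commit failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Long> offsets = Collections.singletonMap("events-0", 42L);
        FakeConsumer healthy = new FakeConsumer(false);

        healthy.commit(offsets, null).get(); // block until the commit completes
        healthy.commit(offsets, null);       // fire and forget: ignore the future
        healthy.commit(offsets, e ->         // callback-based notification
                System.out.println(e == null ? "commit ok" : "commit failed: " + e));

        System.out.println(commitWithDeadline(healthy, offsets, 100));
        System.out.println(commitWithDeadline(new FakeConsumer(true), offsets, 100));
    }
}
```

Note that this sketch sidesteps the single-threaded concern raised above: here the future is completed inside commit(), whereas in the design under discussion the thread calling Future.get() would itself have to drive poll() for the commit to make progress.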