I'm thinking the flush call timeout will naturally be the timeout for a produce request, no? Currently it seems we don't have a timeout for client requests; should we have one?

—Jiangjie (Becket) Qin

On 2/16/15, 8:19 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:

Yes, I think we all agree it would be good to add a client-side request timeout. That would effectively imply a flush timeout as well, since any requests that couldn't complete in that time would be errors and hence "completed" in the definition we gave.

-Jay

On Mon, Feb 16, 2015 at 7:57 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

Hi All,

Thanks Jay and all for addressing the concerns. I am fine with just having a flush() method as long as it covers failure modes and resiliency. E.g., we had a situation where all the Kafka cluster brokers were reachable, but after an admin added a new Kafka node and migrated a leader to the new broker, that new broker was NOT reachable from the producer's standpoint due to a firewall, yet metadata would continue to elect the new broker as leader for that partition.

All I am asking is that you either give up sending to this broker or do something in this scenario. As the code stands in the 0.8.2 release, the caller thread of the flush() or close() method would be blocked forever, so all I am asking for is:

https://issues.apache.org/jira/browse/KAFKA-1659
https://issues.apache.org/jira/browse/KAFKA-1660

Also, I recall that there is a timeout added to the batch to indicate how long a message can be retained in memory before expiring.

Given all this, should this API be consistent with other upcoming patches addressing similar problems?

Otherwise, what we have done is spawn a thread just for calling close() or flush(), with a timed join on the caller's end.

Anyway, I just wanted to point out the issues with the existing API, and if you guys think this is fine, then I am OK with this approach. It is just that the caller will have to do a bit more work.
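The workaround Bhavesh describes, spawning a helper thread and joining it with a timeout, can be sketched with nothing but the JDK. The blocking flush below is a hypothetical stand-in (a sleep), not the real producer call, so this is only an illustration of the pattern:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedFlush {
    // Hypothetical stand-in for a producer flush() that can block
    // indefinitely; here it just sleeps to simulate slow brokers.
    static void blockingFlush() {
        try {
            Thread.sleep(50); // pretend in-flight batches take 50 ms to drain
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Run flush() on a helper thread and give up after the timeout,
    // mirroring the "spawn a thread and join with a timeout" workaround.
    static boolean flushWithTimeout(long timeout, TimeUnit unit)
            throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> f = pool.submit(TimedFlush::blockingFlush);
            f.get(timeout, unit);
            return true;           // flush completed in time
        } catch (ExecutionException e) {
            return true;           // flush finished, albeit with an error
        } catch (TimeoutException e) {
            return false;          // still blocked; caller decides what to do
        } finally {
            pool.shutdownNow();    // interrupt the helper if it is still stuck
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(flushWithTimeout(1, TimeUnit.SECONDS));      // true
        // A 1 ms budget is shorter than the simulated 50 ms flush:
        System.out.println(flushWithTimeout(1, TimeUnit.MILLISECONDS)); // false
    }
}
```

Note the cost: the caller gets control back on timeout, but the stuck flush thread is merely interrupted, not guaranteed to stop, which is exactly why a first-class timeout in the client was being requested.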
Thanks,

Bhavesh

On Thursday, February 12, 2015, Joel Koshy <jjkosh...@gmail.com> wrote:

Yes, that is a counter-example. I'm okay either way on whether we should have just flush() or have a timeout. Bhavesh, does Jay's explanation a few replies prior address your concern? If so, shall we consider this closed?

On Tue, Feb 10, 2015 at 01:36:23PM -0800, Jay Kreps wrote:

Yeah, we could do that. I guess I just feel like it adds confusion, because then you have to think about which timeout you want, when likely you don't want a timeout at all.

I guess the pattern I was thinking of was fflush or the Java equivalent, which don't have timeouts:

http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#flush()

-Jay

On Tue, Feb 10, 2015 at 10:41 AM, Joel Koshy <jjkosh...@gmail.com> wrote:

I think tryFlush with a timeout sounds good to me. This is really more for consistency than anything else. I cannot think of any standard blocking calls off the top of my head that don't have a timed variant, e.g., Thread.join, Object.wait, Future.get. Either that, or they provide an entirely non-blocking mode (e.g., socketChannel.connect followed by finishConnect).

Thanks,

Joel

On Tue, Feb 10, 2015 at 11:30:47AM -0500, Joe Stein wrote:

Jay,

The .flush() call seems like it would be the best way if you wanted to do a clean shutdown of the new producer?

So, you could in your code "stop all incoming requests && producer.flush() && system.exit(value)" and know pretty much that you won't drop anything on the floor.
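Joe's shutdown recipe (stop intake, flush, then exit) can be sketched against a stub producer. The StubProducer here is a hypothetical stand-in, stdlib-only; the real KafkaProducer's flush() blocks on network completion rather than draining a local queue:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class CleanShutdown {
    // Hypothetical stand-in for the producer: send() only buffers,
    // flush() drains everything that was previously buffered.
    static class StubProducer {
        final ConcurrentLinkedQueue<String> buffer = new ConcurrentLinkedQueue<>();
        final List<String> delivered = new ArrayList<>();

        void send(String record) { buffer.add(record); }

        // Blocks (here: loops) until every buffered record has completed.
        void flush() {
            String r;
            while ((r = buffer.poll()) != null) delivered.add(r);
        }
    }

    public static void main(String[] args) {
        StubProducer producer = new StubProducer();
        producer.send("a");
        producer.send("b");

        // Joe's recipe: stop accepting new requests first, then flush so
        // nothing is dropped on the floor, then exit.
        producer.flush();
        System.out.println("unsent=" + producer.buffer.size()); // unsent=0
        // System.exit(0) would follow here in a real shutdown hook.
    }
}
```

The ordering matters: if new sends can still arrive while flush() is running, records buffered after the flush began are not covered by its guarantee.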
This can be done with the callbacks and futures (sure), but .flush() seems to be the right time to block, and in a few lines of code, no?

~ Joestein

On Tue, Feb 10, 2015 at 11:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

Hey Bhavesh,

If a broker is not available, a new one should be elected to take over, so although the flush might take longer, it should still be quick. Even if not, this should result in an error, not a hang.

The cases you enumerated are all covered already: if the user wants to retry, that is covered by the retry setting in the client; for all the errors, that is considered completion of the request. The postcondition of flush isn't that all sends complete successfully, just that they complete. So if you try to send a message that is too big, when flush returns, calling .get() on the future should not block and should produce the error.

Basically, the argument I am making is that the only reason you want to call flush() is to guarantee all the sends complete, so if it doesn't guarantee that, it will be somewhat confusing. This does mean blocking, but if you don't want to block on the send then you wouldn't call flush().

This has no impact on the block.on.buffer.full setting. That impacts what happens when send() can't append to the buffer because it is full. flush() means any message previously sent (i.e., for which the send() call has returned) needs to have its request completed. Hope that makes sense.
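Jay's postcondition, that after flush() returns every future is complete so get() surfaces errors without blocking, can be illustrated with plain CompletableFutures standing in for the futures send() returns. The names and the "message too large" error are illustrative, not the producer's actual API:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FlushPostcondition {
    public static void main(String[] args) {
        // Hypothetical futures as they would look after flush() returns:
        // every one is complete -- some successfully, some with an error.
        CompletableFuture<String> ok = CompletableFuture.completedFuture("offset-42");
        CompletableFuture<String> tooBig = new CompletableFuture<>();
        tooBig.completeExceptionally(new IllegalArgumentException("message too large"));

        // Since both futures are already done, get() never blocks: it
        // either yields the result or immediately rethrows the send error.
        for (CompletableFuture<String> f : List.of(ok, tooBig)) {
            try {
                System.out.println("sent: " + f.get());
            } catch (InterruptedException | ExecutionException e) {
                System.out.println("failed: " + e.getCause().getMessage());
            }
        }
        // prints:
        //   sent: offset-42
        //   failed: message too large
    }
}
```

This is the sense in which "completed" includes errors: flush() promises the futures are resolved, and the caller still inspects each one to learn the outcome.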
-Jay

On Mon, Feb 9, 2015 at 11:52 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

Hi Jay,

Imagine you have a flaky network connection to the brokers: will flush() block if one of the brokers is not available? Basically, how would you address the failure mode where the I/O thread is not able to drain records, or is busy due to pending requests? Is the flush() method meant only to flush the in-memory queue, or to flush to the brokers over the network?

A timeout helps with this and pushes the caller to decide what to do, e.g., re-enqueue the records, drop the entire batch, or handle a message that is too big and crosses the max.message.size limit, etc.

Also, according to the Javadoc for the API, "The method will block until all previously sent records have completed sending (either successfully or with an error)". Does this bypass the rules set by block.on.buffer.full or batch.size when under load?

That was my intention, and I am sorry I mixed up the close() method here without knowing that this is only for bulk send.

Thanks,

Bhavesh

On Mon, Feb 9, 2015 at 8:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Yeah, I second the problem Guozhang flags with giving flush a timeout.
In general, failover in Kafka is a bounded thing unless you have brought your Kafka cluster down entirely, so I think depending on that bound implicitly is okay.

It is possible to make flush() be instead

boolean tryFlush(long timeout, TimeUnit unit);

But I am somewhat skeptical that people will use this correctly. I.e., consider the mirror maker code snippet I gave above: how would one actually recover in this case other than retrying (which the client already does automatically)? After all, if you are okay losing data then you don't need to bother calling flush at all; you can just let the messages be sent asynchronously.

I think close() is actually different, because you may well want to shut down immediately and just throw away unsent events.

-Jay

On Mon, Feb 9, 2015 at 2:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:

The proposal looks good to me; I will need some time to review the implementation RB later.

Bhavesh, I am wondering how you would use a flush() with a timeout, since such a call does not actually provide any flushing guarantees?
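One way the proposed boolean tryFlush(long timeout, TimeUnit unit) could behave is sketched below, with a CountDownLatch standing in for the accumulator's completion signal. This is purely an assumption about an implementation, not the actual patch, and it shows Jay's point: a false return tells you the deadline passed but gives you nothing new to do about it:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TryFlushSketch {
    // One latch per "flush generation": the I/O thread would count it
    // down once every previously sent record has completed.
    private final CountDownLatch pending = new CountDownLatch(1);

    // Called from the (simulated) I/O thread when the outstanding
    // batches have all completed, successfully or with an error.
    void onBatchComplete() { pending.countDown(); }

    // The timed variant floated in this thread: true if everything
    // completed within the budget, false if the caller gave up waiting.
    boolean tryFlush(long timeout, TimeUnit unit) throws InterruptedException {
        return pending.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        TryFlushSketch producer = new TryFlushSketch();

        // Nothing has completed yet, so a short wait times out.
        System.out.println(producer.tryFlush(5, TimeUnit.MILLISECONDS)); // false

        // Simulate the I/O thread finishing the outstanding batch.
        new Thread(producer::onBatchComplete).start();
        System.out.println(producer.tryFlush(1, TimeUnit.SECONDS));      // true
    }
}
```

This mirrors the timed variants Joel cites (Thread.join, Object.wait, Future.get): the timeout bounds the caller's wait, not the work itself, which is exactly Guozhang's objection that it provides no flushing guarantee.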
As for close(), there is a separate JIRA for this:

KAFKA-1660 <https://issues.apache.org/jira/browse/KAFKA-1660>

Guozhang

On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

Hi Jay,

How about adding a timeout to each of these method calls: flush(timeout, TimeUnit) and close(timeout, TimeUnit)? We had a runaway I/O thread issue, and the caller thread should not be blocked forever in these methods.

Thanks,

Bhavesh

On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Well, actually in the case of linger.ms = 0 the send is still asynchronous, so calling flush() blocks until all the previously sent records have completed. It doesn't speed anything up in that case, though, since they are already available to send.

-Jay

On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gshap...@cloudera.com> wrote:

Looks good to me.
I like the idea of not blocking additional sends, but not guaranteeing that flush() will deliver them.

I assume that with linger.ms = 0, flush will just be a noop (since the queue will be empty). Is that correct?

Gwen

On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

Following up on our previous thread on making batch send a little easier, here is a concrete proposal to add a flush() method to the producer:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API

A proposed implementation is here:

https://issues.apache.org/jira/browse/KAFKA-1865

Thoughts?

-Jay