Yeah, we could do that. I guess I just feel like it adds confusion, because then you have to think about which timeout you want, when likely you don't want a timeout at all.
I guess the pattern I was thinking of was fflush or the Java equivalent, which don't have timeouts: http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#flush()

-Jay

On Tue, Feb 10, 2015 at 10:41 AM, Joel Koshy <jjkosh...@gmail.com> wrote:

I think tryFlush with a timeout sounds good to me. This is really more for consistency than anything else. I cannot think of any standard blocking calls off the top of my head that don't have a timed variant, e.g., Thread.join, Object.wait, Future.get. Either that, or they provide an entirely non-blocking mode (e.g., socketChannel.connect followed by finishConnect).

Thanks,

Joel

On Tue, Feb 10, 2015 at 11:30:47AM -0500, Joe Stein wrote:

Jay,

The .flush() call seems like it would be the best way if you wanted to do a clean shutdown of the new producer?

So you could, in your code, "stop all incoming requests && producer.flush() && system.exit(value)" and know pretty much that you won't drop anything on the floor.

This can be done with the callbacks and futures (sure), but .flush() seems to be the right time to block, and it's a few lines of code, no?

~ Joestein

On Tue, Feb 10, 2015 at 11:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

Hey Bhavesh,

If a broker is not available, a new one should be elected to take over, so although the flush might take longer it should still be quick. Even if not, this should result in an error, not a hang.

The cases you enumerated are all covered already. If the user wants to retry, that is covered by the retry setting in the client, and for all the errors, the error is considered completion of the request. The post-condition of flush isn't that all sends complete successfully, just that they complete. So if you try to send a message that is too big, when flush returns, calling .get() on the future should not block and should produce the error.
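The post-condition Jay describes (flush() returns once every previously sent record has completed, successfully or with an error, so calling get() on a record's future no longer blocks) can be illustrated with a small self-contained Java sketch. `ToyProducer` and its methods are hypothetical stand-ins written for this illustration, not the Kafka client API: a single daemon thread plays the producer's background I/O thread, a fixed sleep simulates the network round trip, and a length limit simulates a message that is too big.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical, illustration-only producer; this is NOT the Kafka client API. */
class ToyProducer {
    // A single daemon thread stands in for the producer's background I/O thread.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-io");
        t.setDaemon(true);
        return t;
    });
    private final List<CompletableFuture<Long>> inFlight = new ArrayList<>();
    private final int maxRecordSize; // simulates a "message too large" limit
    private long nextOffset = 0;     // only touched by the I/O thread

    ToyProducer(int maxRecordSize) {
        this.maxRecordSize = maxRecordSize;
    }

    /** Asynchronous send: returns immediately with a future for the record's offset. */
    synchronized CompletableFuture<Long> send(String payload) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        inFlight.add(result);
        ioThread.execute(() -> {
            sleepQuietly(20); // simulated network round trip
            if (payload.length() > maxRecordSize) {
                result.completeExceptionally(new IllegalArgumentException("record too large"));
            } else {
                result.complete(nextOffset++);
            }
        });
        return result;
    }

    /**
     * Blocks until every record whose send() has already returned is complete,
     * successfully or with an error. Failures are not thrown here; they stay on
     * the individual futures.
     */
    void flush() {
        List<CompletableFuture<Long>> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(inFlight);
        }
        CompletableFuture.allOf(snapshot.toArray(new CompletableFuture<?>[0]))
                .exceptionally(error -> null) // an error still counts as "completed"
                .join();
        synchronized (this) {
            inFlight.removeAll(snapshot);
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Joe Stein's clean-shutdown sequence maps onto the same sketch: stop accepting new work, call flush(), then inspect the futures for errors before exiting. A failed send surfaces on its own future rather than as an exception from flush().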
Basically, the argument I am making is that the only reason you want to call flush() is to guarantee that all the sends complete, so if it doesn't guarantee that, it will be somewhat confusing. This does mean blocking, but if you don't want to block on the send then you wouldn't call flush().

This has no impact on the block.on.buffer.full setting. That setting determines what happens when send() can't append to the buffer because it is full. flush() means any message previously sent (i.e., for which the send() call has returned) needs to have its request completed. Hope that makes sense.

-Jay

On Mon, Feb 9, 2015 at 11:52 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

Hi Jay,

Imagine you have a flaky network connection to the brokers: will flush() be blocked if one of the brokers is not available? Basically, how would we address the failure mode where the I/O thread is not able to drain records, or is busy due to pending requests? Is the flush() method only meant to flush to the in-memory queue, or to flush to the broker over the network?

A timeout helps with that and pushes the caller to decide what to do, e.g., re-enqueue records, drop the entire batch, or deal with a message that is too big and crosses the max.message.size limit.

Also, according to the Javadoc for the API, "The method will block until all previously sent records have completed sending (either successfully or with an error)". Does this bypass the rules set by block.on.buffer.full or batch.size when under load?

That was my intention, and I am sorry I mixed up the close() method here without knowing that this is only for bulk send.
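Jay's distinction between the buffer-full policy and flush() can be sketched in the same self-contained way. In the hypothetical `BoundedToyProducer` below (illustration only, not the Kafka client API), a `Semaphore` models free space in the send buffer; the `blockOnBufferFull` flag is only loosely modeled on the `block.on.buffer.full` setting, and the `IllegalStateException` thrown when the buffer is full is a stand-in chosen for this sketch. The policy decides only what `send()` does when there is no room; `flush()` waits only for records whose `send()` has already returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/** Hypothetical producer with a bounded send buffer; illustration only, NOT the Kafka client API. */
class BoundedToyProducer {
    // A single daemon thread stands in for the producer's background I/O thread.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-io");
        t.setDaemon(true);
        return t;
    });
    private final Semaphore bufferSlots;     // free space in the send buffer
    private final boolean blockOnBufferFull; // loosely modeled on block.on.buffer.full
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

    BoundedToyProducer(int bufferCapacity, boolean blockOnBufferFull) {
        this.bufferSlots = new Semaphore(bufferCapacity);
        this.blockOnBufferFull = blockOnBufferFull;
    }

    /** The buffer-full policy only decides what send() does when there is no room to append. */
    CompletableFuture<Void> send(String payload) {
        if (blockOnBufferFull) {
            bufferSlots.acquireUninterruptibly(); // wait until the I/O thread frees space
        } else if (!bufferSlots.tryAcquire()) {
            throw new IllegalStateException("buffer full, rejecting " + payload);
        }
        CompletableFuture<Void> done = new CompletableFuture<>();
        synchronized (this) {
            inFlight.add(done);
        }
        ioThread.execute(() -> {
            sleepQuietly(50);      // simulated network round trip
            bufferSlots.release(); // request finished, so its buffer space is free again
            done.complete(null);
        });
        return done;
    }

    /** Waits for records whose send() already returned; other threads may keep sending meanwhile. */
    void flush() {
        List<CompletableFuture<Void>> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(inFlight);
        }
        CompletableFuture.allOf(snapshot.toArray(new CompletableFuture<?>[0]))
                .exceptionally(error -> null) // an error still counts as "completed"
                .join();
        synchronized (this) {
            inFlight.removeAll(snapshot);
        }
    }

    int freeSlots() {
        return bufferSlots.availablePermits();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With a capacity of 2 and blocking enabled, the third and later `send()` calls wait for buffer space, while `flush()` is what guarantees that all of them have finished.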
Thanks,

Bhavesh

On Mon, Feb 9, 2015 at 8:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Yeah, I second the problem Guozhang flags with giving flush a timeout. In general, failover in Kafka is a bounded thing unless you have brought your Kafka cluster down entirely, so I think depending on that bound implicitly is okay.

It is possible to instead make flush() be

    boolean tryFlush(long timeout, TimeUnit unit);

But I am somewhat skeptical that people will use this correctly. I.e., consider the mirror maker code snippet I gave above: how would one actually recover in this case other than retrying (which the client already does automatically)? After all, if you are okay losing data then you don't need to bother calling flush at all; you can just let the messages be sent asynchronously.

I think close() is actually different, because you may well want to shut down immediately and just throw away unsent events.

-Jay

On Mon, Feb 9, 2015 at 2:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:

The proposal looks good to me; I will need some time to review the implementation RB later.

Bhavesh, I am wondering how you will use a flush() with a timeout, since such a call does not actually provide any flushing guarantees?
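The `boolean tryFlush(long timeout, TimeUnit unit)` signature Jay raises as a possible alternative can be sketched on a hypothetical `TimedToyProducer` (illustration only, not the Kafka client API) to make Guozhang's point concrete: a `false` result cancels nothing and gives no delivery guarantee, because the records are still in flight. The `latencyMillis` parameter is an assumption of this sketch, used to simulate a slow broker or an ongoing failover.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical producer used only to illustrate the tryFlush idea; NOT the Kafka client API. */
class TimedToyProducer {
    // A single daemon thread stands in for the producer's background I/O thread.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-io");
        t.setDaemon(true);
        return t;
    });
    // Never pruned, to keep the sketch short.
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

    /** Asynchronous send; latencyMillis simulates a slow broker or an ongoing failover. */
    synchronized CompletableFuture<Void> send(String payload, long latencyMillis) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        inFlight.add(done);
        ioThread.execute(() -> {
            try {
                Thread.sleep(latencyMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.complete(null);
        });
        return done;
    }

    /**
     * True if every previously sent record completed within the timeout, false otherwise.
     * A false result cancels nothing: the records are still in flight.
     */
    boolean tryFlush(long timeout, TimeUnit unit) {
        CompletableFuture<Void> all;
        synchronized (this) {
            all = CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0]));
        }
        try {
            all.get(timeout, unit);
            return true;
        } catch (ExecutionException e) {
            return true;  // a failed send still counts as completed
        } catch (TimeoutException e) {
            return false; // no guarantee: some sends are still pending
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** The untimed flush(): relies on failover being bounded. */
    void flush() {
        CompletableFuture<Void> all;
        synchronized (this) {
            all = CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0]));
        }
        all.exceptionally(error -> null).join();
    }
}
```

In this sketch, after a `false` return the caller can only wait again (which amounts to retrying) or abandon the records, and abandoning them is what not calling flush() at all would already do.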
As for close(), there is a separate JIRA for this:

KAFKA-1660 <https://issues.apache.org/jira/browse/KAFKA-1660>

Guozhang

On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

Hi Jay,

How about adding a timeout to each of these method calls, flush(timeout, TimeUnit) and close(timeout, TimeUnit)? We had a runaway I/O thread issue, and the caller thread should not be blocked forever in these methods.

Thanks,

Bhavesh

On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

Well, actually, in the case of linger.ms = 0 the send is still asynchronous, so calling flush() blocks until all the previously sent records have completed. It doesn't speed anything up in that case, though, since they are already available to send.

-Jay

On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gshap...@cloudera.com> wrote:

Looks good to me.

I like the idea of not blocking additional sends but not guaranteeing that flush() will deliver them.

I assume that with linger.ms = 0, flush will just be a no-op (since the queue will be empty). Is that correct?
Gwen

On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

Following up on our previous thread on making batch send a little easier, here is a concrete proposal to add a flush() method to the producer:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API

A proposed implementation is here:
https://issues.apache.org/jira/browse/KAFKA-1865

Thoughts?

-Jay
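The batch-send use case the proposal targets can be sketched as a loop that sends a whole batch without blocking per record, calls flush() once, and only then records its progress. `BatchToyProducer`, `BatchCopier`, and the `committedPosition` counter are hypothetical (the counter stands in for committing consumer offsets, as a copying loop might); this is not the mirror maker snippet Jay refers to, which is not part of this thread, and not the Kafka client API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical asynchronous producer, illustration only (NOT the Kafka client API). */
class BatchToyProducer {
    // A single daemon thread stands in for the producer's background I/O thread.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-io");
        t.setDaemon(true);
        return t;
    });
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();
    // What the simulated broker received; written only by the I/O thread.
    final List<String> delivered = new ArrayList<>();

    synchronized CompletableFuture<Void> send(String payload) {
        CompletableFuture<Void> ack = new CompletableFuture<>();
        inFlight.add(ack);
        ioThread.execute(() -> {
            delivered.add(payload);
            ack.complete(null);
        });
        return ack;
    }

    /** Blocks until every record sent before this call has completed. */
    void flush() {
        List<CompletableFuture<Void>> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(inFlight);
        }
        CompletableFuture.allOf(snapshot.toArray(new CompletableFuture<?>[0]))
                .exceptionally(error -> null) // failed sends also count as completed
                .join();
        synchronized (this) {
            inFlight.removeAll(snapshot);
        }
    }
}

/** Hypothetical batch loop: send a batch, flush once, then advance the position. */
class BatchCopier {
    private final BatchToyProducer producer;
    long committedPosition = 0; // placeholder for committing consumer offsets

    BatchCopier(BatchToyProducer producer) {
        this.producer = producer;
    }

    void copyBatch(List<String> batch) {
        List<CompletableFuture<Void>> acks = new ArrayList<>();
        for (String payload : batch) {
            acks.add(producer.send(payload)); // asynchronous: no per-record blocking
        }
        producer.flush(); // one blocking call for the whole batch
        for (CompletableFuture<Void> ack : acks) {
            ack.join(); // already complete; throws CompletionException if that send failed
        }
        committedPosition += batch.size(); // reached only if every send succeeded
    }
}
```

Calling flush() once per batch keeps the loop simple; the join() calls afterwards cannot block and exist only to surface any send error before the position advances.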