Hi All,

Thanks Jay and everyone for addressing the concerns. I am fine with having just a flush() method as long as it covers failure modes and resiliency. For example, we had a situation where all the brokers in the Kafka cluster were reachable, but after a new Kafka node was added and the admin migrated partition leadership to it, the new broker was NOT reachable from the producer's standpoint due to a firewall, while the metadata continued to elect that broker as the leader for the partition.

All I am asking is that in this scenario the producer either gives up sending to that broker or does something about it. With the current 0.8.2 code, the caller thread of flush() or close() would be blocked forever, so all I am asking for is:
https://issues.apache.org/jira/browse/KAFKA-1659
https://issues.apache.org/jira/browse/KAFKA-1660

Also, I recall that a timeout was added to the batch to indicate how long a message may be retained in memory before expiring. Given all this, should this API be consistent with the other upcoming patches that address similar problems? Otherwise, what we have done is spawn a thread just for calling close() or flush(), with a timed join on the caller end, as sketched below.

Anyway, I just wanted to point out the issues with the existing API; if you think this is fine, then I am OK with this approach. It is just that the caller will have to do a bit more work.

Thanks,

Bhavesh
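A minimal sketch of that kind of caller-side workaround, assuming the flush() method proposed in KIP-8; the helper class and method names are illustrative only and not part of the producer API:

    import org.apache.kafka.clients.producer.Producer;

    public class BoundedFlush {
        /**
         * Runs flush() on a helper thread and waits at most timeoutMs for it to
         * finish. Returns true if the flush completed in time, false otherwise.
         * A false return does not cancel the flush; the helper thread keeps
         * running until the outstanding requests complete or fail.
         */
        public static boolean flushWithTimeout(Producer<?, ?> producer, long timeoutMs)
                throws InterruptedException {
            Thread flusher = new Thread(producer::flush, "bounded-flush");
            flusher.setDaemon(true); // do not keep the JVM alive just for this call
            flusher.start();
            flusher.join(timeoutMs); // bounded wait on the caller side
            return !flusher.isAlive();
        }
    }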
On Thursday, February 12, 2015, Joel Koshy <jjkosh...@gmail.com> wrote:

> Yes that is a counter-example. I'm okay either way on whether we should have just flush() or have a timeout. Bhavesh, does Jay's explanation a few replies prior address your concern? If so, shall we consider this closed?
>
> On Tue, Feb 10, 2015 at 01:36:23PM -0800, Jay Kreps wrote:
> > Yeah we could do that, I guess I just feel like it adds confusion because then you have to think about which timeout you want, when likely you don't want a timeout at all.
> >
> > I guess the pattern I was thinking of was fflush or the java equivalent, which don't have timeouts:
> > http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#flush()
> >
> > -Jay
> >
> > On Tue, Feb 10, 2015 at 10:41 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > I think tryFlush with a timeout sounds good to me. This is really more for consistency than anything else. I cannot think of any standard blocking calls off the top of my head that don't have a timed variant, e.g., Thread.join, Object.wait, Future.get. Either that, or they provide an entirely non-blocking mode (e.g., socketChannel.connect followed by finishConnect).
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Tue, Feb 10, 2015 at 11:30:47AM -0500, Joe Stein wrote:
> > > > Jay,
> > > >
> > > > The .flush() call seems like it would be the best way if you wanted to do a clean shutdown of the new producer?
> > > >
> > > > So, you could in your code "stop all incoming requests && producer.flush() && system.exit(value)" and know pretty much you won't drop anything on the floor.
> > > >
> > > > This can be done with the callbacks and futures (sure) but .flush() seems to be the right time to block and a few lines of code, no?
> > > >
> > > > ~ Joestein
> > > >
> > > > On Tue, Feb 10, 2015 at 11:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > Hey Bhavesh,
> > > > >
> > > > > If a broker is not available a new one should be elected to take over, so although the flush might take longer it should still be quick. Even if not, this should result in an error, not a hang.
> > > > >
> > > > > The cases you enumerated are all covered already--if the user wants to retry, that is covered by the retry setting in the client, and for all the errors that is considered completion of the request. The post condition of flush isn't that all sends complete successfully, just that they complete. So if you try to send a message that is too big, when flush returns, calling .get() on the future should not block and should produce the error.
> > > > >
> > > > > Basically the argument I am making is that the only reason you want to call flush() is to guarantee all the sends complete, so if it doesn't guarantee that it will be somewhat confusing. This does mean blocking, but if you don't want to block on the send then you wouldn't call flush().
> > > > >
> > > > > This has no impact on the block.on.buffer.full setting. That impacts what happens when send() can't append to the buffer because it is full. flush() means any message previously sent (i.e. for which the send() call has returned) needs to have its request completed. Hope that makes sense.
> > > > >
> > > > > -Jay
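To illustrate the post-condition Jay describes, a minimal sketch assuming the flush() method from the KIP-8 proposal (class and method names here are made up): send() returns futures immediately, flush() blocks until every previously sent record has completed, and afterwards get() on each future does not block and surfaces any per-record error.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class FlushThenCheck {
        public static void sendBatch(KafkaProducer<String, String> producer,
                                     List<ProducerRecord<String, String>> records) {
            List<Future<RecordMetadata>> results = new ArrayList<>();
            for (ProducerRecord<String, String> record : records)
                results.add(producer.send(record)); // asynchronous, returns at once

            // Blocks until every record sent above has completed,
            // successfully or with an error.
            producer.flush();

            // The futures are already done, so get() does not block; it either
            // returns metadata or throws the per-record error (e.g. for a
            // record that exceeds the maximum message size).
            for (Future<RecordMetadata> result : results) {
                try {
                    result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // handle or log the failed record here
                }
            }
        }
    }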
> > > > > On Mon, Feb 9, 2015 at 11:52 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > > > > > Hi Jay,
> > > > > >
> > > > > > Imagine you have a flaky network connection to the brokers, and flush() gets blocked because one of the brokers is not available (basically, how would you address the failure mode where the I/O thread is not able to drain records, or is busy due to pending requests?). Is the flush() method only meant to flush the in-memory queue, or to flush to the brokers over the network?
> > > > > >
> > > > > > A timeout helps with this and pushes the caller to decide what to do, e.g. re-enqueue the records, drop the entire batch, or handle the case where one of the messages is too big and crosses the max.message.size limit, etc.
> > > > > >
> > > > > > Also, according to the Javadoc for the API, "The method will block until all previously sent records have completed sending (either successfully or with an error)". Does this bypass the rules set by block.on.buffer.full or batch.size when under load?
> > > > > >
> > > > > > That was my intention, and I am sorry I mixed up the close() method here without knowing that this is only for bulk send.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Bhavesh
> > > > > >
> > > > > > On Mon, Feb 9, 2015 at 8:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > > Yeah I second the problem Guozhang flags with giving flush a timeout. In general failover in Kafka is a bounded thing unless you have brought your Kafka cluster down entirely, so I think depending on that bound implicitly is okay.
> > > > > > >
> > > > > > > It is possible to make flush() be instead
> > > > > > >
> > > > > > >   boolean tryFlush(long timeout, TimeUnit unit);
> > > > > > >
> > > > > > > But I am somewhat skeptical that people will use this correctly. I.e. consider the mirror maker code snippet I gave above: how would one actually recover in this case other than retrying (which the client already does automatically)? After all, if you are okay losing data then you don't need to bother calling flush at all, you can just let the messages be sent asynchronously.
> > > > > > >
> > > > > > > I think close() is actually different because you may well want to shutdown immediately and just throw away unsent events.
> > > > > > >
> > > > > > > -Jay
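Putting Joe's shutdown sequence from earlier in the thread next to Jay's point here, a minimal sketch, again assuming the flush() method proposed in KIP-8 (the helper class is made up): stop handing new records to send(), drain what was already sent, then close. If losing unsent records is acceptable, the flush() step is simply skipped, which is the close() case Jay contrasts.

    import org.apache.kafka.clients.producer.Producer;

    public class GracefulShutdown {
        /** Drain everything already handed to send(), then release the producer. */
        public static void drainAndClose(Producer<?, ?> producer) {
            // Blocks until every previously sent record has completed,
            // successfully or with an error, so nothing is silently dropped.
            producer.flush();
            // Release sockets, buffers and the background I/O thread.
            producer.close();
        }
    }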
> > > > > > > On Mon, Feb 9, 2015 at 2:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > The proposal looks good to me, will need some time to review the implementation RB later.
> > > > > > > >
> > > > > > > > Bhavesh, I am wondering how you will use a flush() with a timeout, since such a call does not actually provide any flushing guarantees?
> > > > > > > >
> > > > > > > > As for close(), there is a separate JIRA for this:
> > > > > > > > KAFKA-1660 <https://issues.apache.org/jira/browse/KAFKA-1660>
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > > On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > > > > > > > > Hi Jay,
> > > > > > > > >
> > > > > > > > > How about adding a timeout to each of these methods, flush(timeout, TimeUnit) and close(timeout, TimeUnit)? We had a runaway I/O thread issue, and the caller thread should not be blocked forever in these methods.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Bhavesh
> > > > > > > > >
> > > > > > > > > On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > > > > > Well actually in the case of linger.ms = 0 the send is still asynchronous, so calling flush() blocks until all the previously sent records have completed. It doesn't speed anything up in that case, though, since they are already available to send.
> > > > > > > > > >
> > > > > > > > > > -Jay
> > > > > > > > > >
> > > > > > > > > > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > > > > > > > > > Looks good to me.
> > > > > > > > > > >
> > > > > > > > > > > I like the idea of not blocking additional sends but not guaranteeing that flush() will deliver them.
> > > > > > > > > > >
> > > > > > > > > > > I assume that with linger.ms = 0, flush will just be a noop (since the queue will be empty). Is that correct?
> > > > > > > > > > >
> > > > > > > > > > > Gwen
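A small sketch of the linger.ms = 0 case Jay describes (broker address, topic and serializer settings are placeholders, and flush() is again the method proposed in KIP-8): even with linger.ms = 0 the send is asynchronous, so flush() is not a no-op; it still waits for the in-flight request to complete.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class LingerZeroFlush {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder address
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            props.put("linger.ms", "0"); // records are eligible to send immediately

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // send() is still asynchronous: the record goes into the accumulator
                // and is shipped by the background I/O thread.
                producer.send(new ProducerRecord<>("test-topic", "key", "value"));

                // Not a no-op: flush() blocks until the request carrying the record
                // above has completed, successfully or with an error.
                producer.flush();
            }
        }
    }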
> > > > > > > > > > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > > > > > > > Following up on our previous thread on making batch send a little easier, here is a concrete proposal to add a flush() method to the producer:
> > > > > > > > > > > >
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> > > > > > > > > > > >
> > > > > > > > > > > > A proposed implementation is here:
> > > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-1865
> > > > > > > > > > > >
> > > > > > > > > > > > Thoughts?
> > > > > > > > > > > >
> > > > > > > > > > > > -Jay
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang