Hey Joel,

The use case would be for something like mirror maker. You want to do something like the following:

    while(true) {
      val recs = consumer.poll(time);
      for(rec <- recs)
        producer.send(rec);
      producer.flush();
      consumer.commit();
    }

If you replace flush() with just calling get() on the records, the problem is that the get() call will block for linger.ms plus the time to send. But at the time you call flush() you are actually done sending new stuff and you want that stuff to get sent; lingering around in case of new writes is silly. In the absence of flush() there is no way to say that. As you say, you only pay that penalty on one of the get() calls, but if linger.ms is high (say 60 seconds) that will be a huge penalty.

-Jay

On Mon, Feb 9, 2015 at 6:23 PM, Joel Koshy <jjkosh...@gmail.com> wrote:

> - WRT the motivation: "if you set linger.ms > 0 to encourage batching
>   of messages, which is likely a good idea for this kind of use case,
>   then the second for loop will block for a ms" -> however, in
>   practice this will really only be for the first couple of calls
>   right? Since the subsequent calls would return immediately since in
>   all likelihood those subsequent messages would have gone out on the
>   previous message's batch.
> - I think Bhavesh's suggestion on the timeout makes sense for
>   consistency (with other blocking-style calls) if nothing else.
> - Does it make sense to fold in the API changes for KAFKA-1660 and
>   KAFKA-1669 and do all at once?
>
> Thanks,
>
> Joel
>
>
> On Mon, Feb 09, 2015 at 02:44:06PM -0800, Guozhang Wang wrote:
> > The proposal looks good to me, will need some time to review the
> > implementation RB later.
> >
> > Bhavesh, I am wondering how you will use a flush() with a timeout since
> > such a call does not actually provide any flushing guarantees?
> >
> > As for close(), there is a separate JIRA for this:
> >
> > KAFKA-1660 <https://issues.apache.org/jira/browse/KAFKA-1660>
> >
> > Guozhang
> >
> >
> > On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> >
> > > Hi Jay,
> > >
> > > How about adding timeout for each method calls flush(timeout,TimeUnit) and
> > > close(timeout,TimeUNIT) ? We had runway io thread issue and caller thread
> > > should not blocked for ever for these methods ?
> > >
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > > On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >
> > > > Well actually in the case of linger.ms = 0 the send is still asynchronous
> > > > so calling flush() blocks until all the previously sent records have
> > > > completed. It doesn't speed anything up in that case, though, since they
> > > > are already available to send.
> > > >
> > > > -Jay
> > > >
> > > > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > >
> > > > > Looks good to me.
> > > > >
> > > > > I like the idea of not blocking additional sends but not guaranteeing that
> > > > > flush() will deliver them.
> > > > >
> > > > > I assume that with linger.ms = 0, flush will just be a noop (since the
> > > > > queue will be empty). Is that correct?
> > > > >
> > > > > Gwen
> > > > >
> > > > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > >
> > > > > > Following up on our previous thread on making batch send a little easier,
> > > > > > here is a concrete proposal to add a flush() method to the producer:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> > > > > >
> > > > > > A proposed implementation is here:
> > > > > > https://issues.apache.org/jira/browse/KAFKA-1865
> > > > > >
> > > > > > Thoughts?
> > > > > >
> > > > > > -Jay
> >
> > --
> > -- Guozhang