> silly. But in the absence of flush there is no way to say that. As you say
> you only pay that penalty on one of the get() calls, but if the linger.ms
> is high (say 60 seconds) that will be a huge penalty.

That makes sense - thanks for clarifying.

On Mon, Feb 09, 2015 at 08:11:46PM -0800, Jay Kreps wrote:
> Hey Joel,
> 
> The use case would be for something like mirror maker. You want to do
> something like the following:
> 
> while(true) {
>   val recs = consumer.poll(time);
>   for(rec <- recs)
>     producer.send(rec);
>   producer.flush();
>   consumer.commit();
> }
> 
> If you replace flush() with just calling get() on the records the problem
> is that the get call will block for linger.ms plus the time to send. But at
> the time you call flush you are actually done sending new stuff and you
> want that stuff to get sent, lingering around in case of new writes is
> silly. But in the absence of flush there is no way to say that. As you say
> you only pay that penalty on one of the get() calls, but if the linger.ms
> is high (say 60 seconds) that will be a huge penalty.
> 
> -Jay
> 
> On Mon, Feb 9, 2015 at 6:23 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> 
> > - WRT the motivation: "if you set linger.ms > 0 to encourage batching
> >   of messages, which is likely a good idea for this kind of use case,
> >   then the second for loop will block for a ms" -> however, in
> >   practice this will really only be for the first couple of calls
> >   right? Since the subsequent calls would return immediately since in
> >   all likelihood those subsequent messages would have gone out on the
> >   previous message's batch.
> > - I think Bhavesh's suggestion on the timeout makes sense for
> >   consistency (with other blocking-style calls) if nothing else.
> > - Does it make sense to fold in the API changes for KAFKA-1660 and
> >   KAFKA-1669 and do all at once?
> >
> > Thanks,
> >
> > Joel
> >
> >
> > On Mon, Feb 09, 2015 at 02:44:06PM -0800, Guozhang Wang wrote:
> > > The proposal looks good to me, will need some time to review the
> > > implementation RB later.
> > >
> > > Bhavesh, I am wondering how you will use a flush() with a timeout since
> > > such a call does not actually provide any flushing guarantees?
> > >
> > > As for close(), there is a separate JIRA for this:
> > >
> > > KAFKA-1660 <https://issues.apache.org/jira/browse/KAFKA-1660>
> > >
> > > Guozhang
> > >
> > >
> > > > On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > >
> > > > Hi Jay,
> > > >
> > > > > How about adding a timeout to each method call: flush(timeout, TimeUnit) and
> > > > > close(timeout, TimeUnit)? We had a runaway I/O thread issue, and the caller
> > > > > thread should not be blocked forever by these methods.
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > > > On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >
> > > > > > Well actually in the case of linger.ms = 0 the send is still asynchronous
> > > > > > so calling flush() blocks until all the previously sent records have
> > > > > > completed. It doesn't speed anything up in that case, though, since they
> > > > > > are already available to send.
> > > > >
> > > > > -Jay
> > > > >
> > > > > > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > > >
> > > > > > Looks good to me.
> > > > > >
> > > > > > > I like the idea of not blocking additional sends but not guaranteeing that
> > > > > > > flush() will deliver them.
> > > > > > >
> > > > > > > I assume that with linger.ms = 0, flush will just be a noop (since the
> > > > > > > queue will be empty). Is that correct?
> > > > > >
> > > > > > Gwen
> > > > > >
> > > > > > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > >
> > > > > > > > Following up on our previous thread on making batch send a little easier,
> > > > > > > > here is a concrete proposal to add a flush() method to the producer:
> > > > > > > >
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> > > > > > >
> > > > > > > A proposed implementation is here:
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-1865
> > > > > > >
> > > > > > > Thoughts?
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> >
> >
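
The trade-off discussed in this thread (waiting on get() pays the linger.ms delay once, while flush() sends what is buffered right away) can be illustrated with a toy model. This is only a sketch under stated assumptions: ToyProducer, await() and simulatedWaitMs() are hypothetical names invented here, not part of the Kafka client, and the linger delay is counted rather than actually slept.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Toy, single-threaded model of a batching producer. NOT the Kafka client. */
class ToyProducer {
    private final long lingerMs;                       // models linger.ms
    private final List<CompletableFuture<Long>> pending = new ArrayList<>();
    private long nextOffset = 0;
    private long simulatedWaitMs = 0;                  // delay is counted, never slept

    ToyProducer(long lingerMs) { this.lingerMs = lingerMs; }

    /** Buffers a record (payload is ignored here); the future completes when the batch goes out. */
    CompletableFuture<Long> send(String record) {
        CompletableFuture<Long> ack = new CompletableFuture<>();
        pending.add(ack);
        return ack;
    }

    /** Models get() without flush: an unsent batch lingers for lingerMs before it is sent. */
    long await(CompletableFuture<Long> ack) {
        if (!ack.isDone()) {
            simulatedWaitMs += lingerMs;
            drain();
        }
        return ack.join();
    }

    /** Models flush(): send everything buffered now, with no linger. */
    void flush() { drain(); }

    private void drain() {
        for (CompletableFuture<Long> ack : pending) ack.complete(nextOffset++);
        pending.clear();
    }

    long simulatedWaitMs() { return simulatedWaitMs; }
}

class FlushSketch {
    public static void main(String[] args) {
        // Path 1: wait on each future; only the first wait pays the linger penalty.
        ToyProducer viaGet = new ToyProducer(60_000);
        List<CompletableFuture<Long>> acks = new ArrayList<>();
        for (String r : List.of("a", "b", "c")) acks.add(viaGet.send(r));
        for (CompletableFuture<Long> ack : acks) viaGet.await(ack);

        // Path 2: flush once after the last send; no linger is paid.
        ToyProducer viaFlush = new ToyProducer(60_000);
        for (String r : List.of("a", "b", "c")) viaFlush.send(r);
        viaFlush.flush();

        System.out.println("get() path waited " + viaGet.simulatedWaitMs() + " ms");
        System.out.println("flush() path waited " + viaFlush.simulatedWaitMs() + " ms");
    }
}
```

In this model the get() path pays the linger delay exactly once per batch, which matches Joel's observation, but with a 60-second linger that single wait is the "huge penalty" Jay describes.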
