I'm thinking the flush call timeout will naturally be the timeout for a
produce request, No?

Currently it seems we don¹t have a timeout for client requests, should we
have one?

‹Jiangjie (Becket) Qin

On 2/16/15, 8:19 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:

>Yes, I think we all agree it would be good to add a client-side request
>timeout. That would effectively imply a flush timeout as well since any
>requests that couldn't complete in that time would be errors and hence
>completed in the definition we gave.
>
>-Jay
>
>On Mon, Feb 16, 2015 at 7:57 PM, Bhavesh Mistry
><mistry.p.bhav...@gmail.com>
>wrote:
>
>> Hi All,
>>
>> Thanks Jay and all  address concern.  I am fine with just having flush()
>> method as long as it covers failure mode and resiliency.  e.g We had
>> situation where entire Kafka cluster brokers were reachable, but upon
>> adding new kafka node and admin migrated "leader to new brokers"  that
>>new
>> brokers is NOT reachable from producer stand point due to fire wall but
>> metadata would continue to elect new broker as leader for that
>>partition.
>>
>> All I am asking is either you will have to give-up sending to this
>>broker
>> or do something in this scenario.  As for the current code 0.8.2
>>release,
>> caller thread of flush() or close() method would be blocked for ever....
>> so all I am asking is
>>
>> https://issues.apache.org/jira/browse/KAFKA-1659
>> https://issues.apache.org/jira/browse/KAFKA-1660
>>
>> Also, I recall that there is timeout also added to batch to indicate how
>> long "message" can retain in memory before expiring.
>>
>> Given,  all this should this API be consistent with others up coming
>> patches for addressing similar problem(s).
>>
>>
>> Otherwise, what we have done is spawn a thread for just calling close()
>>or
>> flush with timeout for join on caller end.
>>
>> Anyway, I just wanted to give you issues with existing API and if you
>>guys
>> think this is fine then, I am ok with this approach. It is just that
>>caller
>> will have to do bit more work.
>>
>>
>> Thanks,
>>
>> Bhavesh
>>
>> On Thursday, February 12, 2015, Joel Koshy <jjkosh...@gmail.com> wrote:
>>
>> > Yes that is a counter-example. I'm okay either way on whether we
>> > should have just flush() or have a timeout. Bhavesh, does Jay's
>> > explanation a few replies prior address your concern? If so, shall we
>> > consider this closed?
>> >
>> > On Tue, Feb 10, 2015 at 01:36:23PM -0800, Jay Kreps wrote:
>> > > Yeah we could do that, I guess I just feel like it adds confusion
>> because
>> > > then you have to think about which timeout you want, when likely you
>> > don't
>> > > want a timeout at all.
>> > >
>> > > I guess the pattern I was thinking of was fflush or the java
>> equivalent,
>> > > which don't have timeouts:
>> > >
>> >
>> 
>>http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#flush(
>>)
>> > >
>> > > -Jay
>> > >
>> > > On Tue, Feb 10, 2015 at 10:41 AM, Joel Koshy <jjkosh...@gmail.com>
>> > wrote:
>> > >
>> > > > I think tryFlush with a timeout sounds good to me. This is really
>> more
>> > > > for consistency than anything else. I cannot think of any standard
>> > > > blocking calls off the top of my head that don't have a timed
>> variant.
>> > > > E.g., Thread.join, Object.wait, Future.get Either that, or they
>> > > > provide an entirely non-blocking mode (e.g., socketChannel.connect
>> > > > followed by finishConnect)
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Joel
>> > > >
>> > > > On Tue, Feb 10, 2015 at 11:30:47AM -0500, Joe Stein wrote:
>> > > > > Jay,
>> > > > >
>> > > > > The .flush() call seems like it would be the best way if you
>>wanted
>> > > > to-do a
>> > > > > clean shutdown of the new producer?
>> > > > >
>> > > > > So, you could in your code "stop all incoming requests &&
>> > > > producer.flush()
>> > > > > && system.exit(value)" and know pretty much you won't drop
>>anything
>> > on
>> > > > the
>> > > > > floor.
>> > > > >
>> > > > > This can be done with the callbacks and futures (sure) but
>>.flush()
>> > seems
>> > > > > to be the right time to block and a few lines of code, no?
>> > > > >
>> > > > > ~ Joestein
>> > > > >
>> > > > > On Tue, Feb 10, 2015 at 11:25 AM, Jay Kreps
>><jay.kr...@gmail.com>
>> > wrote:
>> > > > >
>> > > > > > Hey Bhavesh,
>> > > > > >
>> > > > > > If a broker is not available a new one should be elected to
>>take
>> > over,
>> > > > so
>> > > > > > although the flush might take longer it should still be quick.
>> > Even if
>> > > > not
>> > > > > > this should result in an error not a hang.
>> > > > > >
>> > > > > > The cases you enumerated are all covered already--if the user
>> > wants to
>> > > > > > retry that is covered by the retry setting in the client, for
>>all
>> > the
>> > > > > > errors that is considered completion of the request. The post
>> > > > condition of
>> > > > > > flush isn't that all sends complete successfully, just that
>>they
>> > > > complete.
>> > > > > > So if you try to send a message that is too big, when flush
>> returns
>> > > > calling
>> > > > > > .get() on the future should not block and should produce the
>> error.
>> > > > > >
>> > > > > > Basically the argument I am making is that the only reason you
>> > want to
>> > > > call
>> > > > > > flush() is to guarantee all the sends complete so if it
>>doesn't
>> > > > guarantee
>> > > > > > that it will be somewhat confusing. This does mean blocking,
>>but
>> > if you
>> > > > > > don't want to block on the send then you wouldn't call
>>flush().
>> > > > > >
>> > > > > > This has no impact on the block.on.buffer full setting. That
>> > impacts
>> > > > what
>> > > > > > happens when send() can't append to the buffer because it is
>> full.
>> > > > flush()
>> > > > > > means any message previously sent (i.e. for which send() call
>>has
>> > > > returned)
>> > > > > > needs to have its request completed. Hope that makes sense.
>> > > > > >
>> > > > > > -Jay
>> > > > > >
>> > > > > > On Mon, Feb 9, 2015 at 11:52 PM, Bhavesh Mistry <
>> > > > > > mistry.p.bhav...@gmail.com>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > HI Jay,
>> > > > > > >
>> > > > > > > Imagine, if you have flaky network connection to brokers,
>>and
>> if
>> > > > flush()
>> > > > > > > will be blocked if "one of broker is not available" (
>>basically
>> > How
>> > > > would
>> > > > > > > be address failure mode and io thread not able to drain
>>records
>> > or
>> > > > busy
>> > > > > > due
>> > > > > > > to pending request". Do you flush() method is only to flush
>>to
>> > in mem
>> > > > > > queue
>> > > > > > > or flush to broker over the network().
>> > > > > > >
>> > > > > > > Timeout helps with and pushing caller to handle what to do
>>?
>> > e.g
>> > > > > > > re-enqueue records, drop entire batch or one of message is
>>too
>> > big
>> > > > cross
>> > > > > > > the limit of max.message.size etc...
>> > > > > > >
>> > > > > > > Also, according to java doc for API  "The method will block
>> > until all
>> > > > > > > previously sent records have completed sending (either
>> > successfully
>> > > > or
>> > > > > > with
>> > > > > > > an error)", does this by-pass rule set by for
>> > block.on.buffer.full or
>> > > > > > > batch.size
>> > > > > > > when under load.
>> > > > > > >
>> > > > > > > That was my intention, and I am sorry I mixed-up close()
>>method
>> > here
>> > > > > > > without knowing that this is only for bulk send.
>> > > > > > >
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > >
>> > > > > > > Bhavesh
>> > > > > > >
>> > > > > > > On Mon, Feb 9, 2015 at 8:17 PM, Jay Kreps
>><jay.kr...@gmail.com
>> >
>> > > > wrote:
>> > > > > > >
>> > > > > > > > Yeah I second the problem Guozhang flags with giving
>>flush a
>> > > > timeout.
>> > > > > > In
>> > > > > > > > general failover in Kafka is a bounded thing unless you
>>have
>> > > > brought
>> > > > > > your
>> > > > > > > > Kafka cluster down entirely so I think depending on that
>> bound
>> > > > > > implicitly
>> > > > > > > > is okay.
>> > > > > > > >
>> > > > > > > > It is possible to make flush() be instead
>> > > > > > > >   boolean tryFlush(long timeout, TimeUnit unit);
>> > > > > > > >
>> > > > > > > > But I am somewhat skeptical that people will use this
>> > correctly.
>> > > > I.e
>> > > > > > > > consider the mirror maker code snippet I gave above, how
>> would
>> > one
>> > > > > > > actually
>> > > > > > > > recover in this case other than retrying (which the client
>> > already
>> > > > does
>> > > > > > > > automatically)? After all if you are okay losing data then
>> you
>> > > > don't
>> > > > > > need
>> > > > > > > > to bother calling flush at all, you can just let the
>>messages
>> > be
>> > > > sent
>> > > > > > > > asynchronously.
>> > > > > > > >
>> > > > > > > > I think close() is actually different because you may well
>> > want to
>> > > > > > > shutdown
>> > > > > > > > immediately and just throw away unsent events.
>> > > > > > > >
>> > > > > > > > -Jay
>> > > > > > > >
>> > > > > > > > On Mon, Feb 9, 2015 at 2:44 PM, Guozhang Wang <
>> > wangg...@gmail.com>
>> > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > The proposal looks good to me, will need some time to
>> review
>> > the
>> > > > > > > > > implementation RB later.
>> > > > > > > > >
>> > > > > > > > > Bhavesh, I am wondering how you will use a flush() with
>>a
>> > timeout
>> > > > > > since
>> > > > > > > > > such a call does not actually provide any flushing
>> > guarantees?
>> > > > > > > > >
>> > > > > > > > > As for close(), there is a separate JIRA for this:
>> > > > > > > > >
>> > > > > > > > > KAFKA-1660 <
>> https://issues.apache.org/jira/browse/KAFKA-1660
>> > >
>> > > > > > > > >
>> > > > > > > > > Guozhang
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <
>> > > > > > > > mistry.p.bhav...@gmail.com
>> > > > > > > > > >
>> > > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Jay,
>> > > > > > > > > >
>> > > > > > > > > > How about adding timeout for each method calls
>> > > > > > > flush(timeout,TimeUnit)
>> > > > > > > > > and
>> > > > > > > > > > close(timeout,TimeUNIT) ?  We had runway io thread
>>issue
>> > and
>> > > > caller
>> > > > > > > > > thread
>> > > > > > > > > > should not blocked for ever for these methods ?
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > Thanks,
>> > > > > > > > > >
>> > > > > > > > > > Bhavesh
>> > > > > > > > > >
>> > > > > > > > > > On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <
>> > > > jay.kr...@gmail.com>
>> > > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Well actually in the case of linger.ms = 0 the send
>>is
>> > still
>> > > > > > > > > > asynchronous
>> > > > > > > > > > > so calling flush() blocks until all the previously
>>sent
>> > > > records
>> > > > > > > have
>> > > > > > > > > > > completed. It doesn't speed anything up in that
>>case,
>> > though,
>> > > > > > since
>> > > > > > > > > they
>> > > > > > > > > > > are already available to send.
>> > > > > > > > > > >
>> > > > > > > > > > > -Jay
>> > > > > > > > > > >
>> > > > > > > > > > > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <
>> > > > > > > gshap...@cloudera.com
>> > > > > > > > >
>> > > > > > > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Looks good to me.
>> > > > > > > > > > > >
>> > > > > > > > > > > > I like the idea of not blocking additional sends
>>but
>> > not
>> > > > > > > > guaranteeing
>> > > > > > > > > > > that
>> > > > > > > > > > > > flush() will deliver them.
>> > > > > > > > > > > >
>> > > > > > > > > > > > I assume that with linger.ms = 0, flush will just
>> be a
>> > > > noop
>> > > > > > > (since
>> > > > > > > > > the
>> > > > > > > > > > > > queue will be empty). Is that correct?
>> > > > > > > > > > > >
>> > > > > > > > > > > > Gwen
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <
>> > > > > > jay.kr...@gmail.com>
>> > > > > > > > > > wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Following up on our previous thread on making
>>batch
>> > send
>> > > > a
>> > > > > > > little
>> > > > > > > > > > > easier,
>> > > > > > > > > > > > > here is a concrete proposal to add a flush()
>>method
>> > to
>> > > > the
>> > > > > > > > > producer:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > >
>> >
>> 
>>https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+met
>>hod+to+the+producer+API
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > A proposed implementation is here:
>> > > > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-1865
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thoughts?
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > -Jay
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > --
>> > > > > > > > > -- Guozhang
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > >
>> > > >
>> >
>> >
>>

Reply via email to