In the scala clients we have the socket.timeout config since we use blocking
IO; when that timeout is reached a TimeoutException is thrown from the socket
and the client can handle it accordingly. In the java clients we are switching
to non-blocking IO, so we will no longer have the socket timeout.

I agree that we could add this client request timeout back in the java
clients, in addition to allowing the client/server's non-blocking selector to
close idle sockets.
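To make the trade-off concrete, here is a minimal sketch (illustrative only;
`send` and its delay are stand-ins, not the actual Kafka client API) of how a
Future-returning send lets the caller impose a per-request timeout on top of
non-blocking IO, recovering the TimeoutException behavior the scala clients
got from socket.timeout:

```java
import java.util.concurrent.*;

// Illustrative sketch only: send() and its delay are stand-ins for an
// asynchronous produce request, not the actual Kafka client API.
public class RequestTimeoutSketch {
    static ExecutorService io = Executors.newSingleThreadExecutor();

    // Stand-in for an async send over non-blocking IO: completes after
    // a simulated server delay.
    static Future<String> send(long serverDelayMs) {
        return io.submit(() -> {
            Thread.sleep(serverDelayMs);
            return "ack";
        });
    }

    public static void main(String[] args) throws Exception {
        // A fast request completes well within the caller's timeout.
        System.out.println(send(10).get(1, TimeUnit.SECONDS));

        // A stuck request surfaces a TimeoutException to the caller,
        // much like socket.timeout did for the blocking scala clients.
        try {
            send(5_000).get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
        io.shutdownNow();
    }
}
```

Since the java producer's send() does return a Future, callers can apply this
pattern today even without a dedicated request timeout config.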

Guozhang

On Tue, Feb 17, 2015 at 1:55 PM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> I'm thinking the flush call timeout will naturally be the timeout for a
> produce request, No?
>
> Currently it seems we don't have a timeout for client requests, should we
> have one?
>
> -Jiangjie (Becket) Qin
>
> On 2/16/15, 8:19 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>
> >Yes, I think we all agree it would be good to add a client-side request
> >timeout. That would effectively imply a flush timeout as well since any
> >requests that couldn't complete in that time would be errors and hence
> >completed in the definition we gave.
> >
> >-Jay
> >
> >On Mon, Feb 16, 2015 at 7:57 PM, Bhavesh Mistry
> ><mistry.p.bhav...@gmail.com>
> >wrote:
> >
> >> Hi All,
> >>
> >> Thanks Jay and all for addressing the concerns.  I am fine with just
> >> having the flush() method as long as it covers failure modes and
> >> resiliency.  E.g. we had a situation where the brokers in the Kafka
> >> cluster were reachable, but upon adding a new kafka node the admin
> >> migrated leadership to the new broker, which was NOT reachable from the
> >> producer's standpoint due to a firewall, yet metadata would continue to
> >> elect the new broker as leader for that partition.
> >>
> >> All I am asking is that we either give up sending to this broker or do
> >> something in this scenario.  As of the current 0.8.2 release, the
> >> caller thread of the flush() or close() method would be blocked
> >> forever, so all I am asking is:
> >>
> >> https://issues.apache.org/jira/browse/KAFKA-1659
> >> https://issues.apache.org/jira/browse/KAFKA-1660
> >>
> >> Also, I recall that a timeout was also added to the batch to indicate
> >> how long a message can be retained in memory before expiring.
> >>
> >> Given all this, should this API be consistent with other upcoming
> >> patches addressing similar problem(s)?
> >>
> >>
> >> Otherwise, what we have done is spawn a thread just for calling
> >> close() or flush(), with a timed join on the caller's end.
> >>
> >> Anyway, I just wanted to point out the issues with the existing API;
> >> if you think this is fine then I am ok with this approach.  It is just
> >> that the caller will have to do a bit more work.
> >>
> >>
> >> Thanks,
> >>
> >> Bhavesh
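The spawn-a-thread workaround Bhavesh describes above can be sketched roughly
as follows (illustrative only; flush() here is a stand-in that simply blocks,
not the producer's real method):

```java
// Illustrative sketch of the workaround described above: run the
// potentially-blocking flush() on a helper thread and bound the wait
// with a timed join. flush() here is a stand-in that just sleeps.
public class FlushWithTimeout {
    static void flush(long blockMs) {
        try {
            Thread.sleep(blockMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns true iff flush() finished within timeoutMs.
    static boolean tryFlush(long blockMs, long timeoutMs)
            throws InterruptedException {
        Thread t = new Thread(() -> flush(blockMs));
        t.setDaemon(true); // a stuck flush must not keep the JVM alive
        t.start();
        t.join(timeoutMs);
        return !t.isAlive();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(tryFlush(10, 1_000));  // finishes in time
        System.out.println(tryFlush(5_000, 50));  // still blocked
    }
}
```

Note the extra work this pushes onto the caller: a thread per bounded call,
and a decision about what to do with the still-running flush on timeout.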
> >>
> >> On Thursday, February 12, 2015, Joel Koshy <jjkosh...@gmail.com> wrote:
> >>
> >> > Yes that is a counter-example. I'm okay either way on whether we
> >> > should have just flush() or have a timeout. Bhavesh, does Jay's
> >> > explanation a few replies prior address your concern? If so, shall we
> >> > consider this closed?
> >> >
> >> > On Tue, Feb 10, 2015 at 01:36:23PM -0800, Jay Kreps wrote:
> >> > > Yeah we could do that, I guess I just feel like it adds confusion
> >> > > because then you have to think about which timeout you want, when
> >> > > likely you don't want a timeout at all.
> >> > >
> >> > > I guess the pattern I was thinking of was fflush or the java
> >> > > equivalent, which don't have timeouts:
> >> > >
> >> > > http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#flush()
> >> > >
> >> > > -Jay
> >> > >
> >> > > On Tue, Feb 10, 2015 at 10:41 AM, Joel Koshy <jjkosh...@gmail.com>
> >> > wrote:
> >> > >
> >> > > > I think tryFlush with a timeout sounds good to me.  This is
> >> > > > really more for consistency than anything else.  I cannot think
> >> > > > of any standard blocking calls off the top of my head that don't
> >> > > > have a timed variant, e.g., Thread.join, Object.wait, Future.get.
> >> > > > Either that, or they provide an entirely non-blocking mode (e.g.,
> >> > > > socketChannel.connect followed by finishConnect).
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > > Joel
> >> > > >
> >> > > > On Tue, Feb 10, 2015 at 11:30:47AM -0500, Joe Stein wrote:
> >> > > > > Jay,
> >> > > > >
> >> > > > > The .flush() call seems like it would be the best way if you
> >> > > > > wanted to do a clean shutdown of the new producer?
> >> > > > >
> >> > > > > So, you could in your code "stop all incoming requests &&
> >> > > > > producer.flush() && system.exit(value)" and know pretty much
> >> > > > > you won't drop anything on the floor.
> >> > > > >
> >> > > > > This can be done with the callbacks and futures (sure), but
> >> > > > > .flush() seems to be the right time to block, and it's a few
> >> > > > > lines of code, no?
> >> > > > >
> >> > > > > ~ Joestein
> >> > > > >
> >> > > > > On Tue, Feb 10, 2015 at 11:25 AM, Jay Kreps <jay.kr...@gmail.com>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Hey Bhavesh,
> >> > > > > >
> >> > > > > > If a broker is not available a new one should be elected to
> >> > > > > > take over, so although the flush might take longer it should
> >> > > > > > still be quick.  Even if not, this should result in an error,
> >> > > > > > not a hang.
> >> > > > > >
> >> > > > > > The cases you enumerated are all covered already--if the user
> >> > > > > > wants to retry, that is covered by the retry setting in the
> >> > > > > > client; for all the errors, that is considered completion of
> >> > > > > > the request.  The post condition of flush isn't that all sends
> >> > > > > > complete successfully, just that they complete.  So if you try
> >> > > > > > to send a message that is too big, when flush returns, calling
> >> > > > > > .get() on the future should not block and should produce the
> >> > > > > > error.
> >> > > > > > Basically the argument I am making is that the only reason
> >> > > > > > you want to call flush() is to guarantee all the sends
> >> > > > > > complete, so if it doesn't guarantee that it will be somewhat
> >> > > > > > confusing.  This does mean blocking, but if you don't want to
> >> > > > > > block on the send then you wouldn't call flush().
> >> > > > > >
> >> > > > > > This has no impact on the block.on.buffer.full setting.  That
> >> > > > > > impacts what happens when send() can't append to the buffer
> >> > > > > > because it is full.  flush() means any message previously
> >> > > > > > sent (i.e. for which the send() call has returned) needs to
> >> > > > > > have its request completed.  Hope that makes sense.
> >> > > > > >
> >> > > > > > -Jay
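The post-condition Jay describes (completion, not success) can be illustrated
with plain futures. This is a sketch of the contract only, not the producer's
internals; plain CompletableFutures stand in for the per-record futures:

```java
import java.util.concurrent.*;

// Sketch of flush()'s post-condition: every previously sent record's
// future is *completed* (successfully or with an error), so get()
// afterwards never blocks.
public class FlushPostCondition {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> ok = new CompletableFuture<>();
        CompletableFuture<String> tooBig = new CompletableFuture<>();

        // Simulated outcome after "flush": one send succeeded, one
        // failed (e.g. a record exceeding the size limit).
        ok.complete("ack");
        tooBig.completeExceptionally(
                new IllegalArgumentException("message too big"));

        // Both are done, so neither get() below can block.
        System.out.println(ok.isDone() && tooBig.isDone());
        System.out.println(ok.get());
        try {
            tooBig.get(); // surfaces the error immediately
        } catch (ExecutionException e) {
            System.out.println(e.getCause().getMessage());
        }
    }
}
```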
> >> > > > > >
> >> > > > > > On Mon, Feb 9, 2015 at 11:52 PM, Bhavesh Mistry <
> >> > > > > > mistry.p.bhav...@gmail.com>
> >> > > > > > wrote:
> >> > > > > >
> >> > > > > > > HI Jay,
> >> > > > > > >
> >> > > > > > > Imagine you have a flaky network connection to the
> >> > > > > > > brokers, and flush() will block if one of the brokers is
> >> > > > > > > not available.  Basically, how would we address the failure
> >> > > > > > > mode where the io thread is not able to drain records, or
> >> > > > > > > is busy due to pending requests?  Is the flush() method
> >> > > > > > > only meant to flush to the in-memory queue, or to flush to
> >> > > > > > > the broker over the network?
> >> > > > > > >
> >> > > > > > > A timeout helps by pushing the caller to handle what to
> >> > > > > > > do, e.g. re-enqueue records, drop the entire batch, or deal
> >> > > > > > > with a message that is too big and crosses the
> >> > > > > > > max.message.size limit, etc.
> >> > > > > > >
> >> > > > > > > Also, according to the javadoc for the API, "The method
> >> > > > > > > will block until all previously sent records have completed
> >> > > > > > > sending (either successfully or with an error)".  Does this
> >> > > > > > > bypass the rules set by block.on.buffer.full or batch.size
> >> > > > > > > when under load?
> >> > > > > > >
> >> > > > > > > That was my intention, and I am sorry I mixed up the
> >> > > > > > > close() method here without knowing that this is only for
> >> > > > > > > bulk send.
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > >
> >> > > > > > > Bhavesh
> >> > > > > > >
> >> > > > > > > On Mon, Feb 9, 2015 at 8:17 PM, Jay Kreps <jay.kr...@gmail.com>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Yeah I second the problem Guozhang flags with giving
> >> > > > > > > > flush() a timeout.  In general failover in Kafka is a
> >> > > > > > > > bounded thing unless you have brought your Kafka cluster
> >> > > > > > > > down entirely, so I think depending on that bound
> >> > > > > > > > implicitly is okay.
> >> > > > > > > >
> >> > > > > > > > It is possible to make flush() be instead
> >> > > > > > > >   boolean tryFlush(long timeout, TimeUnit unit);
> >> > > > > > > >
> >> > > > > > > > But I am somewhat skeptical that people will use this
> >> > > > > > > > correctly.  I.e. consider the mirror maker code snippet I
> >> > > > > > > > gave above: how would one actually recover in this case
> >> > > > > > > > other than retrying (which the client already does
> >> > > > > > > > automatically)?  After all, if you are okay losing data
> >> > > > > > > > then you don't need to bother calling flush() at all; you
> >> > > > > > > > can just let the messages be sent asynchronously.
> >> > > > > > > >
> >> > > > > > > > I think close() is actually different because you may
> >> > > > > > > > well want to shut down immediately and just throw away
> >> > > > > > > > unsent events.
> >> > > > > > > >
> >> > > > > > > > -Jay
> >> > > > > > > >
> >> > > > > > > > On Mon, Feb 9, 2015 at 2:44 PM, Guozhang Wang <wangg...@gmail.com>
> >> > > > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > > The proposal looks good to me, will need some time to
> >> review
> >> > the
> >> > > > > > > > > implementation RB later.
> >> > > > > > > > >
> >> > > > > > > > > Bhavesh, I am wondering how you would use a flush()
> >> > > > > > > > > with a timeout, since such a call does not actually
> >> > > > > > > > > provide any flushing guarantees?
> >> > > > > > > > >
> >> > > > > > > > > As for close(), there is a separate JIRA for this:
> >> > > > > > > > >
> >> > > > > > > > > KAFKA-1660 <https://issues.apache.org/jira/browse/KAFKA-1660>
> >> > > > > > > > >
> >> > > > > > > > > Guozhang
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
> >> > > > > > > > > wrote:
> >> > > > > > > > >
> >> > > > > > > > > > Hi Jay,
> >> > > > > > > > > >
> >> > > > > > > > > > How about adding a timeout to each of these method
> >> > > > > > > > > > calls: flush(timeout, TimeUnit) and close(timeout,
> >> > > > > > > > > > TimeUnit)?  We had a runaway io thread issue, and the
> >> > > > > > > > > > caller thread should not be blocked forever in these
> >> > > > > > > > > > methods.
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > Thanks,
> >> > > > > > > > > >
> >> > > > > > > > > > Bhavesh
> >> > > > > > > > > >
> >> > > > > > > > > > On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <jay.kr...@gmail.com>
> >> > > > > > > > > > wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > > Well actually in the case of linger.ms = 0 the send
> >> > > > > > > > > > > is still asynchronous, so calling flush() blocks
> >> > > > > > > > > > > until all the previously sent records have
> >> > > > > > > > > > > completed.  It doesn't speed anything up in that
> >> > > > > > > > > > > case, though, since they are already available to
> >> > > > > > > > > > > send.
> >> > > > > > > > > > >
> >> > > > > > > > > > > -Jay
> >> > > > > > > > > > >
> >> > > > > > > > > > > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gshap...@cloudera.com>
> >> > > > > > > > > > > wrote:
> >> > > > > > > > > > >
> >> > > > > > > > > > > > Looks good to me.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > I like the idea of not blocking additional sends
> >> > > > > > > > > > > > but not guaranteeing that flush() will deliver
> >> > > > > > > > > > > > them.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > I assume that with linger.ms = 0, flush() will
> >> > > > > > > > > > > > just be a noop (since the queue will be empty).
> >> > > > > > > > > > > > Is that correct?
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Gwen
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <jay.kr...@gmail.com>
> >> > > > > > > > > > > > wrote:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > > Following up on our previous thread on making
> >> > > > > > > > > > > > > batch send a little easier, here is a concrete
> >> > > > > > > > > > > > > proposal to add a flush() method to the
> >> > > > > > > > > > > > > producer:
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > A proposed implementation is here:
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-1865
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > Thoughts?
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > -Jay
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > --
> >> > > > > > > > > -- Guozhang
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > >
> >> > > >
> >> >
> >> >
> >>
>
>


-- 
-- Guozhang
