Hi Jiangjie,

When you say "It is probably better to fail the messages in accumulator than keeping them in that case because I really cannot think of any case where metadata of a partition can disappear and come up again shortly", that is true *unless there is a metadata refresh that occurs during that interval*, right?
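To make the question concrete, here is a minimal sketch of the producer settings this thread keeps referring to. The broker address, topic, and values are placeholders, and request.timeout.ms is only the name proposed in KIP-19, so it is left commented out:

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Kip19TimeoutSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder broker
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        // How long send() may block waiting for metadata of the target partition.
        props.put("metadata.fetch.timeout.ms", "60000");
        // Periodic metadata refresh interval, i.e. the refresh that could bring a
        // leaderless partition back within the window discussed above.
        props.put("metadata.max.age.ms", "300000");
        // Existing broker-side ack timeout; the KIP proposes renaming it to
        // replication.timeout.ms.
        props.put("timeout.ms", "30000");
        // Proposed in KIP-19 (not available yet): client-side in-flight request timeout.
        // props.put("request.timeout.ms", "30000");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            // Blocks up to metadata.fetch.timeout.ms if metadata is missing; under
            // the proposal, batches already buffered for a leaderless partition
            // would be failed with a TimeoutException rather than kept until the
            // partition comes back.
            Future<RecordMetadata> result =
                producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));
            result.get();
        } finally {
            producer.close();
        }
    }
}

Thanks,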
Mayuresh On Tue, May 19, 2015 at 12:26 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote: > Hey Jay, > > I think that is a very reasonable concern. > So the current behavior for those users are: > 1. Send() will go through as long as metadata is available. > 2. Send() will throw exception if the metadata of a partition is lost > after pre-initialization. > 3. The messages in accumulator will not be failed but wait until the > partition metadata is available again. > > If we reuse metadata timeout, (1) and (2) are kept the same. Only (3) is > changed as those messages will be failed immediately when the batch is > ready. > It is probably not an issue though, because user will get exception from > send() call anyway in this case. It is probably better to fail the > messages in accumulator than keeping them in that case because I really > cannot think of any case where metadata of a partition can disappear and > come up again shortly. > > So I guess the metadata timeout does exactly what it means - how long you > are willing to wait for metadata. It is not designed to provide a blocking > boundary for send() - we have blocking on buffer full as well. It is just > one of the dependencies in send() so send() could be blocked for metadata > timeout. > > I totally agree we should explain all the timeouts clearly. I think we are > fine as long as we make sure the configuration is used for what it sounds > to be used and articulate the impacts of those configurations. I’ll check > what would happen if a broker is down when we try to connect to it as well. > > Thanks. > > Jiangjie (Becket) Qin > > > On 5/19/15, 11:38 AM, "Jay Kreps" <jay.kr...@gmail.com> wrote: > > >Here is the concern I had with reusing the metadata.fetch.timeout.ms: > > > >Previously people were using this as a bound on the time send() would > >block. It isn't a bound on the time we will wait on a metadata request, > >just the time the send call will block if metadata is missing for the > >topic. We told people who wanted a guarantee of no blocking to basically > >preinitialize metadata and set this timeout to 0. However I think now this > >will have a slightly different side effect which is to kill any request > >immediately for a leaderless partition even though that request is safely > >buffered in the record accumulator and no blocking will occur. People > >using > >the setting in the original way would now get a bit of a surprise. > > > >This may actually be okay and there is always a tradeoff between > >simplicity > >and control. > > > >-Jay > > > >On Tue, May 19, 2015 at 11:12 AM, Jay Kreps <jay.kr...@gmail.com> wrote: > > > >> I think this looks good. What I think is missing is an overview of the > >> timeouts from the user's perspective. > >> > >> My worry is that it is quite complicated to reason about the current set > >> of timeouts. Currently we have > >> timeout.ms > >> metadata.fetch.timeout.ms > >> > >> The proposed settings I think are: > >> batch.expiration.ms > >> request.timeout.ms > >> replication.timeout.ms > >> > >> I think maybe we can skip the batch.expiration.ms. Instead maybe we can > >> somehow combine these into a single request timeout so that we subtract > >>the > >> time you spent waiting from the request timeout and/or replication > >>timeout > >> somehow? 
I don't have an explicit proposal but my suspicion is that from > >> the user's point of view there is just one timeout related to the > >>request > >> after which they don't care, and we can split that up between the batch > >> time and the request time. Thoughts? > >> > >> How are we handling connection timeouts? If a machine hard fails in the > >> middle of connection establishment there will be no outstanding > >>requests. I > >> think this may be okay because connections are established when we want > >>to > >> send a request and presumably we will begin the timer then? > >> > >> To that end I suggest we do two things: > >> 1. Include KAKFA-1788. I know that technically these two things are > >> different but from the user's point of view they aren't. > >> 2. Include in the KIP the explanation to the user of the full set of > >> timeouts, what they mean, how we will default them, and when to override > >> which. > >> > >> I know this is a hassle but I think the end experience will be a lot > >> better if we go through this thought process. > >> > >> -Jay > >> > >> On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin > >><j...@linkedin.com.invalid> > >> wrote: > >> > >>> I modified the WIKI page to incorporate the feedbacks from mailing list > >>> and KIP hangout. > >>> > >>> - Added the deprecation plan for TIMEOUT_CONFIG > >>> - Added the actions to take after request timeout > >>> > >>> I finally chose to create a new connection if requests timeout. The > >>>reason > >>> is: > >>> 1. In most cases, if a broker is just slow, as long as we set request > >>> timeout to be a reasonable value, we should not see many new > >>>connections > >>> get created. > >>> 2. If a broker is down, hopefully metadata refresh will find the new > >>> broker and we will not try to reconnect to the broker anymore. > >>> > >>> Comments are welcome! > >>> > >>> Thanks. > >>> > >>> Jiangjie (Becket) Qin > >>> > >>> On 5/12/15, 2:59 PM, "Mayuresh Gharat" <gharatmayures...@gmail.com> > >>> wrote: > >>> > >>> >+1 Becket. That would give enough time for clients to move. We should > >>> make > >>> >this change very clear. > >>> > > >>> >Thanks, > >>> > > >>> >Mayuresh > >>> > > >>> >On Tue, May 12, 2015 at 1:45 PM, Jiangjie Qin > >>><j...@linkedin.com.invalid > >>> > > >>> >wrote: > >>> > > >>> >> Hey Ewen, > >>> >> > >>> >> Very good summary about the compatibility. What you proposed makes > >>> >>sense. > >>> >> So basically we can do the following: > >>> >> > >>> >> In next release, i.e. 0.8.3: > >>> >> 1. Add REPLICATION_TIMEOUT_CONFIG (“replication.timeout.ms”) > >>> >> 2. Mark TIMEOUT_CONFIG as deprecated > >>> >> 3. Override REPLICATION_TIMEOUT_CONFIG with TIMEOUT_CONFIG if it is > >>> >> defined and give a warning about deprecation. > >>> >> In the release after 0.8.3, we remove TIMEOUT_CONFIG. > >>> >> > >>> >> This should give enough buffer for this change. > >>> >> > >>> >> Request timeout is a complete new thing we add to fix a bug, I’m > >>>with > >>> >>you > >>> >> it does not make sense to have it maintain the old buggy behavior. > >>>So > >>> we > >>> >> can set it to a reasonable value instead of infinite. > >>> >> > >>> >> Jiangjie (Becket) Qin > >>> >> > >>> >> On 5/12/15, 12:03 PM, "Ewen Cheslack-Postava" <e...@confluent.io> > >>> wrote: > >>> >> > >>> >> >I think my confusion is coming from this: > >>> >> > > >>> >> >> So in this KIP, we only address (3). 
The only public interface > >>> change > >>> >> >>is a > >>> >> >> new configuration of request timeout (and maybe change the > >>> >>configuration > >>> >> >> name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG). > >>> >> > > >>> >> >There are 3 possible compatibility issues I see here: > >>> >> > > >>> >> >* I assumed this meant the constants also change, so "timeout.ms" > >>> >>becomes > >>> >> >" > >>> >> >replication.timeout.ms". This breaks config files that worked on > >>>the > >>> >> >previous version and the only warning would be in release notes. > >>>We do > >>> >> >warn > >>> >> >about unused configs so they might notice the problem. > >>> >> > > >>> >> >* Binary and source compatibility if someone configures their > >>>client > >>> in > >>> >> >code and uses the TIMEOUT_CONFIG variable. Renaming it will cause > >>> >>existing > >>> >> >jars to break if you try to run against an updated client (which > >>>seems > >>> >>not > >>> >> >very significant since I doubt people upgrade these without > >>> recompiling > >>> >> >but > >>> >> >maybe I'm wrong about that). And it breaks builds without have > >>> >>deprecated > >>> >> >that field first, which again, is probably not the biggest issue > >>>but > >>> is > >>> >> >annoying for users and when we accidentally changed the API we > >>> >>received a > >>> >> >complaint about breaking builds. > >>> >> > > >>> >> >* Behavior compatibility as Jay mentioned on the call -- setting > >>>the > >>> >> >config > >>> >> >(even if the name changed) doesn't have the same effect it used to. > >>> >> > > >>> >> >One solution, which admittedly is more painful to implement and > >>> >>maintain, > >>> >> >would be to maintain the timeout.ms config, have it override the > >>> others > >>> >> if > >>> >> >it is specified (including an infinite request timeout I guess?), > >>>and > >>> >>if > >>> >> >it > >>> >> >isn't specified, we can just use the new config variables. Given a > >>> real > >>> >> >deprecation schedule, users would have better warning of changes > >>>and a > >>> >> >window to make the changes. > >>> >> > > >>> >> >I actually think it might not be necessary to maintain the old > >>> behavior > >>> >> >precisely, although maybe for some code it is an issue if they > >>>start > >>> >> >seeing > >>> >> >timeout exceptions that they wouldn't have seen before? > >>> >> > > >>> >> >-Ewen > >>> >> > > >>> >> >On Wed, May 6, 2015 at 6:06 PM, Jun Rao <j...@confluent.io> wrote: > >>> >> > > >>> >> >> Jiangjie, > >>> >> >> > >>> >> >> Yes, I think using metadata timeout to expire batches in the > >>>record > >>> >> >> accumulator makes sense. > >>> >> >> > >>> >> >> Thanks, > >>> >> >> > >>> >> >> Jun > >>> >> >> > >>> >> >> On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin > >>> >> >><j...@linkedin.com.invalid> > >>> >> >> wrote: > >>> >> >> > >>> >> >> > I incorporated Ewen and Guozhang’s comments in the KIP page. > >>>Want > >>> >>to > >>> >> >> speed > >>> >> >> > up on this KIP because currently we experience mirror-maker > >>>hung > >>> >>very > >>> >> >> > likely when a broker is down. > >>> >> >> > > >>> >> >> > I also took a shot to solve KAFKA-1788 in KAFKA-2142. I used > >>> >>metadata > >>> >> >> > timeout to expire the batches which are sitting in accumulator > >>> >>without > >>> >> >> > leader info. I did that because the situation there is > >>>essentially > >>> >> >> missing > >>> >> >> > metadata. > >>> >> >> > > >>> >> >> > As a summary of what I am thinking about the timeout in new > >>> >>Producer: > >>> >> >> > > >>> >> >> > 1. 
Metadata timeout: > >>> >> >> > - used in send(), blocking > >>> >> >> > - used in accumulator to expire batches with timeout > >>>exception. > >>> >> >> > 2. Linger.ms > >>> >> >> > - Used in accumulator to ready the batch for drain > >>> >> >> > 3. Request timeout > >>> >> >> > - Used in NetworkClient to expire a batch and retry if no > >>> >>response > >>> >> >>is > >>> >> >> > received for a request before timeout. > >>> >> >> > > >>> >> >> > So in this KIP, we only address (3). The only public interface > >>> >>change > >>> >> >>is > >>> >> >> a > >>> >> >> > new configuration of request timeout (and maybe change the > >>> >> >>configuration > >>> >> >> > name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG). > >>> >> >> > > >>> >> >> > Would like to see what people think of above approach? > >>> >> >> > > >>> >> >> > Jiangjie (Becket) Qin > >>> >> >> > > >>> >> >> > On 4/20/15, 6:02 PM, "Jiangjie Qin" <j...@linkedin.com> wrote: > >>> >> >> > > >>> >> >> > >Jun, > >>> >> >> > > > >>> >> >> > >I thought a little bit differently on this. > >>> >> >> > >Intuitively, I am thinking that if a partition is offline, the > >>> >> >>metadata > >>> >> >> > >for that partition should be considered not ready because we > >>> don’t > >>> >> >>know > >>> >> >> > >which broker we should send the message to. So those sends > >>>need > >>> >>to be > >>> >> >> > >blocked on metadata timeout. > >>> >> >> > >Another thing I’m wondering is in which scenario an offline > >>> >>partition > >>> >> >> will > >>> >> >> > >become online again in a short period of time and how likely > >>>it > >>> >>will > >>> >> >> > >occur. My understanding is that the batch timeout for batches > >>> >> >>sitting in > >>> >> >> > >accumulator should be larger than linger.ms but should not be > >>> too > >>> >> >>long > >>> >> >> > >(e.g. less than 60 seconds). Otherwise it will exhaust the > >>>shared > >>> >> >>buffer > >>> >> >> > >with batches to be aborted. > >>> >> >> > > > >>> >> >> > >That said, I do agree it is reasonable to buffer the message > >>>for > >>> >>some > >>> >> >> time > >>> >> >> > >so messages to other partitions can still get sent. But adding > >>> >> >>another > >>> >> >> > >expiration in addition to linger.ms - which is essentially a > >>> >>timeout > >>> >> >>- > >>> >> >> > >sounds a little bit confusing. Maybe we can do this, let the > >>> batch > >>> >> >>sit > >>> >> >> in > >>> >> >> > >accumulator up to linger.ms, then fail it if necessary. > >>> >> >> > > > >>> >> >> > >What do you think? > >>> >> >> > > > >>> >> >> > >Thanks, > >>> >> >> > > > >>> >> >> > >Jiangjie (Becket) Qin > >>> >> >> > > > >>> >> >> > >On 4/20/15, 1:11 PM, "Jun Rao" <j...@confluent.io> wrote: > >>> >> >> > > > >>> >> >> > >>Jiangjie, > >>> >> >> > >> > >>> >> >> > >>Allowing messages to be accumulated in an offline partition > >>> >>could be > >>> >> >> > >>useful > >>> >> >> > >>since the partition may become available before the request > >>> >>timeout > >>> >> >>or > >>> >> >> > >>linger time is reached. Now that we are planning to add a new > >>> >> >>timeout, > >>> >> >> it > >>> >> >> > >>would be useful to think through whether/how that applies to > >>> >> >>messages > >>> >> >> in > >>> >> >> > >>the accumulator too. 
> >>> >> >> > >> > >>> >> >> > >>Thanks, > >>> >> >> > >> > >>> >> >> > >>Jun > >>> >> >> > >> > >>> >> >> > >> > >>> >> >> > >>On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin > >>> >> >> <j...@linkedin.com.invalid > >>> >> >> > > > >>> >> >> > >>wrote: > >>> >> >> > >> > >>> >> >> > >>> Hi Harsha, > >>> >> >> > >>> > >>> >> >> > >>> Took a quick look at the patch. I think it is still a > >>>little > >>> >>bit > >>> >> >> > >>> different. KAFKA-1788 only handles the case where a batch > >>> >>sitting > >>> >> >>in > >>> >> >> > >>> accumulator for too long. The KIP is trying to solve the > >>>issue > >>> >> >>where > >>> >> >> a > >>> >> >> > >>> batch has already been drained from accumulator and sent to > >>> >> >>broker. > >>> >> >> > >>> We might be able to apply timeout on batch level to merge > >>> those > >>> >> >>two > >>> >> >> > >>>cases > >>> >> >> > >>> as Ewen suggested. But I’m not sure if it is a good idea to > >>> >>allow > >>> >> >> > >>>messages > >>> >> >> > >>> whose target partition is offline to sit in accumulator in > >>>the > >>> >> >>first > >>> >> >> > >>>place. > >>> >> >> > >>> > >>> >> >> > >>> Jiangjie (Becket) Qin > >>> >> >> > >>> > >>> >> >> > >>> On 4/16/15, 10:19 AM, "Sriharsha Chintalapani" > >>> >><ka...@harsha.io> > >>> >> >> > wrote: > >>> >> >> > >>> > >>> >> >> > >>> >Guozhang and Jiangjie, > >>> >> >> > >>> > Isn’t this work being covered in > >>> >> >> > >>> >https://issues.apache.org/jira/browse/KAFKA-1788 . Can > you > >>> >> please > >>> >> >> the > >>> >> >> > >>> >review the patch there. > >>> >> >> > >>> >Thanks, > >>> >> >> > >>> >Harsha > >>> >> >> > >>> > > >>> >> >> > >>> > > >>> >> >> > >>> >On April 15, 2015 at 10:39:40 PM, Guozhang Wang > >>> >> >>(wangg...@gmail.com > >>> >> >> ) > >>> >> >> > >>> >wrote: > >>> >> >> > >>> > > >>> >> >> > >>> >Thanks for the update Jiangjie, > >>> >> >> > >>> > > >>> >> >> > >>> >I think it is actually NOT expected that hardware > >>> >>disconnection > >>> >> >>will > >>> >> >> > >>>be > >>> >> >> > >>> >detected by the selector, but rather will only be revealed > >>> >>upon > >>> >> >>TCP > >>> >> >> > >>> >timeout, which could be hours. > >>> >> >> > >>> > > >>> >> >> > >>> >A couple of comments on the wiki: > >>> >> >> > >>> > > >>> >> >> > >>> >1. "For KafkaProducer.close() and KafkaProducer.flush() we > >>> >>need > >>> >> >>the > >>> >> >> > >>> >request > >>> >> >> > >>> >timeout as implict timeout." I am not very clear what does > >>> >>this > >>> >> >> mean? > >>> >> >> > >>> > > >>> >> >> > >>> >2. Currently the producer already has a "TIMEOUT_CONFIG" > >>> which > >>> >> >> should > >>> >> >> > >>> >really be "REPLICATION_TIMEOUT_CONFIG". So if we decide to > >>> >>add " > >>> >> >> > >>> >REQUEST_TIMEOUT_CONFIG", I suggest we also make this > >>> renaming: > >>> >> >> > >>>admittedly > >>> >> >> > >>> > > >>> >> >> > >>> >it will change the config names but will reduce confusions > >>> >>moving > >>> >> >> > >>> >forward. > >>> >> >> > >>> > > >>> >> >> > >>> > > >>> >> >> > >>> >Guozhang > >>> >> >> > >>> > > >>> >> >> > >>> > > >>> >> >> > >>> >On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin > >>> >> >> > >>><j...@linkedin.com.invalid> > >>> >> >> > >>> > > >>> >> >> > >>> >wrote: > >>> >> >> > >>> > > >>> >> >> > >>> >> Checked the code again. It seems that the disconnected > >>> >>channel > >>> >> >>is > >>> >> >> > >>>not > >>> >> >> > >>> >> detected by selector as expected. 
> >>> >> >> > >>> >> > >>> >> >> > >>> >> Currently we are depending on the > >>> >> >> > >>> >> o.a.k.common.network.Selector.disconnected set to see > >>>if we > >>> >> >>need > >>> >> >> to > >>> >> >> > >>>do > >>> >> >> > >>> >> something for a disconnected channel. > >>> >> >> > >>> >> However Selector.disconnected set is only updated when: > >>> >> >> > >>> >> 1. A write/read/connect to channel failed. > >>> >> >> > >>> >> 2. A Key is canceled > >>> >> >> > >>> >> However when a broker is down before it sends back the > >>> >> >>response, > >>> >> >> the > >>> >> >> > >>> >> client seems not be able to detect this failure. > >>> >> >> > >>> >> > >>> >> >> > >>> >> I did a simple test below: > >>> >> >> > >>> >> 1. Run a selector on one machine and an echo server on > >>> >>another > >>> >> >> > >>>machine. > >>> >> >> > >>> >> > >>> >> >> > >>> >> Connect a selector to an echo server > >>> >> >> > >>> >> 2. Send a message to echo server using selector, then > >>>let > >>> >>the > >>> >> >> > >>>selector > >>> >> >> > >>> >> poll() every 10 seconds. > >>> >> >> > >>> >> 3. After the sever received the message, unplug cable on > >>> the > >>> >> >>echo > >>> >> >> > >>> >>server. > >>> >> >> > >>> >> 4. After waiting for 45 min. The selector still did not > >>> >> >>detected > >>> >> >> the > >>> >> >> > >>> >> network failure. > >>> >> >> > >>> >> Lsof on selector machine shows that the TCP connection > >>>is > >>> >>still > >>> >> >> > >>> >>considered > >>> >> >> > >>> >> ESTABLISHED. > >>> >> >> > >>> >> > >>> >> >> > >>> >> I’m not sure in this case what should we expect from the > >>> >> >> > >>> >> java.nio.channels.Selector. According to the document, > >>>the > >>> >> >> selector > >>> >> >> > >>> >>does > >>> >> >> > >>> >> not verify the status of the associated channel. In my > >>>test > >>> >> >>case > >>> >> >> it > >>> >> >> > >>> >>looks > >>> >> >> > >>> >> even worse that OS did not think of the socket has been > >>> >> >> > >>>disconnected. > >>> >> >> > >>> >> > >>> >> >> > >>> >> Anyway. It seems adding the client side request timeout > >>>is > >>> >> >> > >>>necessary. > >>> >> >> > >>> >>I’ve > >>> >> >> > >>> >> updated the KIP page to clarify the problem we want to > >>> solve > >>> >> >> > >>>according > >>> >> >> > >>> >>to > >>> >> >> > >>> >> Ewen’s comments. > >>> >> >> > >>> >> > >>> >> >> > >>> >> Thanks. > >>> >> >> > >>> >> > >>> >> >> > >>> >> Jiangjie (Becket) Qin > >>> >> >> > >>> >> > >>> >> >> > >>> >> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava" > >>> >> >><e...@confluent.io> > >>> >> >> > >>>wrote: > >>> >> >> > >>> >> > >>> >> >> > >>> >> > >>> >> >> > >>> >> >On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin > >>> >> >> > >>> >><j...@linkedin.com.invalid> > >>> >> >> > >>> >> >wrote: > >>> >> >> > >>> >> > > >>> >> >> > >>> >> >> Hi Ewen, thanks for the comments. Very good points! > >>> >>Please > >>> >> >>see > >>> >> >> > >>> >>replies > >>> >> >> > >>> >> >> inline. > >>> >> >> > >>> >> >> > >>> >> >> > >>> >> >> > >>> >> >> > >>> >> >> On 4/13/15, 11:19 PM, "Ewen Cheslack-Postava" < > >>> >> >> e...@confluent.io > >>> >> >> > > > >>> >> >> > >>> >> wrote: > >>> >> >> > >>> >> >> > >>> >> >> > >>> >> >> >Jiangjie, > >>> >> >> > >>> >> >> > > >>> >> >> > >>> >> >> >Great start. I have a couple of comments. > >>> >> >> > >>> >> >> > > >>> >> >> > >>> >> >> >Under the motivation section, is it really true that > >>> the > >>> >> >> request > >>> >> >> > >>> >>will > >>> >> >> > >>> >> >> >never > >>> >> >> > >>> >> >> >be completed? 
Presumably if the broker goes down the > >>> >> >> connection > >>> >> >> > >>> >>will be > >>> >> >> > >>> >> >> >severed, at worst by a TCP timeout, which should > >>>clean > >>> >>up > >>> >> >>the > >>> >> >> > >>> >> >>connection > >>> >> >> > >>> >> >> >and any outstanding requests, right? I think the > >>>real > >>> >> >>reason > >>> >> >> we > >>> >> >> > >>> >>need a > >>> >> >> > >>> >> >> >different timeout is that the default TCP timeouts > >>>are > >>> >> >> > >>>ridiculously > >>> >> >> > >>> >> > >>> >> >> > >>> >> >>long > >>> >> >> > >>> >> >> >in > >>> >> >> > >>> >> >> >this context. > >>> >> >> > >>> >> >> Yes, when broker is completely down the request > >>>should > >>> be > >>> >> >> cleared > >>> >> >> > >>>as > >>> >> >> > >>> >>you > >>> >> >> > >>> >> >> said. The case we encountered looks like the broker > >>>was > >>> >>just > >>> >> >> not > >>> >> >> > >>> >> >> responding but TCP connection was still alive though. > >>> >> >> > >>> >> >> > >>> >> >> > >>> >> > > >>> >> >> > >>> >> >Ok, that makes sense. > >>> >> >> > >>> >> > > >>> >> >> > >>> >> > > >>> >> >> > >>> >> >> > >>> >> >> > >>> >> >> > > >>> >> >> > >>> >> >> >My second question is about whether this is the > >>>right > >>> >> >>level to > >>> >> >> > >>> >>tackle > >>> >> >> > >>> >> >>the > >>> >> >> > >>> >> >> >issue/what user-facing changes need to be made. A > >>> >>related > >>> >> >> > >>>problem > >>> >> >> > >>> >>came > >>> >> >> > >>> >> >>up > >>> >> >> > >>> >> >> >in https://issues.apache.org/jira/browse/KAFKA-1788 > >>> >>where > >>> >> >> > >>>producer > >>> >> >> > >>> >> >> records > >>> >> >> > >>> >> >> >get stuck indefinitely because there's no > >>>client-side > >>> >> >>timeout. > >>> >> >> > >>>This > >>> >> >> > >>> >>KIP > >>> >> >> > >>> >> >> >wouldn't fix that problem or any problems caused by > >>> >>lack of > >>> >> >> > >>> >> >>connectivity > >>> >> >> > >>> >> >> >since this would only apply to in flight requests, > >>> >>which by > >>> >> >> > >>> >>definition > >>> >> >> > >>> >> >> >must > >>> >> >> > >>> >> >> >have been sent on an active connection. > >>> >> >> > >>> >> >> > > >>> >> >> > >>> >> >> >I suspect both types of problems probably need to be > >>> >> >>addressed > >>> >> >> > >>> >> >>separately > >>> >> >> > >>> >> >> >by introducing explicit timeouts. However, because > >>>the > >>> >> >> settings > >>> >> >> > >>> >> >>introduced > >>> >> >> > >>> >> >> >here are very much about the internal > >>>implementations > >>> of > >>> >> >>the > >>> >> >> > >>> >>clients, > >>> >> >> > >>> >> >>I'm > >>> >> >> > >>> >> >> >wondering if this even needs to be a user-facing > >>> >>setting, > >>> >> >> > >>> >>especially > >>> >> >> > >>> >> >>if we > >>> >> >> > >>> >> >> >have to add other timeouts anyway. For example, > >>>would a > >>> >> >>fixed, > >>> >> >> > >>> >>generous > >>> >> >> > >>> >> >> >value that's still much shorter than a TCP timeout, > >>>say > >>> >> >>15s, > >>> >> >> be > >>> >> >> > >>> >>good > >>> >> >> > >>> >> >> >enough? If other timeouts would allow, for example, > >>>the > >>> >> >> clients > >>> >> >> > >>>to > >>> >> >> > >>> >> >> >properly > >>> >> >> > >>> >> >> >exit even if requests have not hit their timeout, > >>>then > >>> >> >>what's > >>> >> >> > >>>the > >>> >> >> > >>> >> >>benefit > >>> >> >> > >>> >> >> >of being able to configure the request-level > >>>timeout? > >>> >> >> > >>> >> >> That is a very good point. 
We have three places that > >>>we > >>> >> >>might > >>> >> >> be > >>> >> >> > >>> >>able to > >>> >> >> > >>> >> >> enforce timeout for a message send: > >>> >> >> > >>> >> >> 1. Before append to accumulator - handled by metadata > >>> >> >>timeout > >>> >> >> on > >>> >> >> > >>>per > >>> >> >> > >>> >> > >>> >> >> > >>> >> >> message level. > >>> >> >> > >>> >> >> 2. Batch of messages inside accumulator - no timeout > >>> >> >>mechanism > >>> >> >> > >>>now. > >>> >> >> > >>> >> >> 3. Request of batches after messages leave the > >>> >>accumulator > >>> >> >>- we > >>> >> >> > >>>have > >>> >> >> > >>> >>a > >>> >> >> > >>> >> >> broker side timeout but no client side timeout for > >>>now. > >>> >> >> > >>> >> >> My current proposal only address (3) but not (2). > >>> >> >> > >>> >> >> Honestly I do not have a very clear idea about what > >>> >>should > >>> >> >>we > >>> >> >> do > >>> >> >> > >>> >>with > >>> >> >> > >>> >> >>(2) > >>> >> >> > >>> >> >> right now. But I am with you that we should not > >>>expose > >>> >>too > >>> >> >>many > >>> >> >> > >>> >> >> configurations to users. What I am thinking now to > >>> handle > >>> >> >>(2) > >>> >> >> is > >>> >> >> > >>> >>when > >>> >> >> > >>> >> >>user > >>> >> >> > >>> >> >> call send, if we know that a partition is offline, we > >>> >>should > >>> >> >> > >>>throw > >>> >> >> > >>> >> >> exception immediately instead of putting it into > >>> >> >>accumulator. > >>> >> >> > >>>This > >>> >> >> > >>> >>would > >>> >> >> > >>> >> >> protect further memory consumption. We might also > >>>want > >>> to > >>> >> >>fail > >>> >> >> > >>>all > >>> >> >> > >>> >>the > >>> >> >> > >>> >> >> batches in the dequeue once we found a partition is > >>> >>offline. > >>> >> >> That > >>> >> >> > >>> >> >>said, I > >>> >> >> > >>> >> >> feel timeout might not be quite applicable to (2). > >>> >> >> > >>> >> >> Do you have any suggestion on this? > >>> >> >> > >>> >> >> > >>> >> >> > >>> >> > > >>> >> >> > >>> >> >Right, I didn't actually mean to solve 2 here, but was > >>> >>trying > >>> >> >>to > >>> >> >> > >>> >>figure > >>> >> >> > >>> >> >out > >>> >> >> > >>> >> >if a solution to 2 would reduce what we needed to do to > >>> >> >>address > >>> >> >> 3. > >>> >> >> > >>> >>(And > >>> >> >> > >>> >> >depending on how they are implemented, fixing 1 might > >>>also > >>> >> >> address > >>> >> >> > >>>2). > >>> >> >> > >>> >>It > >>> >> >> > >>> >> >sounds like you hit hang that I wasn't really > >>>expecting. > >>> >>This > >>> >> >> > >>>probably > >>> >> >> > >>> >> > >>> >> >> > >>> >> >just > >>> >> >> > >>> >> >means the KIP motivation needs to be a bit clearer > >>>about > >>> >>what > >>> >> >> type > >>> >> >> > >>>of > >>> >> >> > >>> >> >situation this addresses. The cause of the hang may > >>>also > >>> be > >>> >> >> > >>>relevant > >>> >> >> > >>> >>-- if > >>> >> >> > >>> >> >it was something like a deadlock then that's something > >>> that > >>> >> >> should > >>> >> >> > >>> >>just be > >>> >> >> > >>> >> >fixed, but if it's something outside our control then a > >>> >> >>timeout > >>> >> >> > >>>makes > >>> >> >> > >>> >>a > >>> >> >> > >>> >> >lot > >>> >> >> > >>> >> >more sense. 
> >>> >> >> > >>> >> > > >>> >> >> > >>> >> > > >>> >> >> > >>> >> >> > > >>> >> >> > >>> >> >> >I know we have a similar setting, > >>> >> >> > >>> >> >>max.in.flights.requests.per.connection, > >>> >> >> > >>> >> >> >exposed publicly (which I just discovered is missing > >>> >>from > >>> >> >>the > >>> >> >> > >>>new > >>> >> >> > >>> >> >>producer > >>> >> >> > >>> >> >> >configs documentation). But it looks like the new > >>> >>consumer > >>> >> >>is > >>> >> >> > >>>not > >>> >> >> > >>> >> >>exposing > >>> >> >> > >>> >> >> >that option, using a fixed value instead. I think we > >>> >>should > >>> >> >> > >>>default > >>> >> >> > >>> >>to > >>> >> >> > >>> >> >> >hiding these implementation values unless there's a > >>> >>strong > >>> >> >> case > >>> >> >> > >>>for > >>> >> >> > >>> >>a > >>> >> >> > >>> >> >> >scenario that requires customization. > >>> >> >> > >>> >> >> For producer, max.in.flight.requests.per.connection > >>> >>really > >>> >> >> > >>>matters. > >>> >> >> > >>> >>If > >>> >> >> > >>> >> >> people do not want to have reorder of messages, they > >>> >>have to > >>> >> >> use > >>> >> >> > >>> >> >> max.in.flight.requests.per.connection=1. On the other > >>> >>hand, > >>> >> >>if > >>> >> >> > >>> >> >>throughput > >>> >> >> > >>> >> >> is more of a concern, it could be set to higher. For > >>>the > >>> >>new > >>> >> >> > >>> >>consumer, I > >>> >> >> > >>> >> >> checked the value and I am not sure if the hard coded > >>> >> >> > >>> >> >> max.in.flight.requests.per.connection=100 is the > >>>right > >>> >> >>value. > >>> >> >> > >>> >>Without > >>> >> >> > >>> >> >>the > >>> >> >> > >>> >> >> response to the previous request, what offsets > >>>should be > >>> >>put > >>> >> >> into > >>> >> >> > >>> >>the > >>> >> >> > >>> >> >>next > >>> >> >> > >>> >> >> fetch request? It seems to me the value will be one > >>> >>natively > >>> >> >> > >>> >>regardless > >>> >> >> > >>> >> >>of > >>> >> >> > >>> >> >> the setting unless we are sending fetch request to > >>> >>different > >>> >> >> > >>> >>partitions, > >>> >> >> > >>> >> >> which does not look like the case. > >>> >> >> > >>> >> >> Anyway, it looks to be a separate issue orthogonal to > >>> the > >>> >> >> request > >>> >> >> > >>> >> >>timeout. > >>> >> >> > >>> >> >> > >>> >> >> > >>> >> > > >>> >> >> > >>> >> > > >>> >> >> > >>> >> >> > >>> >> >> > >>> >> >> >In other words, since the only user-facing change > >>>was > >>> >>the > >>> >> >> > >>>addition > >>> >> >> > >>> >>of > >>> >> >> > >>> >> >>the > >>> >> >> > >>> >> >> >setting, I'm wondering if we can avoid the KIP > >>> >>altogether > >>> >> >>by > >>> >> >> > >>>just > >>> >> >> > >>> >> >>choosing > >>> >> >> > >>> >> >> >a good default value for the timeout. > >>> >> >> > >>> >> >> The problem is that we have a server side request > >>> timeout > >>> >> >> exposed > >>> >> >> > >>>as > >>> >> >> > >>> >>a > >>> >> >> > >>> >> >> public configuration. We cannot set the client > >>>timeout > >>> >> >>smaller > >>> >> >> > >>>than > >>> >> >> > >>> >>that > >>> >> >> > >>> >> >> value, so a hard coded value probably won¹t work > >>>here. 
> >>> >> >That makes sense, although it's worth keeping in mind that even if you
> >>> >> >use "correct" values, they could still be violated due to, e.g., a GC
> >>> >> >pause that causes the broker to process a request after it is supposed
> >>> >> >to have expired.
> >>> >> >
> >>> >> >-Ewen
> >>> >> >
> >>> >> >> >
> >>> >> >> >-Ewen
> >>> >> >> >
> >>> >> >> >On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> >>> >> >> >wrote:
> >>> >> >> >
> >>> >> >> >> Hi,
> >>> >> >> >>
> >>> >> >> >> I just created a KIP to add a request timeout to NetworkClient for
> >>> >> >> >> new Kafka clients.
> >>> >> >> >>
> >>> >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
> >>> >> >> >>
> >>> >> >> >> Comments and suggestions are welcome!
> >>> >> >> >>
> >>> >> >> >> Thanks.
> >>> >> >> >>
> >>> >> >> >> Jiangjie (Becket) Qin
> >>> >> >> >>
> >>> >> >> >
> >>> >> >> >--
> >>> >> >> >Thanks,
> >>> >> >> >Ewen
> >>> >> >
> >>> >> >--
> >>> >> >Thanks,
> >>> >> >Ewen
> >>> >
> >>> >--
> >>> >-- Guozhang
> >>> >
> >>> >--
> >>> >-Regards,
> >>> >Mayuresh R. Gharat
> >>> >(862) 250-7125
>

--
-Regards,
Mayuresh R. Gharat
(862) 250-7125
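For anyone following the deprecation plan agreed on above, a rough sketch of the override-and-warn behavior for timeout.ms versus replication.timeout.ms; the class and method names here are invented for illustration, and only the config names come from the thread:

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReplicationTimeoutMigration {
    private static final Logger log =
        LoggerFactory.getLogger(ReplicationTimeoutMigration.class);

    // Deprecated in 0.8.3 under this plan, removed in the following release.
    static final String TIMEOUT_CONFIG = "timeout.ms";
    // New name carrying the same meaning (how long the broker waits for replica acks).
    static final String REPLICATION_TIMEOUT_CONFIG = "replication.timeout.ms";

    /**
     * Picks the effective replication timeout: the deprecated timeout.ms wins if
     * the user set it, with a deprecation warning; otherwise replication.timeout.ms;
     * otherwise the default.
     */
    static long effectiveReplicationTimeoutMs(Map<String, String> userConfig, long defaultMs) {
        if (userConfig.containsKey(TIMEOUT_CONFIG)) {
            log.warn("Config {} is deprecated and will be removed; use {} instead.",
                     TIMEOUT_CONFIG, REPLICATION_TIMEOUT_CONFIG);
            return Long.parseLong(userConfig.get(TIMEOUT_CONFIG));
        }
        if (userConfig.containsKey(REPLICATION_TIMEOUT_CONFIG)) {
            return Long.parseLong(userConfig.get(REPLICATION_TIMEOUT_CONFIG));
        }
        return defaultMs;
    }
}

Preferring the deprecated key means existing config files keep their current behavior through 0.8.3 while logging a warning, and the old key can then be dropped in the release after that.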