Hey Mike,

Yeah, that's a good point. A long "registration timeout" may not be a great idea. Perhaps in practice you'd set it long enough to be able to detect a failure and provision a new instance. Maybe on the order of 10 minutes is more reasonable.
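
To make the timing concrete, here is a minimal Python sketch of a coordinator-side static registration. The registration survives a leave or session expiry and is dropped only once a registration timeout elapses. The sketch also has an immediate administrative removal of the kind discussed in this thread. All names (`StaticRegistry`, `on_session_lost`, `expire`, `force_deregister`, `REGISTRATION_TIMEOUT_MS`) are hypothetical illustrations, not Kafka APIs. The 10-minute default simply mirrors the estimate above.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional

# Hypothetical sketch: none of these names are Kafka APIs.
# "On the order of 10 minutes", expressed in milliseconds.
REGISTRATION_TIMEOUT_MS = 10 * 60 * 1000


@dataclass
class StaticRegistration:
    member_id: str
    # When the member left or its session expired; None while it is active.
    departed_at_ms: Optional[int] = None


@dataclass
class StaticRegistry:
    timeout_ms: int = REGISTRATION_TIMEOUT_MS
    members: Dict[str, StaticRegistration] = field(default_factory=dict)

    def on_join(self, member_name: str, member_id: str) -> None:
        """A (re)joining static member is registered; any pending expiry is cancelled."""
        self.members[member_name] = StaticRegistration(member_id)

    def on_session_lost(self, member_name: str, now_ms: int) -> None:
        """Leave or session expiry: start the expiry timer instead of rebalancing."""
        reg = self.members.get(member_name)
        if reg is not None and reg.departed_at_ms is None:
            reg.departed_at_ms = now_ms

    def expire(self, now_ms: int) -> List[str]:
        """Drop registrations whose timer has run out and return their names.

        In this sketch, a non-empty result is where a rebalance would start.
        """
        expired = [
            name
            for name, reg in self.members.items()
            if reg.departed_at_ms is not None
            and now_ms - reg.departed_at_ms >= self.timeout_ms
        ]
        for name in expired:
            del self.members[name]
        return expired

    def force_deregister(self, member_names: Iterable[str]) -> List[str]:
        """Administrative path: remove the named static members immediately."""
        removed = []
        for name in member_names:
            if self.members.pop(name, None) is not None:
                removed.append(name)
        return removed


registry = StaticRegistry()
registry.on_join("consumer-a", "member-1")
registry.on_session_lost("consumer-a", now_ms=0)
print(registry.expire(now_ms=60_000))                   # [] (still within the timeout)
print(registry.expire(now_ms=REGISTRATION_TIMEOUT_MS))  # ['consumer-a']
```

The trade-off Mike raises shows up directly here. A short `timeout_ms` brings back rebalances for slow restarts. A long one keeps a dead member's partitions unassigned unless an operator calls something like `force_deregister`.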

In any case, it's probably a good idea to have an administrative way to force deregistration. One option is to extend the DeleteGroups API with a list of member names.

-Jason

On Fri, Aug 24, 2018 at 2:21 PM, Mike Freyberger <mfreyber...@appnexus.com> wrote:

> Jason,
>
> Regarding step 4 in your proposal, which suggests beginning a long timer (30 minutes) when a static member leaves the group: would there also be a way for an admin to force a static membership expiration?
>
> I’m thinking that during particular types of outages or upgrades, users would want to forcefully remove a static member from the group.
>
> So the user would shut the consumer down normally, which wouldn’t trigger a rebalance. Then the user could use an admin CLI tool to force-remove that consumer from the group, so that the TopicPartitions previously owned by that consumer can be released.
>
> At a high level, we need consumer groups to gracefully handle both intermittent failures and permanent failures. Currently, the consumer group protocol handles permanent failures well, but does not handle intermittent failures well (it creates unnecessary rebalances). I want to make sure the overall solution here handles both intermittent and permanent failures, rather than sacrificing support for permanent failures in order to support intermittent ones.
>
> Mike
>
> Sent from my iPhone
>
> > On Aug 24, 2018, at 3:03 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >
> > Hey Guozhang,
> >
> > Responses below:
> >
> >> Originally I was trying to kill more birds with one stone with KIP-345, e.g. to fix the multi-rebalance issue on starting up / shutting down a multi-instance client (mentioned as cases 1)/2) in my earlier email), and hence proposed a pure static-membership protocol. But thinking twice about it, I now feel it may be too ambitious and worth fixing in another KIP.
> >
> > I was considering an extension to support pre-initialization of the static members of the group, but I agree we should probably leave this problem for future work.
> >
> >> 1. How is this longish static member expiration timeout defined? Is it via a broker (hence global) config, or via a client config that can be communicated to the broker via JoinGroupRequest?
> >
> > I am not too sure. I tend to lean toward server-side configs because they are easier to evolve. If we have to add something to the protocol, then we'll be stuck with it forever.
> >
> >> 2. Assuming that for static members a LEAVE_GROUP request will not trigger a rebalance immediately either (similar to the session timeout), and only the longer member expiration timeout will, can we then remove the internal "internal.leave.group.on.close" config, which was a quick workaround?
> >
> > Yeah, I hope we can ultimately get rid of it, but we may need it for compatibility with older brokers. A related question is how the consumer should behave if `member.name` is provided but the broker does not support it. We could either fail or silently downgrade to dynamic membership.
> >
> > -Jason
> >
> >> On Fri, Aug 24, 2018 at 11:44 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >> Hey Jason,
> >>
> >> I like your idea of simplifying the upgrade protocol to allow static and dynamic members to coexist. Admittedly it may make the coordinator-side logic a bit more complex, but I think it is worth doing.
> >>
> >> Originally I was trying to kill more birds with one stone with KIP-345, e.g. to fix the multi-rebalance issue on starting up / shutting down a multi-instance client (mentioned as cases 1)/2) in my earlier email), and hence proposed a pure static-membership protocol. But thinking twice about it, I now feel it may be too ambitious and worth fixing in another KIP. With that, I think what you've proposed here is a good way to go for KIP-345 itself.
> >>
> >> Note there are a few details in your proposal we'd still need to figure out:
> >>
> >> 1. How is this longish static member expiration timeout defined? Is it via a broker (hence global) config, or via a client config that can be communicated to the broker via JoinGroupRequest?
> >>
> >> 2. Assuming that for static members a LEAVE_GROUP request will not trigger a rebalance immediately either (similar to the session timeout), and only the longer member expiration timeout will, can we then remove the internal "internal.leave.group.on.close" config, which was a quick workaround?
> >>
> >> Guozhang
> >>
> >> On Fri, Aug 24, 2018 at 11:14 AM, Jason Gustafson <ja...@confluent.io> wrote:
> >>
> >>> Hey All,
> >>>
> >>> Nice to see some solid progress on this. It sounds like one of the complications is allowing static and dynamic registration to coexist. I'm wondering if we can do something like the following:
> >>>
> >>> 1. Statically registered members (those joining the group with a non-null `member.name`) maintain a session with the coordinator just like dynamic members.
> >>> 2. If a session is active for a static member when a rebalance begins, then basically we'll keep the current behavior. The rebalance will await the static member joining the group.
> >>> 3. If a static member does not have an active session, then the coordinator will not wait for it to join, but will still include it in the rebalance. The coordinator will forward the cached subscription information to the leader and will cache the assignment after the rebalance completes. (Note that we still have the generationId to fence offset commits from a static zombie if the assignment changes.)
> >>> 4. When a static member leaves the group or has its session expire, no rebalance is triggered. Instead, we can begin a timer to expire the static registration. This would be a longish timeout (say, 30 minutes).
> >>>
> >>> So basically static members participate in all rebalances regardless of whether they have an active session. In a given rebalance, some of the members may be static and some dynamic. The group leader can differentiate the two based on the presence of `member.name` (we have to add this to the JoinGroupResponse). Generally speaking, we would choose leaders preferentially from the active members that support the latest JoinGroup protocol and are using static membership. If we have to choose a leader with an old version, however, it would see all members in the group (static or dynamic) as dynamic members and perform the assignment as usual.
> >>>
> >>> Would that work?
> >>>
> >>> -Jason
> >>>
> >>> On Thu, Aug 23, 2018 at 5:26 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> Hello Boyang,
> >>>>
> >>>> Thanks for the updated proposal. A few questions:
> >>>>
> >>>> 1. Where will "change-group-timeout" be communicated to the broker? Will it be a new field in the JoinGroupRequest, or are we going to piggy-back on the existing session-timeout field (assuming that the original value will no longer be used anywhere in static membership)?
> >>>>
> >>>> 2. "However, if the consumer takes longer than session timeout to return, we shall still trigger rebalance but it could still try to catch `change-group-timeout`.": what does this mean? I thought your proposal was that for static members, the broker will NOT trigger a rebalance even after the session timeout has been detected, but only after change-group-timeout (still to be defined, and supposed to be longer than session-timeout)?
> >>>>
> >>>> 3. "A join group request with member.name set will be treated as `static-membership` strategy": in this case, how would the switch from dynamic to static happen, since whoever changes member.name to non-null will be rejected, right?
> >>>>
> >>>> 4. "just erase the cached mapping, and wait for session timeout to trigger rebalance should be sufficient.": this is also a bit unclear to me. Who will erase the cached mapping? Since it is on the broker side, I assume the broker has to do it. Are you suggesting a new request for it?
> >>>>
> >>>> 5. "Halfway switch": following 3) above, if your proposal is basically to let the "first join request win", and the strategy will stay as is until all members are gone, then this will also not happen, since whoever uses a different strategy from the first member to send a join-group request will be rejected, right?
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Tue, Aug 21, 2018 at 9:28 AM, John Roesler <j...@confluent.io> wrote:
> >>>>
> >>>>> This sounds good to me!
> >>>>>
> >>>>> Thanks for the time you've spent on it,
> >>>>> -John
> >>>>>
> >>>>> On Tue, Aug 21, 2018 at 12:13 AM Boyang Chen <bche...@outlook.com> wrote:
> >>>>>
> >>>>>> Thanks Matthias for the input. Sorry, I have been busy recently and haven't had time to update this thread. To summarize what we have come up with so far, here is a draft updated plan:
> >>>>>>
> >>>>>> Introduce a new config called `member.name`, which is supposed to be provided uniquely by the consumer client. The broker will maintain a cache with [key:member.name, value:member.id]. A join group request with member.name set will be treated as `static-membership` strategy, and the broker will then reject any join group request without member.name. This keeps the new coordination behavior distinct from the `dynamic-membership` protocol we currently have.
> >>>>>>
> >>>>>> When handling a static join group request:
> >>>>>>
> >>>>>> 1. The broker will check the membership to see whether this is a new member. If it is new, the broker allocates a unique member id, caches the mapping, and moves to the rebalance stage.
> >>>>>> 2. Following 1, if this is an existing member, the broker will not change the group state, and will return its cached member.id and current assignment (unless this is the leader, in which case we shall trigger a rebalance).
> >>>>>> 3. Although Guozhang has mentioned we could rejoin with the member name and id pair, I think for the join group request it is OK to leave the member id blank, as the member name is the unique identifier. In the commit offset request we *must* have both.
> >>>>>>
> >>>>>> When handling a commit offset request with static membership enabled, each commit request must carry both member.name and member.id to be identified as a `certificated member`. If not, there are duplicate consumer members with the same member name, and the request will be rejected to guarantee consumption uniqueness.
> >>>>>>
> >>>>>> During a rolling restart or graceful shutdown, the client will send a leave group request (static membership mode). In static membership, we will also define a `change-group-timeout`, provided by the leader, to hold off the rebalance. So we will wait for all the members to rejoin the group and do exactly one rebalance, since all members are expected to rejoin within the timeout. If a consumer crashes, the join group request from the restarted consumer will be recognized as an existing member and handled as in condition 1 above. However, if the consumer takes longer than session timeout to return, we shall still trigger rebalance but it could still try to catch `change-group-timeout`. If it fails to catch the second timeout, its cached state on the broker will be garbage collected, and a new rebalance will be triggered when it finally joins.
> >>>>>>
> >>>>>> Now consider the switch between dynamic and static membership.
> >>>>>>
> >>>>>> 1. Dynamic to static: the first joiner shall revise the membership to static and wait for all the current members to restart, since their membership is still dynamic. Our assumption here is that the restart process shouldn't take long, since a long restart would exceed the `rebalance timeout` under whichever membership protocol we are using. Until they restart, all dynamic member join requests will be rejected.
> >>>>>> 2. Static to dynamic: this is more like a downgrade, which should be smooth: just erase the cached mapping, and wait for session timeout to trigger rebalance should be sufficient. (Fallback to current behavior)
> >>>>>> 3. Halfway switch: a corner case is when some clients keep dynamic membership while others keep static membership. This would cause the group to rebalance forever without progress, because the dynamic and static states keep bouncing off each other. This could guarantee that the consumer group will not operate in the wrong state of being half static and half dynamic.
> >>>>>>
> >>>>>> To guarantee correctness, we will also push the member name/id pair to the `__consumer_offsets` topic (as Matthias pointed out) and upgrade the API version; these details will be discussed further in the KIP.
> >>>>>>
> >>>>>> Are there any concerns about this high-level proposal? I just want to reiterate the core idea of the KIP: "If the broker recognizes this consumer as an existing member, it shouldn't trigger rebalance".
> >>>>>>
> >>>>>> Thanks a lot for everyone's input! I feel this proposal is much more robust than the previous one!
> >>>>>>
> >>>>>> Best,
> >>>>>>
> >>>>>> Boyang
> >>>>>>
> >>>>>> ________________________________
> >>>>>> From: Matthias J. Sax <matth...@confluent.io>
> >>>>>> Sent: Friday, August 10, 2018 2:24 AM
> >>>>>> To: dev@kafka.apache.org
> >>>>>> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id
> >>>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> Thanks for the detailed discussion. I learned a lot about internals again :)
> >>>>>>
> >>>>>> I like the idea of a user config `member.name` and of keeping `member.id` internal. I also agree with Guozhang that reusing `client.id` might not be a good idea.
> >>>>>>
> >>>>>> To clarify the algorithm: each time we generate a new `member.id`, we also need to update the "group membership" information (i.e., the mapping [member.id, Assignment]), right? That is, the new `member.id` replaces the old entry in the cache.
> >>>>>>
> >>>>>> I also think we need to preserve the `member.name -> member.id` mapping in the `__consumer_offsets` topic. The KIP should mention this IMHO.
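
The broker-side bookkeeping described in Boyang's plan and Matthias's follow-up can be sketched as follows. It is a minimal illustration under stated assumptions and has four parts:

- a [member.name -> member.id] cache;
- a static join that triggers a rebalance only for a new member or the leader;
- an offset-commit check that requires a matching name/id pair;
- replacement of the old `member.id` entry whenever a fresh id is generated.

The class and method names (`StaticGroupCache`, `join`, `replace_member_id`, `accept_commit`) and the id format are hypothetical. This is not Kafka's group coordinator, and it ignores generations, persistence to `__consumer_offsets`, and timeouts.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class JoinResult:
    member_id: str
    assignment: List[str]
    triggers_rebalance: bool


@dataclass
class StaticGroupCache:
    # [key: member.name, value: member.id]
    name_to_id: Dict[str, str] = field(default_factory=dict)
    # [member.id, Assignment]: the existing "group membership" information
    assignments: Dict[str, List[str]] = field(default_factory=dict)
    leader_name: Optional[str] = None
    next_id: int = 0

    def _new_member_id(self, member_name: str) -> str:
        # Hypothetical id format; the real broker generates its own ids.
        self.next_id += 1
        return f"{member_name}-{self.next_id}"

    def join(self, member_name: str) -> JoinResult:
        """Static join: member.name alone identifies the member (member.id may be blank)."""
        member_id = self.name_to_id.get(member_name)
        if member_id is None:
            # New member: allocate an id, cache the mapping, move to rebalance.
            member_id = self._new_member_id(member_name)
            self.name_to_id[member_name] = member_id
            self.assignments[member_id] = []
            return JoinResult(member_id, [], triggers_rebalance=True)
        # Existing member: return the cached id and assignment without changing
        # group state, unless it is the leader.
        return JoinResult(
            member_id,
            list(self.assignments[member_id]),
            triggers_rebalance=(member_name == self.leader_name),
        )

    def replace_member_id(self, member_name: str) -> str:
        """Generate a fresh member.id that replaces the old entry in both maps."""
        old_id = self.name_to_id[member_name]
        new_id = self._new_member_id(member_name)
        self.assignments[new_id] = self.assignments.pop(old_id)
        self.name_to_id[member_name] = new_id
        return new_id

    def accept_commit(self, member_name: str, member_id: str) -> bool:
        """Commit check: both fields must be present and must match the cache."""
        if not member_name or not member_id:
            return False
        return self.name_to_id.get(member_name) == member_id


cache = StaticGroupCache()
first = cache.join("consumer-a")  # new member -> rebalance
cache.assignments[first.member_id] = ["orders-0"]
again = cache.join("consumer-a")  # restart with the same member.name
print(again.triggers_rebalance, again.assignment)     # False ['orders-0']
print(cache.accept_commit("consumer-a", "stale-id"))  # False
```

Keying the assignment map by `member.id` while resolving joins by `member.name` is what lets a restarted consumer get its old assignment back. It also lets a second process using the same name be fenced at commit time.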
> >>>>>>
> >>>>>> On changing the default value of the config `leave.group.on.close`: I agree with John that we should not change the default, because it would impact all consumer groups with dynamic assignment. However, I think we can document that if static assignment is used (i.e., `member.name` is configured), we never send a LeaveGroupRequest, regardless of the config. Note that the config is internal, so I'm not sure how to document this in detail. We should not expose the internal config in the docs.
> >>>>>>
> >>>>>> About upgrading: why do we need two rolling bounces and to encode "static" vs "dynamic" in the JoinGroupRequest?
> >>>>>>
> >>>>>> If we upgrade an existing consumer group from dynamic to static, I don't see any reason why the two should not work together, so that a single rolling bounce would be sufficient. If we bounce the first consumer and switch it from dynamic to static, it sends a `member.name` and the broker registers the [member.name, member.id] pair in the cache. Why would this interfere with all the other consumers that use dynamic assignment?
> >>>>>>
> >>>>>> Also, Guozhang mentioned that for all other requests, we need to check whether the mapping [member.name, member.id] contains the sent `member.id`. I don't think this is necessary: it seems sufficient to check the `member.id` against the [member.id, Assignment] mapping as we do today, so checking `member.id` does not require any change IMHO.
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>> On 8/7/18 7:13 PM, Guozhang Wang wrote:
> >>>>>>> @James
> >>>>>>>
> >>>>>>> What you described is true: the transition from dynamic to static membership has not been thought through yet. But I do not think it is an impossible problem: note that we did move offset commits from ZK to the Kafka coordinator in 0.8.2 :) The migration plan was to first double-commit to both ZK and the coordinator, and then do a second round to turn ZK off.
> >>>>>>>
> >>>>>>> So, just to throw out a wild idea here: also following a two-rolling-bounce approach, in the JoinGroupRequest we can set the flag to "static" while still keeping the registry-id field empty. In this case, the coordinator still follows the "dynamic" logic, accepting the request while allowing the protocol to be set to "static". After the first rolling bounce, the group protocol is already "static"; then a second rolling bounce is triggered, and this time we set the registry-id.
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>> On Tue, Aug 7, 2018 at 1:19 AM, James Cheng <wushuja...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Guozhang, in a previous message, you proposed this:
> >>>>>>>>
> >>>>>>>>> On Jul 30, 2018, at 3:56 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>> 1. We bump up the JoinGroupRequest with additional fields:
> >>>>>>>>>
> >>>>>>>>> 1.a) a flag indicating "static" or "dynamic" membership protocols.
> >>>>>>>>> 1.b) with "static" membership, we also add the pre-defined member id.
> >>>>>>>>> 1.c) with "static" membership, we also add an optional "group-change-timeout" value.
> >>>>>>>>>
> >>>>>>>>> 2. On the broker side, we enforce only one of the two protocols for all group members: we accept the protocol of the first joined member of the group, and if later joining members indicate a different membership protocol, we reject them. If the group-change-timeout value differs from that of the first joined member, we reject it as well.
> >>>>>>>>
> >>>>>>>> What will happen if we have an already-deployed application that wants to switch to using static membership? Let’s say there are 10 instances of it. As the instances go through a rolling restart, they will switch from dynamic membership (the default?) to static membership. As each one leaves the group and restarts, it will be rejected from the group (because the group is currently using dynamic membership). The group will shrink down until there is 1 node handling all the traffic. After that one restarts, the group will switch over to static membership.
> >>>>>>>>
> >>>>>>>> Is that right? That means the transition plan from dynamic to static membership isn’t very smooth.
> >>>>>>>>
> >>>>>>>> I’m not really sure what can be done in this case. This reminds me of the transition plans that were discussed for moving from ZooKeeper-based consumers to Kafka-coordinator-based consumers. That was also hard, and ultimately we decided not to build that.
> >>>>>>>>
> >>>>>>>> -James
> >>>>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>
> >> --
> >> -- Guozhang
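
James's concern can be checked with a small simulation of the rule from Guozhang's July 30 proposal: the first member to join an empty group fixes the membership protocol, and joins with a different protocol are rejected. This is a hypothetical Python model, not coordinator code. `FirstJoinerWinsGroup` and `rolling_restart_sizes` are illustrative names. The model also assumes that rejected instances stay out of the group until it has switched to static, and then rejoin.

```python
from typing import Dict, List, Optional


class FirstJoinerWinsGroup:
    """The first member to join an empty group fixes its membership protocol."""

    def __init__(self) -> None:
        self.protocol: Optional[str] = None
        self.members: Dict[str, str] = {}

    def join(self, name: str, protocol: str) -> bool:
        if not self.members:
            self.protocol = protocol  # empty group: this joiner decides
        if protocol != self.protocol:
            return False  # differs from the group's protocol: rejected
        self.members[name] = protocol
        return True

    def leave(self, name: str) -> None:
        self.members.pop(name, None)


def rolling_restart_sizes(n: int) -> List[int]:
    """Bounce n dynamic instances one at a time, switching each to static.

    Returns the group size after each bounce, plus the final size once the
    rejected instances rejoin after the group has switched to static.
    """
    group = FirstJoinerWinsGroup()
    names = [f"instance-{i}" for i in range(n)]
    for name in names:
        group.join(name, "dynamic")

    sizes: List[int] = []
    rejected: List[str] = []
    for name in names:
        group.leave(name)
        if not group.join(name, "static"):
            rejected.append(name)
        sizes.append(len(group.members))

    # Only after the last dynamic member bounces is the group static.
    for name in rejected:
        group.join(name, "static")
    sizes.append(len(group.members))
    return sizes


print(rolling_restart_sizes(10))  # [9, 8, 7, 6, 5, 4, 3, 2, 1, 1, 10]
```

For 10 instances, the group shrinks from 9 members to 1 during the bounce and stays at 1 while the last instance flips the group to static. Only then does it return to 10, which matches the shrinkage James describes. The coexistence approach discussed later in the thread avoids this by never rejecting a join on the basis of membership type.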