This sounds good to me!

Thanks for the time you've spent on it,
-John

On Tue, Aug 21, 2018 at 12:13 AM Boyang Chen <bche...@outlook.com> wrote:

> Thanks, Matthias, for the input. Sorry, I was busy recently and haven't had time to update this thread. To summarize what we have come up with so far, here is a draft of the updated plan:
>
> Introduce a new config called `member.name`, which is supposed to be provided uniquely by the consumer client. The broker will maintain a cache with [key: member.name, value: member.id]. A join group request with member.name set will be treated as using the `static-membership` strategy, and the group will then reject any join group request without member.name. So this coordination change is differentiated from the `dynamic-membership` protocol we currently have.
>
> When handling a static join group request:
>
> 1. The broker will check the membership to see whether this is a new member. If it is new, the broker allocates a unique member id, caches the mapping, and moves to the rebalance stage.
> 2. Following 1, if this is an existing member, the broker will not change the group state and will return its cached member.id and current assignment (unless this is the leader, in which case we shall trigger a rebalance).
> 3. Although Guozhang mentioned that we could rejoin with the member name and id pair, I think it is OK to leave the member id blank in the join group request, since the member name is the unique identifier. In the commit offset request we *must* have both.
>
> When handling a commit offset request with static membership enabled, each commit request must carry both member.name and member.id to be identified as a `certificated member`. If not, this means there are duplicate consumer members with the same member name, and the request will be rejected to guarantee consumption uniqueness.
>
> When doing a rolling restart or shutting down gracefully, the client will send a leave group request (static membership mode). In static membership, we will also define `change-group-timeout` to hold off the rebalance; its value is provided by the leader.
> So we will wait for all the members to rejoin the group and do exactly one rebalance, since all members are expected to rejoin within the timeout. If a consumer crashes, the join group request from the restarted consumer will be recognized as coming from an existing member and handled as in condition 2 above. However, if the consumer takes longer than the session timeout to return, we shall still trigger a rebalance, but it can still try to make it back within `change-group-timeout`. If it fails to make it within this second timeout, its cached state on the broker will be garbage collected, and a new rebalance will be triggered when it finally joins.
>
> Now consider the switch between dynamic and static membership.
>
> 1. Dynamic to static: the first joiner shall revise the membership to static and wait for all the current members to restart, since their membership is still dynamic. Our assumption here is that the restart process shouldn't take long, as a long restart breaks the `rebalance timeout` in whichever membership protocol we are using. Until they restart, all join requests from dynamic members will be rejected.
> 2. Static to dynamic: this is more like a downgrade, which should be smooth: just erasing the cached mapping and waiting for the session timeout to trigger a rebalance should be sufficient (falling back to the current behavior).
> 3. Halfway switch: a corner case is when some clients keep dynamic membership while others keep static membership. This will cause the group to rebalance forever without progress, because it keeps bouncing between the dynamic and static states. This at least guarantees that the consumer group never operates in a wrong state that is half static and half dynamic.
>
> To guarantee correctness, we will also push the member name/id pair to the `__consumer_offsets` topic (as Matthias pointed out) and bump the API version; these details will be discussed further in the KIP.
>
> Are there any concerns about this high-level proposal?
> I just want to reiterate the core idea of the KIP: "If the broker recognizes this consumer as an existing member, it shouldn't trigger a rebalance."
>
> Thanks a lot for everyone's input! I feel this proposal is much more robust than the previous one!
>
> Best,
>
> Boyang
>
> ________________________________
> From: Matthias J. Sax <matth...@confluent.io>
> Sent: Friday, August 10, 2018 2:24 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id
>
> Hi,
>
> thanks for the detailed discussion. I learned a lot about internals again :)
>
> I like the idea of a user config `member.name` and of keeping `member.id` internal. I also agree with Guozhang that reusing `client.id` might not be a good idea.
>
> To clarify the algorithm: each time we generate a new `member.id`, we also need to update the "group membership" information (i.e., the mapping [member.id, Assignment]), right? I.e., the new `member.id` replaces the old entry in the cache.
>
> I also think we need to preserve the `member.name -> member.id` mapping in the `__consumer_offsets` topic. The KIP should mention this IMHO.
>
> About changing the default value of the config `leave.group.on.close`: I agree with John that we should not change the default, because it would impact all consumer groups with dynamic assignment. However, I think we can document that if static assignment is used (i.e., `member.name` is configured), we never send a LeaveGroupRequest regardless of the config. Note that the config is internal, so I am not sure how to document this in detail. We should not expose the internal config in the docs.
>
> About upgrading: why do we need to have two rolling bounces and encode "static" vs "dynamic" in the JoinGroupRequest?
>
> If we upgrade an existing consumer group from dynamic to static, is there any reason why both should not work together and a single rolling bounce would not be sufficient?
> If we bounce the first consumer and switch it from dynamic to static, it sends a `member.name`, and the broker registers [member.name, member.id] in the cache. Why would this interfere with all the other consumers that use dynamic assignment?
>
> Also, Guozhang mentioned that for all other requests we need to check whether the mapping [member.name, member.id] contains the sent `member.id`. I don't think this is necessary; it seems sufficient to check the `member.id` against the [member.id, Assignment] mapping as we do today. Thus, checking `member.id` does not require any change IMHO.
>
>
> -Matthias
>
>
> On 8/7/18 7:13 PM, Guozhang Wang wrote:
> > @James
> >
> > What you described is true: the transition from dynamic to static membership has not been thought through yet. But I do not think it is an impossible problem: note that we indeed moved offset commits from ZK to the Kafka coordinator in 0.8.2 :) The migration plan was to first double-commit to both ZK and the coordinator, and then do a second round to turn ZK off.
> >
> > So, just to throw out a wild idea here: also following a two-rolling-bounce approach, in the JoinGroupRequest we can set the flag to "static" while still keeping the registry-id field empty. In this case, the coordinator still follows the "dynamic" logic, accepting the request while allowing the protocol to be set to "static". After the first rolling bounce, the group protocol is already "static"; then a second rolling bounce is triggered, and this time we set the registry-id.
> >
> >
> > Guozhang
> >
> > On Tue, Aug 7, 2018 at 1:19 AM, James Cheng <wushuja...@gmail.com> wrote:
> >
> >> Guozhang, in a previous message, you proposed this:
> >>
> >>> On Jul 30, 2018, at 3:56 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>> 1. We bump up the JoinGroupRequest with additional fields:
> >>>
> >>> 1.a) a flag indicating "static" or "dynamic" membership protocols.
> >>> 1.b) with "static" membership, we also add the pre-defined member id.
> >>> 1.c) with "static" membership, we also add an optional "group-change-timeout" value.
> >>>
> >>> 2. On the broker side, we enforce only one of the two protocols for all group members: we accept the protocol of the first joined member of the group, and if later joining members indicate a different membership protocol, we reject them. If the group-change-timeout value differs from that of the first joined member, we reject it as well.
> >>
> >>
> >> What will happen if we have an already-deployed application that wants to switch to using static membership? Let’s say there are 10 instances of it. As the instances go through a rolling restart, they will switch from dynamic membership (the default?) to static membership. As each one leaves the group and restarts, it will be rejected from the group (because the group is currently using dynamic membership). The group will shrink down until there is 1 node handling all the traffic. After that one restarts, the group will switch over to static membership.
> >>
> >> Is that right? That means that the transition plan from dynamic to static membership isn’t very smooth.
> >>
> >> I’m not really sure what can be done in this case. This reminds me of the transition plans that were discussed for moving from ZooKeeper-based consumers to Kafka-coordinator-based consumers. That was also hard, and ultimately we decided not to build that.
> >>
> >> -James
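The core of Boyang's draft plan quoted above (the broker-side [member.name, member.id] cache, the static join handling in steps 1 and 2, and the name/id check on offset commits) can be illustrated with a small in-memory model. This is a hypothetical Python sketch, not Kafka code: `StaticGroup`, `JoinResult`, `join`, and `commit_offset` are invented names, the member id format is arbitrary, and making the first joiner the leader is a simplifying assumption. Session timeouts, `change-group-timeout`, the dynamic/static switch, and persistence to `__consumer_offsets` are deliberately left out.

```python
import itertools
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class JoinResult:
    member_id: str    # member.id handed back to the client
    assignment: list  # the member's current (cached) assignment
    rebalance: bool   # whether this join triggered a rebalance


@dataclass
class StaticGroup:
    """Toy model of one group in static-membership mode (hypothetical, not Kafka code)."""
    leader_name: Optional[str] = None  # assumption: the first joiner becomes leader
    name_to_id: dict = field(default_factory=dict)   # [member.name -> member.id]
    assignments: dict = field(default_factory=dict)  # [member.id -> Assignment]
    _ids: itertools.count = field(default_factory=itertools.count)

    def join(self, member_name: Optional[str]) -> JoinResult:
        if member_name is None:
            # A static group rejects join requests that lack member.name.
            raise ValueError("static group requires member.name")
        cached_id = self.name_to_id.get(member_name)
        if cached_id is None:
            # Step 1: new member -> allocate a unique id, cache the mapping,
            # and move to the rebalance stage.
            member_id = f"{member_name}-{next(self._ids)}"
            self.name_to_id[member_name] = member_id
            self.assignments[member_id] = []
            if self.leader_name is None:
                self.leader_name = member_name
            return JoinResult(member_id, [], rebalance=True)
        # Step 2: existing member -> leave group state alone and return the
        # cached id and assignment; only a rejoining leader triggers a rebalance.
        return JoinResult(cached_id, self.assignments[cached_id],
                          rebalance=(member_name == self.leader_name))

    def commit_offset(self, member_name: Optional[str],
                      member_id: Optional[str]) -> bool:
        # Accept only a "certificated member": both fields present and the
        # pair matches the cache; anything else is treated as a duplicate.
        return (member_name is not None and member_id is not None
                and self.name_to_id.get(member_name) == member_id)


if __name__ == "__main__":
    group = StaticGroup()
    group.join("consumer-a")                   # new member, becomes leader
    b = group.join("consumer-b")               # new member
    b_again = group.join("consumer-b")         # restart of an existing member
    assert b_again.member_id == b.member_id and not b_again.rebalance
    assert group.join("consumer-a").rebalance  # leader rejoin still rebalances
    assert group.commit_offset("consumer-b", b.member_id)
    assert not group.commit_offset("consumer-b", "stale-id")
```

Keeping `name_to_id` separate from `assignments` mirrors the split discussed in the thread: `member.name` is the user-supplied key, while `member.id` stays an internal identifier that the existing [member.id, Assignment] mapping continues to use.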