This sounds good to me!

Thanks for the time you've spent on it,
-John

On Tue, Aug 21, 2018 at 12:13 AM Boyang Chen <bche...@outlook.com> wrote:

> Thanks Matthias for the input. Sorry I was busy recently and haven't got
> time to update this thread. To summarize what we come up so far, here is a
> draft updated plan:
>
>
> Introduce a new config called `member.name` which is supposed to be
> provided uniquely by the consumer client. The broker will maintain a cache
> with [key:member.name, value:member.id]. A join group request with
> member.name set will be treated as `static-membership` strategy, and will
> reject any join group request without member.name. So this coordination
> change will be differentiated from the `dynamic-membership` protocol we
> currently have.
>
>
> When handling static join group request:
>
>   1.   The broker will check the membership to see whether this is a new
> member. If new, broker allocate a unique member id, cache the mapping and
> move to rebalance stage.
>   2.   Following 1, if this is an existing member, broker will not change
> group state, and return its cached member.id and current assignment.
> (unless this is leader, we shall trigger rebalance)
>   3.   Although Guozhang has mentioned we could rejoin with pair member
> name and id, I think for join group request it is ok to leave member id
> blank as member name is the unique identifier. In commit offset request we
> *must* have both.
>
>
> When handling commit offset request, if enabled with static membership,
> each time the commit request must have both member.name and member.id to
> be identified as a `certificated member`. If not, this means there are
> duplicate consumer members with same member name and the request will be
> rejected to guarantee consumption uniqueness.
>
>
> When rolling restart/shutting down gracefully, the client will send a
> leave group request (static membership mode). In static membership, we will
> also define `change-group-timeout` to hold on rebalance provided by leader.
> So we will wait for all the members to rejoin the group and do exactly one
> rebalance since all members are expected to rejoin within timeout. If
> consumer crashes, the join group request from the restarted consumer will
> be recognized as an existing member and be handled as above condition 1;
> However, if the consumer takes longer than session timeout to return, we
> shall still trigger rebalance but it could still try to catch
> `change-group-timeout`. If it failed to catch second timeout, its cached
> state on broker will be garbage collected and trigger a new rebalance when
> it finally joins.
>
>
> And consider the switch between dynamic to static membership.
>
>   1.  Dynamic to static: the first joiner shall revise the membership to
> static and wait for all the current members to restart, since their
> membership is still dynamic. Here our assumption is that the restart
> process shouldn't take a long time, as long restart is breaking the
> `rebalance timeout` in whatever membership protocol we are using. Before
> restart, all dynamic member join requests will be rejected.
>   2.  Static to dynamic: this is more like a downgrade which should be
> smooth: just erase the cached mapping, and wait for session timeout to
> trigger rebalance should be sufficient. (Fallback to current behavior)
>   3.  Halfway switch: a corner case is like some clients keep dynamic
> membership while some keep static membership. This will cause the group
> rebalance forever without progress because dynamic/static states are
> bouncing each other. This could guarantee that we will not make the
> consumer group work in a wrong state by having half static and half dynamic.
>
> To guarantee correctness, we will also push the member name/id pair to
> _consumed_offsets topic (as Matthias pointed out) and upgrade the API
> version, these details will be further discussed back in the KIP.
>
>
> Are there any concern for this high level proposal? Just want to reiterate
> on the core idea of the KIP: "If the broker recognize this consumer as an
> existing member, it shouldn't trigger rebalance".
>
> Thanks a lot for everyone's input! I feel this proposal is much more
> robust than previous one!
>
>
> Best,
>
> Boyang
>
> ________________________________
> From: Matthias J. Sax <matth...@confluent.io>
> Sent: Friday, August 10, 2018 2:24 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> specifying member id
>
> Hi,
>
> thanks for the detailed discussion. I learned a lot about internals again
> :)
>
> I like the idea or a user config `member.name` and to keep `member.id`
> internal. Also agree with Guozhang, that reusing `client.id` might not
> be a good idea.
>
> To clarify the algorithm, each time we generate a new `member.id`, we
> also need to update the "group membership" information (ie, mapping
> [member.id, Assignment]), right? Ie, the new `member.id` replaces the
> old entry in the cache.
>
> I also think, we need to preserve the `member.name -> member.id` mapping
> in the `__consumer_offset` topic. The KIP should mention this IMHO.
>
> For changing the default value of config `leave.group.on.close`. I agree
> with John, that we should not change the default config, because it
> would impact all consumer groups with dynamic assignment. However, I
> think we can document, that if static assignment is used (ie,
> `member.name` is configured) we never send a LeaveGroupRequest
> regardless of the config. Note, that the config is internal, so not sure
> how to document this in detail. We should not expose the internal config
> in the docs.
>
> About upgrading: why do we need have two rolling bounces and encode
> "static" vs "dynamic" in the JoinGroupRequest?
>
> If we upgrade an existing consumer group from dynamic to static, I don't
> see any reason why both should not work together and single rolling
> bounce would not be sufficient? If we bounce the first consumer and
> switch from dynamic to static, it sends a `member.name` and the broker
> registers the [member.name, member.id] in the cache. Why would this
> interfere with all other consumer that use dynamic assignment?
>
> Also, Guozhang mentioned that for all other request, we need to check if
> the mapping [member.name, member.id] contains the send `member.id` -- I
> don't think this is necessary -- it seems to be sufficient to check the
> `member.id` from the [member.id, Assignment] mapping as be do today --
> thus, checking `member.id` does not require any change IMHO.
>
>
> -Matthias
>
>
> On 8/7/18 7:13 PM, Guozhang Wang wrote:
> > @James
> >
> > What you described is true: the transition from dynamic to static
> > memberships are not thought through yet. But I do not think it is an
> > impossible problem: note that we indeed moved the offset commit from ZK
> to
> > kafka coordinator in 0.8.2 :) The migration plan is to first to
> > double-commits on both zk and coordinator, and then do a second round to
> > turn the zk off.
> >
> > So just to throw a wild idea here: also following a two-rolling-bounce
> > manner, in the JoinGroupRequest we can set the flag to "static" while
> keep
> > the registry-id field empty still, in this case, the coordinator still
> > follows the logic of "dynamic", accepting the request while allowing the
> > protocol to be set to "static"; after the first rolling bounce, the group
> > protocol is already "static", then a second rolling bounce is triggered
> and
> > this time we set the registry-id.
> >
> >
> > Guozhang
> >
> > On Tue, Aug 7, 2018 at 1:19 AM, James Cheng <wushuja...@gmail.com>
> wrote:
> >
> >> Guozhang, in a previous message, you proposed said this:
> >>
> >>> On Jul 30, 2018, at 3:56 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>> 1. We bump up the JoinGroupRequest with additional fields:
> >>>
> >>>  1.a) a flag indicating "static" or "dynamic" membership protocols.
> >>>  1.b) with "static" membership, we also add the pre-defined member id.
> >>>  1.c) with "static" membership, we also add an optional
> >>> "group-change-timeout" value.
> >>>
> >>> 2. On the broker side, we enforce only one of the two protocols for all
> >>> group members: we accept the protocol on the first joined member of the
> >>> group, and if later joining members indicate a different membership
> >>> protocol, we reject it. If the group-change-timeout value was different
> >> to
> >>> the first joined member, we reject it as well.
> >>
> >>
> >> What will happen if we have an already-deployed application that wants
> to
> >> switch to using static membership? Let’s say there are 10 instances of
> it.
> >> As the instances go through a rolling restart, they will switch from
> >> dynamic membership (the default?) to static membership. As each one
> leaves
> >> the group and restarts, they will be rejected from the group (because
> the
> >> group is currently using dynamic membership). The group will shrink down
> >> until there is 1 node handling all the traffic. After that one restarts,
> >> the group will switch over to static membership.
> >>
> >> Is that right? That means that the transition plan from dynamic to
> static
> >> membership isn’t very smooth.
> >>
> >> I’m not really sure what can be done in this case. This reminds me of
> the
> >> transition plans that were discussed for moving from zookeeper-based
> >> consumers to kafka-coordinator-based consumers. That was also hard, and
> >> ultimately we decided not to build that.
> >>
> >> -James
> >>
> >>
> >
> >
>
>

Reply via email to