Thanks Matthias for the input. Sorry, I was busy recently and haven't had time
to update this thread. To summarize what we have come up with so far, here is a
draft of the updated plan:


Introduce a new config called `member.name`, which the consumer client is
expected to set to a unique value. The broker will maintain a cache with
[key:member.name, value:member.id]. A join group request with member.name set
will be handled under the `static-membership` strategy, and a group using that
strategy will reject any join group request without member.name. This
coordination change is therefore kept separate from the `dynamic-membership`
protocol we currently have.


When handling a static join group request:

  1.   The broker will check the membership to see whether this is a new
member. If it is new, the broker allocates a unique member id, caches the
mapping and moves to the rebalance stage.
  2.   Following 1, if this is an existing member, the broker will not change
the group state, and returns its cached member.id and current assignment
(unless this member is the leader, in which case we shall trigger a rebalance).
  3.   Although Guozhang has mentioned we could rejoin with the member name and
id pair, I think for the join group request it is ok to leave the member id
blank, as the member name is the unique identifier. In the commit offset
request we *must* have both.
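
As a rough illustration of steps 1 and 2 above (a sketch, not the proposed
broker implementation), the handling could look like the following Python. The
class name, field names, the error string, and the choice of the first joiner
as leader are all assumptions made only for this example:

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class StaticGroup:
    """Hypothetical broker-side state for one group using static membership."""
    member_ids: Dict[str, str] = field(default_factory=dict)    # member.name -> member.id
    assignments: Dict[str, list] = field(default_factory=dict)  # member.id -> assignment
    leader_name: Optional[str] = None

    def handle_join(self, member_name: Optional[str]) -> dict:
        # A static group rejects join requests that carry no member.name.
        if not member_name:
            return {"error": "MEMBER_NAME_REQUIRED", "rebalance": False}

        member_id = self.member_ids.get(member_name)
        if member_id is None:
            # Step 1: new member -> allocate an id, cache the mapping, rebalance.
            member_id = f"{member_name}-{uuid.uuid4()}"
            self.member_ids[member_name] = member_id
            if self.leader_name is None:
                # Assumption for this sketch only: the first joiner is the leader.
                self.leader_name = member_name
            return {"member_id": member_id, "assignment": None, "rebalance": True}

        # Step 2: known member -> return the cached id and assignment; only a
        # rejoining leader triggers a rebalance.
        return {
            "member_id": member_id,
            "assignment": self.assignments.get(member_id),
            "rebalance": member_name == self.leader_name,
        }
```

Note that the join request carries only the member name (step 3); the member id
is looked up from the cache rather than supplied by the client.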


When handling a commit offset request with static membership enabled, the
request must carry both member.name and member.id for the sender to be
identified as a `certificated member`. If the request cannot be identified this
way, it means there are duplicate consumer members with the same member name,
and the request will be rejected to guarantee consumption uniqueness.
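
A minimal sketch of that check, assuming the broker compares the pair in the
request against its cached [member.name, member.id] mapping. The function name
and the result strings are hypothetical and only serve the illustration:

```python
from typing import Dict, Optional


def validate_commit(cache: Dict[str, str],
                    member_name: Optional[str],
                    member_id: Optional[str]) -> str:
    """Check a commit offset request against the member.name -> member.id cache."""
    # Under static membership the request must carry both fields.
    if not member_name or not member_id:
        return "REJECTED_MISSING_IDENTITY"
    # A different (or missing) cached id means this sender is not the member
    # the broker currently knows under that name.
    if cache.get(member_name) != member_id:
        return "REJECTED_MISMATCH"
    return "ACCEPTED"
```

For example, if two consumers are started with the same member name, only the
one holding the currently cached member id would pass this check.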


When doing a rolling restart or shutting down gracefully, the client will send
a leave group request (static membership mode). In static membership, we will
also define a `change-group-timeout`, provided by the leader, to hold off the
rebalance. So we will wait for all the members to rejoin the group and do
exactly one rebalance, since all members are expected to rejoin within the
timeout. If a consumer crashes, the join group request from the restarted
consumer will be recognized as coming from an existing member and be handled as
in condition 2 above. However, if the consumer takes longer than the session
timeout to return, we shall still trigger a rebalance, but it could still try
to make the `change-group-timeout`. If it fails to make this second timeout,
its cached state on the broker will be garbage collected, and a new rebalance
will be triggered when it finally joins.
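
The timing cases in the paragraph above can be sketched as a small decision
function. This is only an illustration under two assumptions that are not
settled in this thread: both timeouts are measured from the moment the consumer
went away, and `change-group-timeout` is at least as long as the session
timeout. The function name and result strings are hypothetical:

```python
def rejoin_outcome(away_s: float,
                   session_timeout_s: float,
                   change_group_timeout_s: float) -> str:
    """Classify what a returning static member would see on the broker."""
    if change_group_timeout_s < session_timeout_s:
        raise ValueError("sketch assumes change-group-timeout >= session timeout")
    if away_s <= session_timeout_s:
        # Back in time: recognized as an existing member, no rebalance.
        return "existing_member_no_rebalance"
    if away_s <= change_group_timeout_s:
        # Session expired, so a rebalance was triggered, but the cached
        # member.name -> member.id mapping is still on the broker.
        return "rebalance_triggered_state_kept"
    # Both timeouts missed: cached state is gone, and the eventual join is
    # treated as a new member, which triggers another rebalance.
    return "state_garbage_collected_rebalance_on_join"
```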


Next, consider the switch between dynamic and static membership.

  1.  Dynamic to static: the first joiner shall switch the membership to static
and wait for all the current members to restart, since their membership is
still dynamic. Our assumption here is that the restart process shouldn't take a
long time, since a long restart already breaks the `rebalance timeout` in
whichever membership protocol we are using. Until they restart, all dynamic
member join requests will be rejected.
  2.  Static to dynamic: this is more like a downgrade and should be smooth:
erasing the cached mapping and waiting for the session timeout to trigger a
rebalance should be sufficient (fallback to current behavior).
  3.  Halfway switch: a corner case is when some clients keep dynamic
membership while others keep static membership. This will cause the group to
rebalance forever without progress, because the dynamic and static states keep
bouncing against each other. This guarantees that the consumer group never
operates in a wrong state with half static and half dynamic members.

To guarantee correctness, we will also push the member name/id pair to the
`__consumer_offsets` topic (as Matthias pointed out) and upgrade the API
version. These details will be discussed further in the KIP.


Are there any concerns about this high-level proposal? I just want to reiterate
the core idea of the KIP: "If the broker recognizes this consumer as an existing
member, it shouldn't trigger a rebalance".

Thanks a lot for everyone's input! I feel this proposal is much more robust
than the previous one!


Best,

Boyang

________________________________
From: Matthias J. Sax <matth...@confluent.io>
Sent: Friday, August 10, 2018 2:24 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by 
specifying member id

Hi,

thanks for the detailed discussion. I learned a lot about internals again :)

I like the idea of a user config `member.name` and to keep `member.id`
internal. Also agree with Guozhang, that reusing `client.id` might not
be a good idea.

To clarify the algorithm, each time we generate a new `member.id`, we
also need to update the "group membership" information (ie, mapping
[member.id, Assignment]), right? Ie, the new `member.id` replaces the
old entry in the cache.

I also think we need to preserve the `member.name -> member.id` mapping
in the `__consumer_offsets` topic. The KIP should mention this IMHO.

For changing the default value of config `leave.group.on.close`. I agree
with John, that we should not change the default config, because it
would impact all consumer groups with dynamic assignment. However, I
think we can document, that if static assignment is used (ie,
`member.name` is configured) we never send a LeaveGroupRequest
regardless of the config. Note, that the config is internal, so not sure
how to document this in detail. We should not expose the internal config
in the docs.

About upgrading: why do we need to have two rolling bounces and encode
"static" vs "dynamic" in the JoinGroupRequest?

If we upgrade an existing consumer group from dynamic to static, I don't
see any reason why both should not work together and why a single rolling
bounce would not be sufficient. If we bounce the first consumer and
switch from dynamic to static, it sends a `member.name` and the broker
registers the [member.name, member.id] in the cache. Why would this
interfere with all other consumers that use dynamic assignment?

Also, Guozhang mentioned that for all other requests, we need to check if
the mapping [member.name, member.id] contains the sent `member.id` -- I
don't think this is necessary -- it seems to be sufficient to check the
`member.id` from the [member.id, Assignment] mapping as we do today --
thus, checking `member.id` does not require any change IMHO.


-Matthias


On 8/7/18 7:13 PM, Guozhang Wang wrote:
> @James
>
> What you described is true: the transition from dynamic to static
> membership is not thought through yet. But I do not think it is an
> impossible problem: note that we indeed moved the offset commit from ZK to
> kafka coordinator in 0.8.2 :) The migration plan was to first do
> double-commits on both zk and coordinator, and then do a second round to
> turn the zk off.
>
> So just to throw a wild idea here: also following a two-rolling-bounce
> manner, in the JoinGroupRequest we can set the flag to "static" while still
> keeping the registry-id field empty; in this case, the coordinator still
> follows the logic of "dynamic", accepting the request while allowing the
> protocol to be set to "static". After the first rolling bounce, the group
> protocol is already "static"; then a second rolling bounce is triggered and
> this time we set the registry-id.
>
>
> Guozhang
>
> On Tue, Aug 7, 2018 at 1:19 AM, James Cheng <wushuja...@gmail.com> wrote:
>
>> Guozhang, in a previous message, you proposed this:
>>
>>> On Jul 30, 2018, at 3:56 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>> 1. We bump up the JoinGroupRequest with additional fields:
>>>
>>>  1.a) a flag indicating "static" or "dynamic" membership protocols.
>>>  1.b) with "static" membership, we also add the pre-defined member id.
>>>  1.c) with "static" membership, we also add an optional
>>> "group-change-timeout" value.
>>>
>>> 2. On the broker side, we enforce only one of the two protocols for all
>>> group members: we accept the protocol on the first joined member of the
>>> group, and if later joining members indicate a different membership
>>> protocol, we reject it. If the group-change-timeout value was different to
>>> the first joined member, we reject it as well.
>>
>>
>> What will happen if we have an already-deployed application that wants to
>> switch to using static membership? Let’s say there are 10 instances of it.
>> As the instances go through a rolling restart, they will switch from
>> dynamic membership (the default?) to static membership. As each one leaves
>> the group and restarts, they will be rejected from the group (because the
>> group is currently using dynamic membership). The group will shrink down
>> until there is 1 node handling all the traffic. After that one restarts,
>> the group will switch over to static membership.
>>
>> Is that right? That means that the transition plan from dynamic to static
>> membership isn’t very smooth.
>>
>> I’m not really sure what can be done in this case. This reminds me of the
>> transition plans that were discussed for moving from zookeeper-based
>> consumers to kafka-coordinator-based consumers. That was also hard, and
>> ultimately we decided not to build that.
>>
>> -James
>>
>>
>
>
