Hey Mike,

Yeah, that's a good point. A long "registration timeout" may not be a great
idea. Perhaps in practice you'd set it long enough to be able to detect a
failure and provision a new instance. Maybe on the order of 10 minutes is
more reasonable.

In any case, it's probably a good idea to have an administrative way to
force deregistration. One option is to extend the DeleteGroups API with a
list of members names.

-Jason



On Fri, Aug 24, 2018 at 2:21 PM, Mike Freyberger <mfreyber...@appnexus.com>
wrote:

> Jason,
>
> Regarding step 4 in your proposal which suggests beginning a long timer
> (30 minutes) when a static member leaves the group, would there also be the
> ability for an admin to force a static membership expiration?
>
> I’m thinking that during particular types of outages or upgrades users
> would want forcefully remove a static member from the group.
>
> So the user would shut the consumer down normally, which wouldn’t trigger
> a rebalance. Then the user could use an admin CLI tool to force remove that
> consumer from the group, so the TopicPartitions that were previously owned
> by that consumer can be released.
>
> At a high level, we need consumer groups to gracefully handle intermittent
> failures and permanent failures. Currently, the consumer group protocol
> handles permanent failures well, but does not handle intermittent failures
> well (it creates unnecessary rebalances). I want to make sure the overall
> solution here handles both intermittent failures and permanent failures,
> rather than sacrificing support for permanent failures in order to provide
> support for intermittent failures.
>
> Mike
>
> Sent from my iPhone
>
> > On Aug 24, 2018, at 3:03 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >
> > Hey Guozhang,
> >
> > Responses below:
> >
> > Originally I was trying to kill more birds with one stone with KIP-345,
> >> e.g. to fix the multi-rebalance issue on starting up / shutting down a
> >> multi-instance client (mentioned as case 1)/2) in my early email), and
> >> hence proposing to have a pure static-membership protocol. But thinking
> >> twice about it I now feel it may be too ambitious and worth fixing in
> >> another KIP.
> >
> >
> > I was considering an extension to support pre-initialization of the
> static
> > members of the group, but I agree we should probably leave this problem
> for
> > future work.
> >
> > 1. How this longish static member expiration timeout defined? Is it via a
> >> broker, hence global config, or via a client config which can be
> >> communicated to broker via JoinGroupRequest?
> >
> >
> > I am not too sure. I tend to lean toward server-side configs because they
> > are easier to evolve. If we have to add something to the protocol, then
> > we'll be stuck with it forever.
> >
> > 2. Assuming that for static members, LEAVE_GROUP request will not
> trigger a
> >> rebalance immediately either, similar to session timeout, but only the
> >> longer member expiration timeout, can we remove the internal "
> >> internal.leave.group.on.close" config, which is a quick walk-around
> then?
> >
> >
> > Yeah, I hope we can ultimately get rid of it, but we may need it for
> > compatibility with older brokers. A related question is what should be
> the
> > behavior of the consumer if `member.name` is provided but the broker
> does
> > not support it? We could either fail or silently downgrade to dynamic
> > membership.
> >
> > -Jason
> >
> >
> >> On Fri, Aug 24, 2018 at 11:44 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>
> >> Hey Jason,
> >>
> >> I like your idea to simplify the upgrade protocol to allow co-exist of
> >> static and dynamic members. Admittedly it may make the coordinator-side
> >> logic a bit more complex, but I think it worth doing it.
> >>
> >> Originally I was trying to kill more birds with one stone with KIP-345,
> >> e.g. to fix the multi-rebalance issue on starting up / shutting down a
> >> multi-instance client (mentioned as case 1)/2) in my early email), and
> >> hence proposing to have a pure static-membership protocol. But thinking
> >> twice about it I now feel it may be too ambitious and worth fixing in
> >> another KIP. With that, I think what you've proposed here is a good way
> to
> >> go for KIP-345 itself.
> >>
> >> Note there are a few details in your proposal we'd still need to figure
> >> out:
> >>
> >> 1. How this longish static member expiration timeout defined? Is it via
> a
> >> broker, hence global config, or via a client config which can be
> >> communicated to broker via JoinGroupRequest?
> >>
> >> 2. Assuming that for static members, LEAVE_GROUP request will not
> trigger a
> >> rebalance immediately either, similar to session timeout, but only the
> >> longer member expiration timeout, can we remove the internal "
> >> internal.leave.group.on.close" config, which is a quick walk-around
> then?
> >>
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Fri, Aug 24, 2018 at 11:14 AM, Jason Gustafson <ja...@confluent.io>
> >> wrote:
> >>
> >>> Hey All,
> >>>
> >>> Nice to see some solid progress on this. It sounds like one of the
> >>> complications is allowing static and dynamic registration to coexist.
> I'm
> >>> wondering if we can do something like the following:
> >>>
> >>> 1. Statically registered members (those joining the group with a
> >> non-null `
> >>> member.name`) maintain a session with the coordinator just like
> dynamic
> >>> members.
> >>> 2. If a session is active for a static member when a rebalance begins,
> >> then
> >>> basically we'll keep the current behavior. The rebalance will await the
> >>> static member joining the group.
> >>> 3. If a static member does not have an active session, then the
> >> coordinator
> >>> will not wait for it to join, but will still include it in the
> rebalance.
> >>> The coordinator will forward the cached subscription information to the
> >>> leader and will cache the assignment after the rebalance completes.
> (Note
> >>> that we still have the generationId to fence offset commits from a
> static
> >>> zombie if the assignment changes.)
> >>> 4. When a static member leaves the group or has its session expire, no
> >>> rebalance is triggered. Instead, we can begin a timer to expire the
> >> static
> >>> registration. This would be a longish timeout (like 30 minutes say).
> >>>
> >>> So basically static members participate in all rebalances regardless
> >>> whether they have an active session. In a given rebalance, some of the
> >>> members may be static and some dynamic. The group leader can
> >> differentiate
> >>> the two based on the presence of the `member.name` (we have to add
> this
> >> to
> >>> the JoinGroupResponse). Generally speaking, we would choose leaders
> >>> preferentially from the active members that support the latest
> JoinGroup
> >>> protocol and are using static membership. If we have to choose a leader
> >>> with an old version, however, it would see all members in the group
> >> (static
> >>> or dynamic) as dynamic members and perform the assignment as usual.
> >>>
> >>> Would that work?
> >>>
> >>> -Jason
> >>>
> >>>
> >>> On Thu, Aug 23, 2018 at 5:26 PM, Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >>>
> >>>> Hello Boyang,
> >>>>
> >>>> Thanks for the updated proposal, a few questions:
> >>>>
> >>>> 1. Where will "change-group-timeout" be communicated to the broker?
> >> Will
> >>>> that be a new field in the JoinGroupRequest, or are we going to
> >>> piggy-back
> >>>> on the existing session-timeout field (assuming that the original
> value
> >>>> will not be used anywhere in the static membership any more)?
> >>>>
> >>>> 2. "However, if the consumer takes longer than session timeout to
> >> return,
> >>>> we shall still trigger rebalance but it could still try to catch
> >>>> `change-group-timeout`.": what does this mean? I thought your proposal
> >> is
> >>>> that for static memberships, the broker will NOT trigger rebalance
> even
> >>>> after session-timeout has been detected, but only that after
> >>>> change-group-timeout
> >>>> which is supposed to be longer than session-timeout to be defined?
> >>>>
> >>>> 3. "A join group request with member.name set will be treated as
> >>>> `static-membership` strategy", in this case, how would the switch from
> >>>> dynamic to static happen, since whoever changed the member.name to
> >>>> not-null
> >>>> will be rejected, right?
> >>>>
> >>>> 4. "just erase the cached mapping, and wait for session timeout to
> >>> trigger
> >>>> rebalance should be sufficient." this is also a bit unclear to me: who
> >>> will
> >>>> erase the cached mapping? Since it is on the broker-side I assume that
> >>>> broker has to do it. Are you suggesting to use a new request for it?
> >>>>
> >>>> 5. "Halfway switch": following 3) above, if your proposal is basically
> >> to
> >>>> let "first join-request wins", and the strategy will stay as is until
> >> all
> >>>> members are gone, then this will also not happen since whoever used
> >>>> different strategy as the first guy who sends join-group request will
> >> be
> >>>> rejected right?
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Tue, Aug 21, 2018 at 9:28 AM, John Roesler <j...@confluent.io>
> >> wrote:
> >>>>
> >>>>> This sounds good to me!
> >>>>>
> >>>>> Thanks for the time you've spent on it,
> >>>>> -John
> >>>>>
> >>>>> On Tue, Aug 21, 2018 at 12:13 AM Boyang Chen <bche...@outlook.com>
> >>>> wrote:
> >>>>>
> >>>>>> Thanks Matthias for the input. Sorry I was busy recently and
> >> haven't
> >>>> got
> >>>>>> time to update this thread. To summarize what we come up so far,
> >> here
> >>>> is
> >>>>> a
> >>>>>> draft updated plan:
> >>>>>>
> >>>>>>
> >>>>>> Introduce a new config called `member.name` which is supposed to
> >> be
> >>>>>> provided uniquely by the consumer client. The broker will maintain
> >> a
> >>>>> cache
> >>>>>> with [key:member.name, value:member.id]. A join group request with
> >>>>>> member.name set will be treated as `static-membership` strategy,
> >> and
> >>>>> will
> >>>>>> reject any join group request without member.name. So this
> >>>> coordination
> >>>>>> change will be differentiated from the `dynamic-membership`
> >> protocol
> >>> we
> >>>>>> currently have.
> >>>>>>
> >>>>>>
> >>>>>> When handling static join group request:
> >>>>>>
> >>>>>>  1.   The broker will check the membership to see whether this is
> >> a
> >>>> new
> >>>>>> member. If new, broker allocate a unique member id, cache the
> >> mapping
> >>>> and
> >>>>>> move to rebalance stage.
> >>>>>>  2.   Following 1, if this is an existing member, broker will not
> >>>> change
> >>>>>> group state, and return its cached member.id and current
> >> assignment.
> >>>>>> (unless this is leader, we shall trigger rebalance)
> >>>>>>  3.   Although Guozhang has mentioned we could rejoin with pair
> >>> member
> >>>>>> name and id, I think for join group request it is ok to leave
> >> member
> >>> id
> >>>>>> blank as member name is the unique identifier. In commit offset
> >>> request
> >>>>> we
> >>>>>> *must* have both.
> >>>>>>
> >>>>>>
> >>>>>> When handling commit offset request, if enabled with static
> >>> membership,
> >>>>>> each time the commit request must have both member.name and
> >>> member.id
> >>>> to
> >>>>>> be identified as a `certificated member`. If not, this means there
> >>> are
> >>>>>> duplicate consumer members with same member name and the request
> >> will
> >>>> be
> >>>>>> rejected to guarantee consumption uniqueness.
> >>>>>>
> >>>>>>
> >>>>>> When rolling restart/shutting down gracefully, the client will
> >> send a
> >>>>>> leave group request (static membership mode). In static membership,
> >>> we
> >>>>> will
> >>>>>> also define `change-group-timeout` to hold on rebalance provided by
> >>>>> leader.
> >>>>>> So we will wait for all the members to rejoin the group and do
> >>> exactly
> >>>>> one
> >>>>>> rebalance since all members are expected to rejoin within timeout.
> >> If
> >>>>>> consumer crashes, the join group request from the restarted
> >> consumer
> >>>> will
> >>>>>> be recognized as an existing member and be handled as above
> >> condition
> >>>> 1;
> >>>>>> However, if the consumer takes longer than session timeout to
> >> return,
> >>>> we
> >>>>>> shall still trigger rebalance but it could still try to catch
> >>>>>> `change-group-timeout`. If it failed to catch second timeout, its
> >>>> cached
> >>>>>> state on broker will be garbage collected and trigger a new
> >> rebalance
> >>>>> when
> >>>>>> it finally joins.
> >>>>>>
> >>>>>>
> >>>>>> And consider the switch between dynamic to static membership.
> >>>>>>
> >>>>>>  1.  Dynamic to static: the first joiner shall revise the
> >> membership
> >>>> to
> >>>>>> static and wait for all the current members to restart, since their
> >>>>>> membership is still dynamic. Here our assumption is that the
> >> restart
> >>>>>> process shouldn't take a long time, as long restart is breaking the
> >>>>>> `rebalance timeout` in whatever membership protocol we are using.
> >>>> Before
> >>>>>> restart, all dynamic member join requests will be rejected.
> >>>>>>  2.  Static to dynamic: this is more like a downgrade which should
> >>> be
> >>>>>> smooth: just erase the cached mapping, and wait for session timeout
> >>> to
> >>>>>> trigger rebalance should be sufficient. (Fallback to current
> >>> behavior)
> >>>>>>  3.  Halfway switch: a corner case is like some clients keep
> >> dynamic
> >>>>>> membership while some keep static membership. This will cause the
> >>> group
> >>>>>> rebalance forever without progress because dynamic/static states
> >> are
> >>>>>> bouncing each other. This could guarantee that we will not make the
> >>>>>> consumer group work in a wrong state by having half static and half
> >>>>> dynamic.
> >>>>>>
> >>>>>> To guarantee correctness, we will also push the member name/id pair
> >>> to
> >>>>>> _consumed_offsets topic (as Matthias pointed out) and upgrade the
> >> API
> >>>>>> version, these details will be further discussed back in the KIP.
> >>>>>>
> >>>>>>
> >>>>>> Are there any concern for this high level proposal? Just want to
> >>>>> reiterate
> >>>>>> on the core idea of the KIP: "If the broker recognize this consumer
> >>> as
> >>>> an
> >>>>>> existing member, it shouldn't trigger rebalance".
> >>>>>>
> >>>>>> Thanks a lot for everyone's input! I feel this proposal is much
> >> more
> >>>>>> robust than previous one!
> >>>>>>
> >>>>>>
> >>>>>> Best,
> >>>>>>
> >>>>>> Boyang
> >>>>>>
> >>>>>> ________________________________
> >>>>>> From: Matthias J. Sax <matth...@confluent.io>
> >>>>>> Sent: Friday, August 10, 2018 2:24 AM
> >>>>>> To: dev@kafka.apache.org
> >>>>>> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances
> >>> by
> >>>>>> specifying member id
> >>>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> thanks for the detailed discussion. I learned a lot about internals
> >>>> again
> >>>>>> :)
> >>>>>>
> >>>>>> I like the idea or a user config `member.name` and to keep `
> >>> member.id`
> >>>>>> internal. Also agree with Guozhang, that reusing `client.id` might
> >>> not
> >>>>>> be a good idea.
> >>>>>>
> >>>>>> To clarify the algorithm, each time we generate a new `member.id`,
> >>> we
> >>>>>> also need to update the "group membership" information (ie, mapping
> >>>>>> [member.id, Assignment]), right? Ie, the new `member.id` replaces
> >>> the
> >>>>>> old entry in the cache.
> >>>>>>
> >>>>>> I also think, we need to preserve the `member.name -> member.id`
> >>>> mapping
> >>>>>> in the `__consumer_offset` topic. The KIP should mention this IMHO.
> >>>>>>
> >>>>>> For changing the default value of config `leave.group.on.close`. I
> >>>> agree
> >>>>>> with John, that we should not change the default config, because it
> >>>>>> would impact all consumer groups with dynamic assignment. However,
> >> I
> >>>>>> think we can document, that if static assignment is used (ie,
> >>>>>> `member.name` is configured) we never send a LeaveGroupRequest
> >>>>>> regardless of the config. Note, that the config is internal, so not
> >>>> sure
> >>>>>> how to document this in detail. We should not expose the internal
> >>>> config
> >>>>>> in the docs.
> >>>>>>
> >>>>>> About upgrading: why do we need have two rolling bounces and encode
> >>>>>> "static" vs "dynamic" in the JoinGroupRequest?
> >>>>>>
> >>>>>> If we upgrade an existing consumer group from dynamic to static, I
> >>>> don't
> >>>>>> see any reason why both should not work together and single rolling
> >>>>>> bounce would not be sufficient? If we bounce the first consumer and
> >>>>>> switch from dynamic to static, it sends a `member.name` and the
> >>> broker
> >>>>>> registers the [member.name, member.id] in the cache. Why would
> >> this
> >>>>>> interfere with all other consumer that use dynamic assignment?
> >>>>>>
> >>>>>> Also, Guozhang mentioned that for all other request, we need to
> >> check
> >>>> if
> >>>>>> the mapping [member.name, member.id] contains the send `member.id`
> >>> --
> >>>> I
> >>>>>> don't think this is necessary -- it seems to be sufficient to check
> >>> the
> >>>>>> `member.id` from the [member.id, Assignment] mapping as be do
> >> today
> >>> --
> >>>>>> thus, checking `member.id` does not require any change IMHO.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>>> On 8/7/18 7:13 PM, Guozhang Wang wrote:
> >>>>>>> @James
> >>>>>>>
> >>>>>>> What you described is true: the transition from dynamic to static
> >>>>>>> memberships are not thought through yet. But I do not think it is
> >>> an
> >>>>>>> impossible problem: note that we indeed moved the offset commit
> >>> from
> >>>> ZK
> >>>>>> to
> >>>>>>> kafka coordinator in 0.8.2 :) The migration plan is to first to
> >>>>>>> double-commits on both zk and coordinator, and then do a second
> >>> round
> >>>>> to
> >>>>>>> turn the zk off.
> >>>>>>>
> >>>>>>> So just to throw a wild idea here: also following a
> >>>> two-rolling-bounce
> >>>>>>> manner, in the JoinGroupRequest we can set the flag to "static"
> >>> while
> >>>>>> keep
> >>>>>>> the registry-id field empty still, in this case, the coordinator
> >>>> still
> >>>>>>> follows the logic of "dynamic", accepting the request while
> >>> allowing
> >>>>> the
> >>>>>>> protocol to be set to "static"; after the first rolling bounce,
> >> the
> >>>>> group
> >>>>>>> protocol is already "static", then a second rolling bounce is
> >>>> triggered
> >>>>>> and
> >>>>>>> this time we set the registry-id.
> >>>>>>>
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>> On Tue, Aug 7, 2018 at 1:19 AM, James Cheng <
> >> wushuja...@gmail.com>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Guozhang, in a previous message, you proposed said this:
> >>>>>>>>
> >>>>>>>>> On Jul 30, 2018, at 3:56 PM, Guozhang Wang <wangg...@gmail.com
> >>>
> >>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> 1. We bump up the JoinGroupRequest with additional fields:
> >>>>>>>>>
> >>>>>>>>> 1.a) a flag indicating "static" or "dynamic" membership
> >>> protocols.
> >>>>>>>>> 1.b) with "static" membership, we also add the pre-defined
> >>> member
> >>>>> id.
> >>>>>>>>> 1.c) with "static" membership, we also add an optional
> >>>>>>>>> "group-change-timeout" value.
> >>>>>>>>>
> >>>>>>>>> 2. On the broker side, we enforce only one of the two protocols
> >>> for
> >>>>> all
> >>>>>>>>> group members: we accept the protocol on the first joined
> >> member
> >>> of
> >>>>> the
> >>>>>>>>> group, and if later joining members indicate a different
> >>> membership
> >>>>>>>>> protocol, we reject it. If the group-change-timeout value was
> >>>>> different
> >>>>>>>> to
> >>>>>>>>> the first joined member, we reject it as well.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> What will happen if we have an already-deployed application that
> >>>> wants
> >>>>>> to
> >>>>>>>> switch to using static membership? Let’s say there are 10
> >>> instances
> >>>> of
> >>>>>> it.
> >>>>>>>> As the instances go through a rolling restart, they will switch
> >>> from
> >>>>>>>> dynamic membership (the default?) to static membership. As each
> >>> one
> >>>>>> leaves
> >>>>>>>> the group and restarts, they will be rejected from the group
> >>>> (because
> >>>>>> the
> >>>>>>>> group is currently using dynamic membership). The group will
> >>> shrink
> >>>>> down
> >>>>>>>> until there is 1 node handling all the traffic. After that one
> >>>>> restarts,
> >>>>>>>> the group will switch over to static membership.
> >>>>>>>>
> >>>>>>>> Is that right? That means that the transition plan from dynamic
> >> to
> >>>>>> static
> >>>>>>>> membership isn’t very smooth.
> >>>>>>>>
> >>>>>>>> I’m not really sure what can be done in this case. This reminds
> >> me
> >>>> of
> >>>>>> the
> >>>>>>>> transition plans that were discussed for moving from
> >>> zookeeper-based
> >>>>>>>> consumers to kafka-coordinator-based consumers. That was also
> >>> hard,
> >>>>> and
> >>>>>>>> ultimately we decided not to build that.
> >>>>>>>>
> >>>>>>>> -James
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>>
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
>

Reply via email to