Hey Mayuresh,

Thanks for the thoughtful questions! Let me try to answer them one by one.


On passing the member name to the consumer at runtime: I don't think this KIP 
needs to address it, because most companies run sidecar jobs through daemon 
software like Puppet, so it should be easy to change the config through a 
script or UI without an actual code change. We still want to leave users the 
flexibility to define the member name as they like.
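
As a rough sketch of the script route (an illustration only: `member.name` is 
the config key proposed in this KIP, while the function name and the naming 
scheme are made up for this example), a deployment script could derive a 
unique member name from host information and render it into the consumer 
properties:

```python
import socket


def render_consumer_properties(group_id, host=None, instance_index=0):
    """Render consumer properties with a host-derived static member name.

    Assumptions: "member.name" is the config key proposed in this KIP (it is
    not an existing Kafka client setting), and the "<group>-<host>-<index>"
    naming scheme is purely illustrative.
    """
    # Fall back to the local hostname when the caller does not supply one.
    host = host or socket.gethostname()
    member_name = f"{group_id}-{host}-{instance_index}"
    settings = {
        "group.id": group_id,
        "member.name": member_name,
    }
    # Emit "key=value" lines in a stable order so reruns produce the same file.
    return "\n".join(f"{key}={value}" for key, value in sorted(settings.items()))


if __name__ == "__main__":
    print(render_consumer_properties("orders", host="host-a", instance_index=2))
```

The application code stays untouched; only the rendered properties differ per 
host.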


I just updated the KIP on having both "registration timeout" and "session 
timeout". The benefit of having two configs instead of one is a lower 
operational burden: for example, a user only needs to unset "member name" to 
fall back to dynamic membership, without worrying about tuning the "session 
timeout" back to a smaller value.
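
To illustrate the point (a sketch only: the key `registration.timeout.ms` and 
the helper name are assumptions for this example, since the KIP has not fixed 
the final names), the timeout that applies to a missing member could be 
chosen by whether a member name is set:

```python
def expiration_timeout_ms(config):
    """Pick the timeout that applies before a missing member triggers a rebalance.

    Assumptions: "member.name" and "registration.timeout.ms" are hypothetical
    keys modeled on this KIP discussion; "session.timeout.ms" is the existing
    consumer setting.
    """
    if config.get("member.name"):
        # Static membership: tolerate a longer absence (e.g. a rolling bounce).
        return config["registration.timeout.ms"]
    # Dynamic membership: the usual session timeout still applies.
    return config["session.timeout.ms"]


static_cfg = {
    "member.name": "orders-host-a-0",
    "session.timeout.ms": 10_000,
    "registration.timeout.ms": 300_000,
}
# Unsetting only the member name falls back to dynamic membership;
# the session timeout does not need to be retuned.
dynamic_cfg = {k: v for k, v in static_cfg.items() if k != "member.name"}

print(expiration_timeout_ms(static_cfg))
print(expiration_timeout_ms(dynamic_cfg))
```

With a single shared timeout, the same switch would also require lowering the 
timeout value by hand.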


For the backup topic, I think it's a low-level detail which could be addressed 
in the implementation. I have no preference between adding a new topic and 
reusing the consumer offsets topic. I will do more analysis and make a 
trade-off comparison. Nice catch!


I hope the explanations make sense to you. I will keep polishing the edge 
cases and details.


Best,

Boyang

________________________________
From: Mayuresh Gharat <gharatmayures...@gmail.com>
Sent: Saturday, November 10, 2018 10:25 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by 
specifying member id

Hi Boyang,

Thanks for the KIP and sorry for being late to the party. This KIP is
really useful for us at LinkedIn.

I had a few questions:

The idea of having static member name seems nice, but instead of a config,
would it be possible for it to be passed in to the consumer at runtime?
This is because an app might want to decide the config value at runtime
using its host information for example, to generate the unique member name.

Also the KIP talks about using the "REGISTRATION_TIMEOUT_MS". I was
wondering if we can reuse the session timeout here. This might help us to
have one less config on the consumer.

The KIP also talks about adding another internal topic "static_member_map".
Would the semantics (GroupCoordinator broker, topic configs) be the same as
__consumer_offsets topic?

Thanks,

Mayuresh


On Wed, Nov 7, 2018 at 12:17 AM Boyang Chen <bche...@outlook.com> wrote:

> I took a quick pass of the proposal. First I would say it's a very
> brilliant initiative from Konstantine and Confluent folks. To draft up a
> proposal like this needs deep understanding of the rebalance protocol! I
> summarized some thoughts here.
>
>
> Overall the motivations of the two proposals align on that:
>
>   1.  Both believe the invariant resource (belonging to the same process)
> should be preserved across rebalance.
>   2.  Transient failures (K8 thread death) shouldn't trigger resource
> redistribution. I don't use rebalance here since part one of the
> cooperative proposal could potentially introduce more rebalances but only
> on must-move resources.
>   3.  Scale up/down and rolling bounce are causing unnecessary resource
> shuffling that needs to be mitigated.
>
>
> At the motivation level, I think both approaches could solve/mitigate the
> above issues. They just differ in design philosophy, or I would say in
> perspective: framework user versus algorithm designer.
>
>
> Two proposals have different focuses. KIP-345 is trying to place more
> fine-grained control on the broker side to reduce the unnecessary
> rebalances, while keeping the client logic intact. This is pretty intuitive
> cause-effect for normal developers who are not very familiar with rebalance
> protocol. As a developer working with Kafka Streams daily, I'd be happy to
> see a simplified rebalance protocol and just focus on maintaining the
> stream/consumer jobs. Too many rebalances raised my concern on the job
> health. To be concise, static membership has the advantage of reducing
> mental burden.
>
>
> The cooperative proposal takes a thoughtful approach on the client side. We
> want to have fine-grained control over the join/exit group behaviors and make
> the current dynamic membership better address the above issues. I do feel our
> ideas overlap on the delayed rebalance when we scale up/down, which could
> potentially reduce the state shuffling and decouple the behavior from the
> session timeout, which is already overloaded. In this sense, I believe both
> approaches would serve well in making a "reasonable rebalance" happen at the
> "right timing".
>
>
> However, based on my understanding, neither 345 nor cooperative rebalancing
> solves the problem Mike has raised: could we do a better job of scaling
> up/down at the ideal time? My initial response was to introduce an admin
> API, which I now feel is sub-optimal, in that the goal of a smooth
> transition is to make sure the newly started hosts are actually "ready". For
> example:
>
>
> We have 4 instances reading from 8 topic partitions (= 8 tasks). At some
> point we would like to scale up to 8 hosts; with the current improvements we
> could reduce 4 potential rebalances to a single one. But it is not yet known
> whether the new hosts are "ready" if they need to reconstruct the local
> state. To be actually ready, we need 4 standby tasks running on those empty
> hosts, and the leader needs to wait for the signal of "replay/reconstruct
> complete" before actually involving them in the main consumer group.
> Otherwise, the rebalance just kills our performance, since we need to wait
> indefinitely long for task migration.
>
>
> Scaling down is also tricky, in that we are not able to define a "true"
> leave of a member. Rebalancing immediately after a "true" leave would be
> better than relying on human intervention. Does this make sense?
>
>
> My intuition is that the cooperative approach, which is implemented on the
> client side, could handle scaling cases better than KIP 345, since it
> involves a lot of algorithmic changes to define a "replaying" stage, which I
> feel would over-complicate the broker logic if implemented on the
> coordinator. If we let 345 focus on reducing unnecessary rebalances, and let
> the cooperative approach focus on judging the best timing of scale up/down,
> the two efforts could be aligned. In the long term, I feel the more complex
> improvements to the consumer protocol should happen on the client side
> instead of the server side, since the client side is easier to test and has
> less global impact on the entire Kafka production cluster.
>
>
> Thanks again to Konstantine, Matthias and other folks for coming up with
> this great client proposal. This is a great complement to KIP 345. At a
> high level, the two do not collide and both proposals make sense here. We
> just need better sync to avoid duplicate effort :)
>
>
> Best,
>
> Boyang
>
>
> ________________________________
> From: Boyang Chen <bche...@outlook.com>
> Sent: Wednesday, November 7, 2018 1:57 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> specifying member id
>
> Thanks Matthias for bringing this awesome proposal up! I shall take a
> deeper look and make a comparison between the two proposals.
>
>
> Meanwhile for the scale down specifically for stateful streaming, we could
> actually introduce a new status called "learner" where the newly up hosts
> could try to catch up with the assigned task progress first before
> triggering the rebalance, from which we don't see a sudden dip on the
> progress. However, it is built on top of the success of KIP-345.
>
>
> ________________________________
> From: Matthias J. Sax <matth...@confluent.io>
> Sent: Wednesday, November 7, 2018 7:02 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> specifying member id
>
> Hey,
>
> there was quite a pause on this KIP discussion and in the meantime, a
> new design for incremental cooperative rebalance was suggested:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Incremental+Cooperative+Rebalancing%3A+Support+and+Policies
>
>
> We should make sure that the proposal and this KIP align to each other.
> Thoughts?
>
>
> -Matthias
>
> On 11/5/18 7:31 PM, Boyang Chen wrote:
> > Hey Mike,
> >
> >
> > thanks for the feedback, the two questions are very thoughtful!
> >
> >
> >> 1) I am a little confused about the distinction for the leader. If the
> consumer node that was assigned leader does a bounce (goes down and quickly
> comes up) to update application code, will a rebalance be triggered? I do
> not think a bounce of the leader should trigger a rebalance.
> >
> > For Q1 my intention was to minimize the change within one KIP, since the
> leader rejoining case could be addressed separately.
> >
> >
> >> 2) The timeout for shrink up makes a lot of sense and allows to
> gracefully increase the number of nodes in the cluster. I think we need to
> support graceful shrink down as well. If I set the registration timeout to
> 5 minutes to handle rolling restarts or intermittent failures without
> shuffling state, I don't want to wait 5 minutes in order for the group to
> rebalance if I am intentionally removing a node from the cluster. I am not
> sure the best way to do this. One idea I had was adding the ability for a
> CLI or Admin API to force a rebalance of the group. This would allow for an
> admin to trigger the rebalance manually without waiting the entire
> registration timeout on shrink down. What do you think?
> >
> > For 2) my understanding is that the scaling down case is better addressed
> by a CLI tool than by code logic, since only through human evaluation can we
> decide whether it is the "right timing" -- the time when all the
> scaling down consumers are offline -- to kick in a rebalance. Unless we
> introduce another term on the coordinator which indicates the target consumer
> group size, the broker will find it hard to decide when to start a rebalance.
> So far I prefer to hold off on the implementation for that, but agree we
> could discuss whether we want to introduce the admin API in this KIP or a
> separate one.
> >
> >
> > Thanks again for the proposed ideas!
> >
> >
> > Boyang
> >
> > ________________________________
> > From: Mike Freyberger <mike.freyber...@xandr.com>
> > Sent: Monday, November 5, 2018 6:13 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> specifying member id
> >
> > Boyang,
> >
> > Thanks for updating the KIP. It's shaping up well. Two things:
> >
> > 1) I am a little confused about the distinction for the leader. If the
> consumer node that was assigned leader does a bounce (goes down and quickly
> comes up) to update application code, will a rebalance be triggered? I do
> not think a bounce of the leader should trigger a rebalance.
> >
> > 2) The timeout for shrink up makes a lot of sense and allows to
> gracefully increase the number of nodes in the cluster. I think we need to
> support graceful shrink down as well. If I set the registration timeout to
> 5 minutes to handle rolling restarts or intermittent failures without
> shuffling state, I don't want to wait 5 minutes in order for the group to
> rebalance if I am intentionally removing a node from the cluster. I am not
> sure the best way to do this. One idea I had was adding the ability for a
> CLI or Admin API to force a rebalance of the group. This would allow for an
> admin to trigger the rebalance manually without waiting the entire
> registration timeout on shrink down. What do you think?
> >
> > Mike
> >
> > On 10/30/18, 1:55 AM, "Boyang Chen" <bche...@outlook.com> wrote:
> >
> >     Btw, I updated KIP 345 based on my understanding. Feel free to take
> another look:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
> >
> >     ________________________________
> >     From: Boyang Chen <bche...@outlook.com>
> >     Sent: Monday, October 29, 2018 12:34 PM
> >     To: dev@kafka.apache.org
> >     Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances
> by specifying member id
> >
> >     Thanks everyone for the input on this thread! (Sorry it's been a
> while) I feel that we are very close to the final solution.
> >
> >
> >     Hey Jason and Mike, I have two quick questions on the new features
> here:
> >
> >       1.  So our proposal is that, unless we add a new static member into
> the group (scale up), we will not trigger a rebalance until the "registration
> timeout" expires (the member has been offline for too long)? How about the
> leader's rejoin request? I think we should still trigger a rebalance when that
> happens, since the consumer group may have new topics to consume.
> >       2.  I'm not very clear on the scale up scenario in static
> membership here. Should we fall back to dynamic membership while
> adding/removing hosts (by setting member.name = null), or do we still want
> to add instances with `member.name` so that we eventually expand/shrink
> the static membership? I personally feel the easier solution is to spin up
> new members and wait until either the same "registration timeout" or a
> "scale up timeout" before starting the rebalance. What do you think?
> >
> >     Meanwhile I will go ahead to make changes to the KIP with our newly
> discussed items and details. Really excited to see the design has become
> more solid.
> >
> >     Best,
> >     Boyang
> >
> >     ________________________________
> >     From: Jason Gustafson <ja...@confluent.io>
> >     Sent: Saturday, August 25, 2018 6:04 AM
> >     To: dev
> >     Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances
> by specifying member id
> >
> >     Hey Mike,
> >
> >     Yeah, that's a good point. A long "registration timeout" may not be
> a great
> >     idea. Perhaps in practice you'd set it long enough to be able to
> detect a
> >     failure and provision a new instance. Maybe on the order of 10
> minutes is
> >     more reasonable.
> >
> >     In any case, it's probably a good idea to have an administrative way
> to
> >     force deregistration. One option is to extend the DeleteGroups API
> with a
> >     list of member names.
> >
> >     -Jason
> >
> >
> >
> >     On Fri, Aug 24, 2018 at 2:21 PM, Mike Freyberger <
> mfreyber...@appnexus.com>
> >     wrote:
> >
> >     > Jason,
> >     >
> >     > Regarding step 4 in your proposal which suggests beginning a long
> timer
> >     > (30 minutes) when a static member leaves the group, would there
> also be the
> >     > ability for an admin to force a static membership expiration?
> >     >
> >     > I’m thinking that during particular types of outages or upgrades
> users
> >     > would want to forcefully remove a static member from the group.
> >     >
> >     > So the user would shut the consumer down normally, which wouldn’t
> trigger
> >     > a rebalance. Then the user could use an admin CLI tool to force
> remove that
> >     > consumer from the group, so the TopicPartitions that were
> previously owned
> >     > by that consumer can be released.
> >     >
> >     > At a high level, we need consumer groups to gracefully handle
> intermittent
> >     > failures and permanent failures. Currently, the consumer group
> protocol
> >     > handles permanent failures well, but does not handle intermittent
> failures
> >     > well (it creates unnecessary rebalances). I want to make sure the
> overall
> >     > solution here handles both intermittent failures and permanent
> failures,
> >     > rather than sacrificing support for permanent failures in order to
> provide
> >     > support for intermittent failures.
> >     >
> >     > Mike
> >     >
> >     > Sent from my iPhone
> >     >
> >     > > On Aug 24, 2018, at 3:03 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
> >     > >
> >     > > Hey Guozhang,
> >     > >
> >     > > Responses below:
> >     > >
> >     > > Originally I was trying to kill more birds with one stone with
> KIP-345,
> >     > >> e.g. to fix the multi-rebalance issue on starting up / shutting
> down a
> >     > >> multi-instance client (mentioned as case 1)/2) in my early
> email), and
> >     > >> hence proposing to have a pure static-membership protocol. But
> thinking
> >     > >> twice about it I now feel it may be too ambitious and worth
> fixing in
> >     > >> another KIP.
> >     > >
> >     > >
> >     > > I was considering an extension to support pre-initialization of
> the
> >     > static
> >     > > members of the group, but I agree we should probably leave this
> problem
> >     > for
> >     > > future work.
> >     > >
> >     > > 1. How is this longish static member expiration timeout defined? Is
> it via a
> >     > >> broker, hence global config, or via a client config which can be
> >     > >> communicated to broker via JoinGroupRequest?
> >     > >
> >     > >
> >     > > I am not too sure. I tend to lean toward server-side configs
> because they
> >     > > are easier to evolve. If we have to add something to the
> protocol, then
> >     > > we'll be stuck with it forever.
> >     > >
> >     > > 2. Assuming that for static members, LEAVE_GROUP request will not
> >     > trigger a
> >     > >> rebalance immediately either, similar to session timeout, but
> only the
> >     > >> longer member expiration timeout, can we remove the internal "
> >     > >> internal.leave.group.on.close" config, which is a quick
> workaround
> >     > then?
> >     > >
> >     > >
> >     > > Yeah, I hope we can ultimately get rid of it, but we may need it
> for
> >     > > compatibility with older brokers. A related question is what
> should be
> >     > the
> >     > > behavior of the consumer if `member.name` is provided but the
> broker
> >     > does
> >     > > not support it? We could either fail or silently downgrade to
> dynamic
> >     > > membership.
> >     > >
> >     > > -Jason
> >     > >
> >     > >
> >     > >> On Fri, Aug 24, 2018 at 11:44 AM, Guozhang Wang <
> wangg...@gmail.com>
> >     > wrote:
> >     > >>
> >     > >> Hey Jason,
> >     > >>
> >     > >> I like your idea to simplify the upgrade protocol to allow static
> >     > >> and dynamic members to coexist. Admittedly it may make the
> >     > >> coordinator-side logic a bit more complex, but I think it is worth
> >     > >> doing.
> >     > >>
> >     > >> Originally I was trying to kill more birds with one stone with
> KIP-345,
> >     > >> e.g. to fix the multi-rebalance issue on starting up / shutting
> down a
> >     > >> multi-instance client (mentioned as case 1)/2) in my early
> email), and
> >     > >> hence proposing to have a pure static-membership protocol. But
> thinking
> >     > >> twice about it I now feel it may be too ambitious and worth
> fixing in
> >     > >> another KIP. With that, I think what you've proposed here is a
> good way
> >     > to
> >     > >> go for KIP-345 itself.
> >     > >>
> >     > >> Note there are a few details in your proposal we'd still need
> to figure
> >     > >> out:
> >     > >>
> >     > >> 1. How is this longish static member expiration timeout defined?
> Is it via
> >     > a
> >     > >> broker, hence global config, or via a client config which can be
> >     > >> communicated to broker via JoinGroupRequest?
> >     > >>
> >     > >> 2. Assuming that for static members, LEAVE_GROUP request will
> not
> >     > trigger a
> >     > >> rebalance immediately either, similar to session timeout, but
> only the
> >     > >> longer member expiration timeout, can we remove the internal "
> >     > >> internal.leave.group.on.close" config, which is a quick
> workaround
> >     > then?
> >     > >>
> >     > >>
> >     > >>
> >     > >> Guozhang
> >     > >>
> >     > >>
> >     > >> On Fri, Aug 24, 2018 at 11:14 AM, Jason Gustafson <
> ja...@confluent.io>
> >     > >> wrote:
> >     > >>
> >     > >>> Hey All,
> >     > >>>
> >     > >>> Nice to see some solid progress on this. It sounds like one of
> the
> >     > >>> complications is allowing static and dynamic registration to
> coexist.
> >     > I'm
> >     > >>> wondering if we can do something like the following:
> >     > >>>
> >     > >>> 1. Statically registered members (those joining the group with
> a
> >     > >> non-null `
> >     > >>> member.name`) maintain a session with the coordinator just
> like
> >     > dynamic
> >     > >>> members.
> >     > >>> 2. If a session is active for a static member when a rebalance
> begins,
> >     > >> then
> >     > >>> basically we'll keep the current behavior. The rebalance will
> await the
> >     > >>> static member joining the group.
> >     > >>> 3. If a static member does not have an active session, then the
> >     > >> coordinator
> >     > >>> will not wait for it to join, but will still include it in the
> >     > rebalance.
> >     > >>> The coordinator will forward the cached subscription
> information to the
> >     > >>> leader and will cache the assignment after the rebalance
> completes.
> >     > (Note
> >     > >>> that we still have the generationId to fence offset commits
> from a
> >     > static
> >     > >>> zombie if the assignment changes.)
> >     > >>> 4. When a static member leaves the group or has its session
> expire, no
> >     > >>> rebalance is triggered. Instead, we can begin a timer to
> expire the
> >     > >> static
> >     > >>> registration. This would be a longish timeout (like 30 minutes
> say).
> >     > >>>
> >     > >>> So basically static members participate in all rebalances
> regardless
> >     > >>> whether they have an active session. In a given rebalance,
> some of the
> >     > >>> members may be static and some dynamic. The group leader can
> >     > >> differentiate
> >     > >>> the two based on the presence of the `member.name` (we have
> to add
> >     > this
> >     > >> to
> >     > >>> the JoinGroupResponse). Generally speaking, we would choose
> leaders
> >     > >>> preferentially from the active members that support the latest
> >     > JoinGroup
> >     > >>> protocol and are using static membership. If we have to choose
> a leader
> >     > >>> with an old version, however, it would see all members in the
> group
> >     > >> (static
> >     > >>> or dynamic) as dynamic members and perform the assignment as
> usual.
> >     > >>>
> >     > >>> Would that work?
> >     > >>>
> >     > >>> -Jason
> >     > >>>
> >     > >>>
> >     > >>> On Thu, Aug 23, 2018 at 5:26 PM, Guozhang Wang <
> wangg...@gmail.com>
> >     > >> wrote:
> >     > >>>
> >     > >>>> Hello Boyang,
> >     > >>>>
> >     > >>>> Thanks for the updated proposal, a few questions:
> >     > >>>>
> >     > >>>> 1. Where will "change-group-timeout" be communicated to the
> broker?
> >     > >> Will
> >     > >>>> that be a new field in the JoinGroupRequest, or are we going
> to
> >     > >>> piggy-back
> >     > >>>> on the existing session-timeout field (assuming that the
> original
> >     > value
> >     > >>>> will not be used anywhere in the static membership any more)?
> >     > >>>>
> >     > >>>> 2. "However, if the consumer takes longer than session
> timeout to
> >     > >> return,
> >     > >>>> we shall still trigger rebalance but it could still try to
> catch
> >     > >>>> `change-group-timeout`.": what does this mean? I thought your
> proposal
> >     > >> is
> >     > >>>> that for static memberships, the broker will NOT trigger
> rebalance
> >     > even
> >     > >>>> after session-timeout has been detected, but only that after
> >     > >>>> change-group-timeout
> >     > >>>> which is supposed to be longer than session-timeout to be
> defined?
> >     > >>>>
> >     > >>>> 3. "A join group request with member.name set will be
> treated as
> >     > >>>> `static-membership` strategy", in this case, how would the
> switch from
> >     > >>>> dynamic to static happen, since whoever changed the
> member.name to
> >     > >>>> not-null
> >     > >>>> will be rejected, right?
> >     > >>>>
> >     > >>>> 4. "just erase the cached mapping, and wait for session
> timeout to
> >     > >>> trigger
> >     > >>>> rebalance should be sufficient." this is also a bit unclear
> to me: who
> >     > >>> will
> >     > >>>> erase the cached mapping? Since it is on the broker-side I
> assume that
> >     > >>>> broker has to do it. Are you suggesting to use a new request
> for it?
> >     > >>>>
> >     > >>>> 5. "Halfway switch": following 3) above, if your proposal is
> basically
> >     > >> to
> >     > >>>> let "first join-request wins", and the strategy will stay as
> is until
> >     > >> all
> >     > >>>> members are gone, then this will also not happen since
> whoever used
> >     > >>>> different strategy as the first guy who sends join-group
> request will
> >     > >> be
> >     > >>>> rejected right?
> >     > >>>>
> >     > >>>>
> >     > >>>> Guozhang
> >     > >>>>
> >     > >>>>
> >     > >>>> On Tue, Aug 21, 2018 at 9:28 AM, John Roesler <
> j...@confluent.io>
> >     > >> wrote:
> >     > >>>>
> >     > >>>>> This sounds good to me!
> >     > >>>>>
> >     > >>>>> Thanks for the time you've spent on it,
> >     > >>>>> -John
> >     > >>>>>
> >     > >>>>> On Tue, Aug 21, 2018 at 12:13 AM Boyang Chen <
> bche...@outlook.com>
> >     > >>>> wrote:
> >     > >>>>>
> >     > >>>>>> Thanks Matthias for the input. Sorry, I was busy recently and
> >     > >>>>>> haven't had time to update this thread. To summarize what we have
> >     > >>>>>> come up with so far, here is a draft updated plan:
> >     > >>>>>>
> >     > >>>>>>
> >     > >>>>>> Introduce a new config called `member.name` which is
> supposed to
> >     > >> be
> >     > >>>>>> provided uniquely by the consumer client. The broker will
> maintain
> >     > >> a
> >     > >>>>> cache
> >     > >>>>>> with [key:member.name, value:member.id]. A join group
> request with
> >     > >>>>>> member.name set will be treated as `static-membership`
> strategy,
> >     > >> and
> >     > >>>>> will
> >     > >>>>>> reject any join group request without member.name. So this
> >     > >>>> coordination
> >     > >>>>>> change will be differentiated from the `dynamic-membership`
> >     > >> protocol
> >     > >>> we
> >     > >>>>>> currently have.
> >     > >>>>>>
> >     > >>>>>>
> >     > >>>>>> When handling static join group request:
> >     > >>>>>>
> >     > >>>>>>  1.   The broker will check the membership to see whether
> this is
> >     > >> a
> >     > >>>> new
> >     > >>>>>> member. If new, broker allocate a unique member id, cache
> the
> >     > >> mapping
> >     > >>>> and
> >     > >>>>>> move to rebalance stage.
> >     > >>>>>>  2.   Following 1, if this is an existing member, broker
> will not
> >     > >>>> change
> >     > >>>>>> group state, and return its cached member.id and current
> >     > >> assignment.
> >     > >>>>>> (unless this is leader, we shall trigger rebalance)
> >     > >>>>>>  3.   Although Guozhang has mentioned we could rejoin with
> pair
> >     > >>> member
> >     > >>>>>> name and id, I think for join group request it is ok to
> leave
> >     > >> member
> >     > >>> id
> >     > >>>>>> blank as member name is the unique identifier. In commit
> offset
> >     > >>> request
> >     > >>>>> we
> >     > >>>>>> *must* have both.
> >     > >>>>>>
> >     > >>>>>>
> >     > >>>>>> When handling commit offset request, if enabled with static
> >     > >>> membership,
> >     > >>>>>> each time the commit request must have both member.name and
> >     > >>> member.id
> >     > >>>> to
> >     > >>>>>> be identified as a `certificated member`. If not, this
> means there
> >     > >>> are
> >     > >>>>>> duplicate consumer members with same member name and the
> request
> >     > >> will
> >     > >>>> be
> >     > >>>>>> rejected to guarantee consumption uniqueness.
> >     > >>>>>>
> >     > >>>>>>
> >     > >>>>>> When rolling restart/shutting down gracefully, the client
> will
> >     > >> send a
> >     > >>>>>> leave group request (static membership mode). In static
> membership,
> >     > >>> we
> >     > >>>>> will
> >     > >>>>>> also define `change-group-timeout` to hold on rebalance
> provided by
> >     > >>>>> leader.
> >     > >>>>>> So we will wait for all the members to rejoin the group and
> do
> >     > >>> exactly
> >     > >>>>> one
> >     > >>>>>> rebalance since all members are expected to rejoin within
> timeout.
> >     > >> If
> >     > >>>>>> consumer crashes, the join group request from the restarted
> >     > >> consumer
> >     > >>>> will
> >     > >>>>>> be recognized as an existing member and be handled as above
> >     > >> condition
> >     > >>>> 1;
> >     > >>>>>> However, if the consumer takes longer than session timeout
> to
> >     > >> return,
> >     > >>>> we
> >     > >>>>>> shall still trigger rebalance but it could still try to
> catch
> >     > >>>>>> `change-group-timeout`. If it failed to catch second
> timeout, its
> >     > >>>> cached
> >     > >>>>>> state on broker will be garbage collected and trigger a new
> >     > >> rebalance
> >     > >>>>> when
> >     > >>>>>> it finally joins.
> >     > >>>>>>
> >     > >>>>>>
> >     > >>>>>> And consider the switch between dynamic to static
> membership.
> >     > >>>>>>
> >     > >>>>>>  1.  Dynamic to static: the first joiner shall revise the
> >     > >> membership
> >     > >>>> to
> >     > >>>>>> static and wait for all the current members to restart,
> since their
> >     > >>>>>> membership is still dynamic. Here our assumption is that the
> >     > >> restart
> >     > >>>>>> process shouldn't take a long time, as long restart is
> breaking the
> >     > >>>>>> `rebalance timeout` in whatever membership protocol we are
> using.
> >     > >>>> Before
> >     > >>>>>> restart, all dynamic member join requests will be rejected.
> >     > >>>>>>  2.  Static to dynamic: this is more like a downgrade which
> should
> >     > >>> be
> >     > >>>>>> smooth: just erase the cached mapping, and wait for session
> timeout
> >     > >>> to
> >     > >>>>>> trigger rebalance should be sufficient. (Fallback to current
> >     > >>> behavior)
> >     > >>>>>>  3.  Halfway switch: a corner case is like some clients keep
> >     > >> dynamic
> >     > >>>>>> membership while some keep static membership. This will
> cause the
> >     > >>> group
> >     > >>>>>> rebalance forever without progress because dynamic/static
> states
> >     > >> are
> >     > >>>>>> bouncing each other. This could guarantee that we will not
> make the
> >     > >>>>>> consumer group work in a wrong state by having half static
> and half
> >     > >>>>> dynamic.
> >     > >>>>>>
> >     > >>>>>> To guarantee correctness, we will also push the member
> name/id pair
> >     > >>> to
> >     > >>>>>> `__consumer_offsets` topic (as Matthias pointed out) and
> upgrade the
> >     > >> API
> >     > >>>>>> version; these details will be discussed further in
> the KIP.
> >     > >>>>>>
> >     > >>>>>>
> >     > >>>>>> Are there any concerns about this high-level proposal? Just
> want to
> >     > >>>>> reiterate
> >     > >>>>>> the core idea of the KIP: "If the broker recognizes this
> consumer
> >     > >>> as
> >     > >>>> an
> >     > >>>>>> existing member, it shouldn't trigger rebalance".
> >     > >>>>>>
> >     > >>>>>> Thanks a lot for everyone's input! I feel this proposal is
> much
> >     > >> more
> >     > >>>>>> robust than the previous one!
> >     > >>>>>>
> >     > >>>>>>
> >     > >>>>>> Best,
> >     > >>>>>>
> >     > >>>>>> Boyang
> >     > >>>>>>
> >     > >>>>>> ________________________________
> >     > >>>>>> From: Matthias J. Sax <matth...@confluent.io>
> >     > >>>>>> Sent: Friday, August 10, 2018 2:24 AM
> >     > >>>>>> To: dev@kafka.apache.org
> >     > >>>>>> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> rebalances
> >     > >>> by
> >     > >>>>>> specifying member id
> >     > >>>>>>
> >     > >>>>>> Hi,
> >     > >>>>>>
> >     > >>>>>> thanks for the detailed discussion. I learned a lot about
> internals
> >     > >>>> again
> >     > >>>>>> :)
> >     > >>>>>>
> >     > >>>>>> I like the idea of a user config `member.name` and to keep
> `
> >     > >>> member.id`
> >     > >>>>>> internal. Also agree with Guozhang, that reusing `client.id`
> might
> >     > >>> not
> >     > >>>>>> be a good idea.
> >     > >>>>>>
> >     > >>>>>> To clarify the algorithm, each time we generate a new `
> member.id`,
> >     > >>> we
> >     > >>>>>> also need to update the "group membership" information (ie,
> mapping
> >     > >>>>>> [member.id, Assignment]), right? Ie, the new `member.id`
> replaces
> >     > >>> the
> >     > >>>>>> old entry in the cache.
> >     > >>>>>>
> >     > >>>>>> I also think, we need to preserve the `member.name ->
> member.id`
> >     > >>>> mapping
> >     > >>>>>> in the `__consumer_offsets` topic. The KIP should mention
> this IMHO.
> >     > >>>>>>
> >     > >>>>>> As for changing the default value of config
> `leave.group.on.close`: I
> >     > >>>> agree
> >     > >>>>>> with John, that we should not change the default config,
> because it
> >     > >>>>>> would impact all consumer groups with dynamic assignment.
> However,
> >     > >> I
> >     > >>>>>> think we can document, that if static assignment is used
> (ie,
> >     > >>>>>> `member.name` is configured) we never send a
> LeaveGroupRequest
> >     > >>>>>> regardless of the config. Note, that the config is
> internal, so not
> >     > >>>> sure
> >     > >>>>>> how to document this in detail. We should not expose the
> internal
> >     > >>>> config
> >     > >>>>>> in the docs.
> >     > >>>>>>
> >     > >>>>>> About upgrading: why do we need to have two rolling bounces
> and encode
> >     > >>>>>> "static" vs "dynamic" in the JoinGroupRequest?
> >     > >>>>>>
> >     > >>>>>> If we upgrade an existing consumer group from dynamic to
> static, I
> >     > >>>> don't
> >     > >>>>>> see any reason why both should not work together and a single
> rolling
> >     > >>>>>> bounce would not be sufficient? If we bounce the first
> consumer and
> >     > >>>>>> switch from dynamic to static, it sends a `member.name`
> and the
> >     > >>> broker
> >     > >>>>>> registers the [member.name, member.id] in the cache. Why
> would
> >     > >> this
> >     > >>>>>> interfere with all other consumers that use dynamic
> assignment?
> >     > >>>>>>
> >     > >>>>>> Also, Guozhang mentioned that for all other requests, we
> need to
> >     > >> check
> >     > >>>> if
> >     > >>>>>> the mapping [member.name, member.id] contains the sent `
> member.id`
> >     > >>> --
> >     > >>>> I
> >     > >>>>>> don't think this is necessary -- it seems to be sufficient
> to check
> >     > >>> the
> >     > >>>>>> `member.id` from the [member.id, Assignment] mapping as we
> do
> >     > >> today
> >     > >>> --
> >     > >>>>>> thus, checking `member.id` does not require any change
> IMHO.
> >     > >>>>>>
> >     > >>>>>>
> >     > >>>>>> -Matthias
> >     > >>>>>>
> >     > >>>>>>
> >     > >>>>>>> On 8/7/18 7:13 PM, Guozhang Wang wrote:
> >     > >>>>>>> @James
> >     > >>>>>>>
> >     > >>>>>>> What you described is true: the transition from dynamic to
> static
> >     > >>>>>>> membership is not thought through yet. But I do not
> think it is
> >     > >>> an
> >     > >>>>>>> impossible problem: note that we indeed moved the offset
> commit
> >     > >>> from
> >     > >>>> ZK
> >     > >>>>>> to
> >     > >>>>>>> kafka coordinator in 0.8.2 :) The migration plan is to
> first do
> >     > >>>>>>> double-commits on both zk and coordinator, and then do a
> second
> >     > >>> round
> >     > >>>>> to
> >     > >>>>>>> turn the zk off.
> >     > >>>>>>>
> >     > >>>>>>> So just to throw a wild idea here: also following a
> >     > >>>> two-rolling-bounce
> >     > >>>>>>> manner, in the JoinGroupRequest we can set the flag to
> "static"
> >     > >>> while
> >     > >>>>>> keeping
> >     > >>>>>>> the registry-id field empty; in this case, the
> coordinator
> >     > >>>> still
> >     > >>>>>>> follows the logic of "dynamic", accepting the request while
> >     > >>> allowing
> >     > >>>>> the
> >     > >>>>>>> protocol to be set to "static"; after the first rolling
> bounce,
> >     > >> the
> >     > >>>>> group
> >     > >>>>>>> protocol is already "static", then a second rolling bounce
> is
> >     > >>>> triggered
> >     > >>>>>> and
> >     > >>>>>>> this time we set the registry-id.
> >     > >>>>>>>
> >     > >>>>>>>
> >     > >>>>>>> Guozhang
> >     > >>>>>>>
> >     > >>>>>>> On Tue, Aug 7, 2018 at 1:19 AM, James Cheng <
> >     > >> wushuja...@gmail.com>
> >     > >>>>>> wrote:
> >     > >>>>>>>
> >     > >>>>>>>> Guozhang, in a previous message, you proposed this:
> >     > >>>>>>>>
> >     > >>>>>>>>> On Jul 30, 2018, at 3:56 PM, Guozhang Wang <
> wangg...@gmail.com
> >     > >>>
> >     > >>>>> wrote:
> >     > >>>>>>>>>
> >     > >>>>>>>>> 1. We bump up the JoinGroupRequest with additional
> fields:
> >     > >>>>>>>>>
> >     > >>>>>>>>> 1.a) a flag indicating "static" or "dynamic" membership
> >     > >>> protocols.
> >     > >>>>>>>>> 1.b) with "static" membership, we also add the
> pre-defined
> >     > >>> member
> >     > >>>>> id.
> >     > >>>>>>>>> 1.c) with "static" membership, we also add an optional
> >     > >>>>>>>>> "group-change-timeout" value.
> >     > >>>>>>>>>
> >     > >>>>>>>>> 2. On the broker side, we enforce only one of the two
> protocols
> >     > >>> for
> >     > >>>>> all
> >     > >>>>>>>>> group members: we accept the protocol on the first joined
> >     > >> member
> >     > >>> of
> >     > >>>>> the
> >     > >>>>>>>>> group, and if later joining members indicate a different
> >     > >>> membership
> >     > >>>>>>>>> protocol, we reject it. If the group-change-timeout
> value was
> >     > >>>>> different
> >     > >>>>>>>> to
> >     > >>>>>>>>> the first joined member, we reject it as well.
> >     > >>>>>>>>
> >     > >>>>>>>>
> >     > >>>>>>>> What will happen if we have an already-deployed
> application that
> >     > >>>> wants
> >     > >>>>>> to
> >     > >>>>>>>> switch to using static membership? Let’s say there are 10
> >     > >>> instances
> >     > >>>> of
> >     > >>>>>> it.
> >     > >>>>>>>> As the instances go through a rolling restart, they will
> switch
> >     > >>> from
> >     > >>>>>>>> dynamic membership (the default?) to static membership.
> As each
> >     > >>> one
> >     > >>>>>> leaves
> >     > >>>>>>>> the group and restarts, they will be rejected from the
> group
> >     > >>>> (because
> >     > >>>>>> the
> >     > >>>>>>>> group is currently using dynamic membership). The group
> will
> >     > >>> shrink
> >     > >>>>> down
> >     > >>>>>>>> until there is 1 node handling all the traffic. After
> that one
> >     > >>>>> restarts,
> >     > >>>>>>>> the group will switch over to static membership.
> >     > >>>>>>>>
> >     > >>>>>>>> Is that right? That means that the transition plan from
> dynamic
> >     > >> to
> >     > >>>>>> static
> >     > >>>>>>>> membership isn’t very smooth.
> >     > >>>>>>>>
> >     > >>>>>>>> I’m not really sure what can be done in this case. This
> reminds
> >     > >> me
> >     > >>>> of
> >     > >>>>>> the
> >     > >>>>>>>> transition plans that were discussed for moving from
> >     > >>> zookeeper-based
> >     > >>>>>>>> consumers to kafka-coordinator-based consumers. That was
> also
> >     > >>> hard,
> >     > >>>>> and
> >     > >>>>>>>> ultimately we decided not to build that.
> >     > >>>>>>>>
> >     > >>>>>>>> -James
> >     > >>>>>>>>
> >     > >>>>>>>>
> >     > >>>>>>>
> >     > >>>>>>>
> >     > >>>>>>
> >     > >>>>>>
> >     > >>>>>
> >     > >>>>
> >     > >>>>
> >     > >>>>
> >     > >>>> --
> >     > >>>> -- Guozhang
> >     > >>>>
> >     > >>>
> >     > >>
> >     > >>
> >     > >>
> >     > >> --
> >     > >> -- Guozhang
> >     > >>
> >     >
> >
> >
>
>
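
To make the core idea in the quoted thread concrete ("if the broker recognizes
this consumer as an existing member, it shouldn't trigger rebalance"), here is a
minimal sketch in Python. It is only an illustration, not the KIP's actual
design or broker code: the class name `GroupCoordinatorSketch`, the `join`
method and the `member-N` id format are all made up for this example. It models
the two mappings discussed above, `member.name -> member.id` and
`member.id -> Assignment`, and replaces the old `member.id` entry on rejoin as
Matthias described.

```python
import itertools
from dataclasses import dataclass, field


@dataclass
class GroupCoordinatorSketch:
    """Toy model of the broker-side cache from the thread (hypothetical names)."""
    # member.name -> member.id, the mapping the thread proposes to persist
    name_to_id: dict = field(default_factory=dict)
    # member.id -> assignment, the mapping the coordinator already keeps
    assignments: dict = field(default_factory=dict)
    # number of rebalances triggered so far
    rebalances: int = 0
    _counter: itertools.count = field(default_factory=itertools.count)

    def join(self, member_name=None):
        """Handle a join and return the member.id handed back to the consumer."""
        new_id = f"member-{next(self._counter)}"  # assumed id format
        old_id = self.name_to_id.get(member_name) if member_name else None
        if old_id is not None:
            # Known static member: the new member.id replaces the old cache
            # entry, the assignment is kept, and no rebalance is triggered.
            self.assignments[new_id] = self.assignments.pop(old_id)
            self.name_to_id[member_name] = new_id
            return new_id
        # Dynamic member, or a static name seen for the first time: rebalance.
        if member_name:
            self.name_to_id[member_name] = new_id
        self.assignments[new_id] = []
        self.rebalances += 1
        return new_id
```

In this sketch a consumer that restarts with the same member name gets a fresh
`member.id` but leaves the `rebalances` counter unchanged, while a join without
a member name always bumps it. Timeouts, persistence to `__consumer_offsets`
and the dynamic/static switch are left out on purpose.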

--
-Regards,
Mayuresh R. Gharat
(862) 250-7125
