Hi Boyang,

For Streams, I think we do not need an extra config for the instance id;
instead, we can re-use the way we construct the embedded consumer's client
id:

[streams client-id] + "-StreamThread-" + [thread-id] + "-consumer"

So as long as users specify a unique streams client-id, the resulting
consumer client-id / instance-id should already be unique as well.
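
To make the naming concrete, here is a minimal sketch of deriving the
per-thread ids from the streams client-id. The class and helper names are
hypothetical, and it assumes thread ids are numbered from 1; it only
illustrates the formula above, not actual Streams code.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamsInstanceIds {
    // Same shape as the client-id formula above; the helper name is hypothetical.
    static String consumerInstanceId(String streamsClientId, int threadId) {
        return streamsClientId + "-StreamThread-" + threadId + "-consumer";
    }

    // Assumption: thread ids are numbered 1..numThreads within one Streams instance.
    static List<String> instanceIds(String streamsClientId, int numThreads) {
        List<String> ids = new ArrayList<>();
        for (int threadId = 1; threadId <= numThreads; threadId++) {
            ids.add(consumerInstanceId(streamsClientId, threadId));
        }
        return ids;
    }

    public static void main(String[] args) {
        // Two Streams instances with distinct client ids never share an instance id.
        instanceIds("orders-app-host1", 2).forEach(System.out::println);
        instanceIds("orders-app-host2", 2).forEach(System.out::println);
    }
}
```

Uniqueness then follows directly from the uniqueness of the streams
client-id, since the thread id only distinguishes consumers within one
instance.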

As for the LeaveGroupRequest, as I understand it, your concern is that when
we are shutting down a single Streams instance that may contain multiple
threads, shutting down that instance would mean shutting down multiple
members. Personally I'd prefer to make the LeaveGroupRequest API more
general and less inclined to Streams (I think Mayuresh also suggested
this). So I'd suggest that we keep the LeaveGroupRequest API as suggested,
i.e. a list of member.instance.ids. And in Streams we can add a new API in
KafkaStreams to expose:

1) the list of embedded consumer / producer client ids,
2) the producer's txn ids if EOS is turned on, and
3) the consumer's instance ids.

So that Streams operators can read those values from KafkaStreams directly
before shutting it down and use the list in the LeaveGroupRequest API. How
about that?
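
As a rough illustration of that operator flow, here is a toy sketch in which
a list of instance ids is removed from a group in one call. ToyCoordinator
and removeMembers are hypothetical stand-ins, not the real coordinator or
AdminClient API; the instance ids reuse the naming formula above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RemoveMembersSketch {
    /** Toy stand-in for the coordinator's instance.id -> member.id mapping. */
    static final class ToyCoordinator {
        private final Map<String, String> memberIdByInstanceId = new HashMap<>();

        void join(String instanceId, String memberId) {
            memberIdByInstanceId.put(instanceId, memberId);
        }

        /** Removes every listed instance id; returns the ids that were not in the group. */
        List<String> removeMembers(List<String> instanceIds) {
            List<String> notFound = new ArrayList<>();
            for (String instanceId : instanceIds) {
                if (memberIdByInstanceId.remove(instanceId) == null) {
                    notFound.add(instanceId);
                }
            }
            return notFound;
        }

        int size() {
            return memberIdByInstanceId.size();
        }
    }

    public static void main(String[] args) {
        ToyCoordinator group = new ToyCoordinator();
        group.join("app-1-StreamThread-1-consumer", "member-a");
        group.join("app-1-StreamThread-2-consumer", "member-b");
        group.join("app-2-StreamThread-1-consumer", "member-c");

        // The operator reads app-1's instance ids before shutdown, then removes them together.
        List<String> toRemove = List.of(
                "app-1-StreamThread-1-consumer",
                "app-1-StreamThread-2-consumer");
        System.out.println("not found: " + group.removeMembers(toRemove));
        System.out.println("remaining members: " + group.size());
    }
}
```

In the real protocol the removal would also trigger a rebalance; the sketch
only shows the batching by instance id.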


Guozhang


On Fri, Nov 30, 2018 at 7:45 AM Mayuresh Gharat <gharatmayures...@gmail.com>
wrote:

> I like Guozhang's suggestion to not have to wait for session timeout in
> case we know that we want to downsize the consumer group and redistribute
> the partitions among the remaining consumers.
> IIUC, with the above suggestions, the admin api
> "removeMemberFromGroup(groupId, list[instanceId])" or
> "removeMemberFromGroup(groupId, instanceId)", will automatically cause a
> rebalance, right?
> I would prefer list[instanceId] because that's the more general scenario.
>
> Also I was thinking if we can have a replace API that takes in a map of
> old to new instance ids, so that we can replace a consumer.
> If we have this API, and if a consumer host goes down due to hardware
> issues, we can have another host spin up and take its place. This is like a
> cold backup, which can be a step towards providing the hot backup that we
> discussed earlier in the KIP.
> Thoughts?
>
> Thanks,
>
> Mayuresh
>
> On Thu, Nov 29, 2018 at 1:30 AM Boyang Chen <bche...@outlook.com> wrote:
>
> > In fact I feel that it's more convenient for users to specify a list of
> > instance id prefixes, because for general consumer applications we can't
> > always find a single proper prefix to remove a list of consumers.
> > So we are either adding list[instanceid prefix], or we could add two
> > fields: instanceid prefix, and list[instanceid]
> > for clarity. As you know, the two options are equivalent since a full
> > name is a special case of a prefix.
> >
> > Let me know your thoughts!
> >
> > Boyang
> > ________________________________
> > From: Boyang Chen <bche...@outlook.com>
> > Sent: Thursday, November 29, 2018 3:39 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > specifying member id
> >
> > Thanks Guozhang for the new proposal here!
> >
> > So I'd like to propose a slightly modified version of LeaveGroupRequest:
> > instead of letting the static member consumer clients themselves send the
> > request (which means we still need to have some hidden configs to turn it
> > off like we did today), how about just letting any other client send
> > this request, since the LeaveGroupRequest only requires group.id and
> > member.id? So back to your operational scenarios, if some static member has
> > been found crashed and it is not likely to come back, or we simply want to
> > shrink the size of the group by shutting down some static members, we can
> > use an admin client to send the LeaveGroupRequest after the instance has
> > been completely shut down or crashed to kick them out of the group and also
> > trigger the rebalance.
> >
> > One issue though, is that users may not know the member id required in the
> > LeaveGroupRequest. To work around it we can add the `group.instance.id`
> > along with the member id as well and then allow member id to be nullable.
> > The coordinator logic would then be modified as 1) if member.id is specified,
> > ignore instance.id and always use member.id to find the member to kick out,
> > 2) otherwise, try with the instance.id to find the corresponding member.id
> > and kick it out, 3) if none is found, reject with an error code.
> >
> > So in sum the alternative changes are:
> >
> > a) Modify LeaveGroupRequest to add group.instance.id
> > b) Modify coordinator logic to handle such request on the broker side.
> > c) Add a new API in AdminClient like "removeMemberFromGroup(groupId,
> > instanceId)" which will be translated as a LeaveGroupRequest.
> > d) [Optional] we can even batch the request by allowing
> > "removeMemberFromGroup(groupId, list[instanceId])" and then make the
> > `member.id` and `instance.id` fields of LeaveGroupRequest arrays instead
> > of single entries.
> > e) We can also remove the admin ConsumerRebalanceRequest as well for
> > simplicity (why not? paranoid of having as few request protocols as
> > possible :), as it is not needed anymore with the above proposal.
> >
> > I agree that reusing LeaveGroupRequest is actually a good idea: we only
> > need to iterate over an existing request format. Also I found that we
> > haven't discussed how we want to enable this feature on Streams
> > applications, which differ from common consumer applications in that
> > a Streams app uses each stream thread as an individual consumer.
> > For example, if the user specifies the client id, the stream consumer
> > client id will be like:
> > User client id + "-StreamThread-" + thread id + "-consumer"
> >
> > So I'm thinking we should do something similar for defining
> > group.instance.id on Streams. We shall define another config called
> > `stream.instance.id` which would be used as a prefix, and for each thread
> > consumer the formula will look like:
> > `group.instance.id` = `stream.instance.id` + "-" + thread id + "-consumer"
> >
> > And for ease of use, the interface of the leave group request could
> > include `group.instance.id.prefix` instead of `group.instance.id` so that
> > we could batch remove consumers relating to a single stream instance. This
> > is more intuitive and flexible since specifying the names of 16~32 * n
> > (n = number of stream instances to shut down) consumers is not an easy
> > job without client management tooling.
> >
> > How does this workaround sound?
> >
> > Boyang
> > ________________________________
> > From: Guozhang Wang <wangg...@gmail.com>
> > Sent: Thursday, November 29, 2018 2:38 AM
> > To: dev
> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > specifying member id
> >
> > Hi Boyang,
> >
> > I was thinking that with the optional static members in the admin
> > ConsumerRebalanceRequest it should be sufficient to kick out the static
> > members before their session timeout (arguably long in practice) is
> > reached. But now I see your concern is that in some situations the admin
> > operators may not even know the full list of static members, but ONLY know
> > which static member has failed and hence would like to kick it out of the
> > group.
> >
> > So I'd like to propose a slightly modified version of LeaveGroupRequest:
> > instead of letting the static member consumer clients themselves send the
> > request (which means we still need to have some hidden configs to turn it
> > off like we did today), how about just letting any other client send
> > this request, since the LeaveGroupRequest only requires group.id and
> > member.id? So back to your operational scenarios, if some static member has
> > been found crashed and it is not likely to come back, or we simply want to
> > shrink the size of the group by shutting down some static members, we can
> > use an admin client to send the LeaveGroupRequest after the instance has
> > been completely shut down or crashed to kick them out of the group and also
> > trigger the rebalance.
> >
> > One issue though, is that users may not know the member id required in the
> > LeaveGroupRequest. To work around it we can add the `group.instance.id`
> > along with the member id as well and then allow member id to be nullable.
> > The coordinator logic would then be modified as 1) if member.id is specified,
> > ignore instance.id and always use member.id to find the member to kick out,
> > 2) otherwise, try with the instance.id to find the corresponding member.id
> > and kick it out, 3) if none is found, reject with an error code.
> >
> > So in sum the alternative changes are:
> >
> > a) Modify LeaveGroupRequest to add group.instance.id
> > b) Modify coordinator logic to handle such request on the broker side.
> > c) Add a new API in AdminClient like "removeMemberFromGroup(groupId,
> > instanceId)" which will be translated as a LeaveGroupRequest.
> > d) [Optional] we can even batch the request by allowing
> > "removeMemberFromGroup(groupId, list[instanceId])" and then make the
> > `member.id` and `instance.id` fields of LeaveGroupRequest arrays instead
> > of single entries.
> > e) We can also remove the admin ConsumerRebalanceRequest as well for
> > simplicity (why not? paranoid of having as few request protocols as
> > possible :), as it is not needed anymore with the above proposal.
> >
> >
> > WDYT?
> >
> >
> > Guozhang
> >
> > On Wed, Nov 28, 2018 at 5:34 AM Boyang Chen <bche...@outlook.com> wrote:
> >
> > > Thanks Guozhang and Mayuresh for the follow up! Answers are listed below.
> > >
> > >
> > > >  5. Regarding "So in summary, *the member will only be removed due to
> > > > session timeout*. We shall remove it from both in-memory static member
> > > > name mapping and member list." If the rebalance is invoked manually using
> > > > the admin apis, how long should the group coordinator wait for the
> > > > members of the group to send a JoinGroupRequest for participating in the
> > > > rebalance? How is a lagging consumer handled?
> > >
> > > Great question. Let's use the c1~c4 example here:
> > >
> > >   1.  Consumers c1, c2, c3, c4 are in stable state.
> > >   2.  c4 goes down and we detect this issue before session timeout through
> > >       client monitoring. We initiate a ConsumerRebalanceRequest.
> > >   3.  A rebalance kicks off, and after the rebalance timeout we keep the
> > >       same assignment for c1~4, if the session timeout for c4 hasn't
> > >       been reached.
> > >   4.  Group goes back to stable with c1~4 (although c4 is actually offline).
> > >   5.  c4's session timeout is finally reached: another rebalance is triggered.
> > >
> > > For step 3, if the session timeout triggers within the rebalance timeout,
> > > only c1~3 will be participating in the rebalance. This is what we mean by
> > > saying "rebalance timeout shall not remove current members, only session
> > > timeout will do."
> > > As you can see this is not an ideal scenario: we trigger an extra rebalance
> > > at step 5. In my reply to Guozhang I'm asking whether we should still use
> > > LeaveGroupRequest for static members to send a signal to the broker saying
> > > "I'm currently offline", and when we send ConsumerRebalanceRequest to the
> > > broker, we will actually kick out c4 because it says it's offline already,
> > > saving one or multiple additional rebalances later. This way the
> > > ConsumerRebalanceRequest will be more effective in making a correct
> > > judgement on the group status since we have more feedback from the client
> > > side.
> > >
> > > > - When we say that we would use invokeConsumerRebalance(groupId) to down
> > > > scale, with the example in the above question, how will the
> > > > GroupCoordinator know that c4 should be kicked out of the group since we
> > > > are trying to invoke rebalance proactively without waiting for c4's
> > > > session timeout to expire. Should there be a way of telling the
> > > > GroupCoordinator that consumer c4 has been kicked out of the
> > > > groupId = "GroupA"?
> > > The previous proposal should suffice to answer this question 😊
> > >
> > > > - Also it looks like the statement "If the `member.id` uses
> > > > UNKNOWN_MEMBER_NAME, we shall always generate a new member id and replace
> > > > the one within current map, if `group.member.name` is known. Also once we
> > > > are done with KIP-394
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request>,
> > > > all the join group requests are requiring `member.id` to physically enter
> > > > the consumer group. This way the latest joined " is incomplete. Can you
> > > > take a look at this?
> > > > Also when we say "all the join group requests are requiring `member.id` to
> > > > physically enter the consumer group." because a newly started consumer
> > > > will not have a "member.id", I assume you mean, once the GroupCoordinator
> > > > assigns a member.id to the newly started consumer, it has to use it for
> > > > any future JoinGroupRequests. Is my understanding correct?
> > > >
> > > >
> > > Thanks for catching it! And yes, we shall use one extra round-trip between
> > > consumer and broker to inform the new member id allocation.
> > >
> > > Next are the replies to Guozhang's comments:
> > > 2) I once had a discussion about the LeaveGroupRequest for static members,
> > > and the reason for not having it for static members is that we'd need to
> > > make it a configurable behavior as well (i.e. the likelihood that a static
> > > member may shut down but come back later may be even larger than the
> > > likelihood that a shut-down static member would not come back), and when a
> > > shutdown is complete the instance cannot tell whether or not it will come
> > > back by itself. And hence letting a third party (think: admin used by K8s
> > > plugins) issue a request to indicate static member changes would be more
> > > plausible.
> > >
> > > I think having an optional list of all the static members that are still in
> > > the group, rather than the members to be removed since the latter looks a
> > > bit less flexible to me, in the request is a good idea (remember we allow a
> > > group to have both static and dynamic members at the same time, so when
> > > receiving the request, we will only do the diff and add / remove the static
> > > members directly, while still letting the dynamic members try to
> > > re-join the group with the rebalance timeout).
> > > I'm also in favor of storing all the in-group static members. In fact we
> > > could reuse the static membership mapping to store this information. Do
> > > you think that we should let static members send a leave group request to
> > > indicate their status of "leaving", and use ConsumerRebalanceRequest to
> > > trigger a rebalance without them? I'm suggesting we should remove those
> > > members when kicking off the rebalance since we are shutting them down
> > > already.
> > >
> > > 3) personally I favor "ids" over "names" :) Since we already have some
> > > "ids" and hence it sounds more consistent, plus on the producer side we
> > > have a `transactional.id` whose semantics is a bit similar to this one,
> > > i.e. for uniquely distinguishing a client which may come and go but
> > > needs to persist over multiple "instance life-times".
> > > Sure, we have enough votes for ids 😊 I will finalize the name to
> > > `group.instance.id`, does that sound good?
> > >
> > > Best,
> > > Boyang
> > > ________________________________
> > > From: Guozhang Wang <wangg...@gmail.com>
> > > Sent: Wednesday, November 28, 2018 4:51 AM
> > > To: dev
> > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > > specifying member id
> > >
> > > Regarding Jason's question and Boyang's responses:
> > >
> > > 2) I once had a discussion about the LeaveGroupRequest for static members,
> > > and the reason for not having it for static members is that we'd need to
> > > make it a configurable behavior as well (i.e. the likelihood that a static
> > > member may shut down but come back later may be even larger than the
> > > likelihood that a shut-down static member would not come back), and when a
> > > shutdown is complete the instance cannot tell whether or not it will come
> > > back by itself. And hence letting a third party (think: admin used by K8s
> > > plugins) issue a request to indicate static member changes would be more
> > > plausible.
> > >
> > > I think having an optional list of all the static members that are still in
> > > the group, rather than the members to be removed since the latter looks a
> > > bit less flexible to me, in the request is a good idea (remember we allow a
> > > group to have both static and dynamic members at the same time, so when
> > > receiving the request, we will only do the diff and add / remove the static
> > > members directly, while still letting the dynamic members try to
> > > re-join the group with the rebalance timeout).
> > >
> > > 3) personally I favor "ids" over "names" :) Since we already have some
> > > "ids" and hence it sounds more consistent, plus on the producer side we
> > > have a `transactional.id` whose semantics is a bit similar to this one,
> > > i.e. for uniquely distinguishing a client which may come and go but
> > > needs to persist over multiple "instance life-times".
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Nov 27, 2018 at 10:00 AM Mayuresh Gharat <
> > > gharatmayures...@gmail.com>
> > > wrote:
> > >
> > > > Hi Boyang,
> > > >
> > > > Thanks for the replies. Please find the follow up queries below.
> > > >
> > > >     5. Regarding "So in summary, *the member will only be removed due to
> > > > session timeout*. We shall remove it from both in-memory static member
> > > > name mapping and member list." If the rebalance is invoked manually using
> > > > the admin apis, how long should the group coordinator wait for the
> > > > members of the group to send a JoinGroupRequest for participating in the
> > > > rebalance? How is a lagging consumer handled?
> > > > The plan is to disable member kick out when rebalance.timeout is reached,
> > > > so basically we are not "waiting" for any join group request from existing
> > > > members; we shall just rebalance based on what we currently have within
> > > > the group metadata. A lagging consumer will trigger a rebalance later if
> > > > session timeout > rebalance timeout.
> > > >
> > > > >
> > > > > Just wanted to understand this better. Let's take an example: say we
> > > > > have a consumer group "GroupA" with 4 consumers c1, c2, c3, c4.
> > > > > Everything is running fine and suddenly c4's host has issues and it goes
> > > > > down. Now we notice that we can still operate with c1, c2, c3 and don't
> > > > > want to wait for c4 to come back up. We use the admin api
> > > > > "invokeConsumerRebalance("GroupA")".
> > > > > Now the GroupCoordinator will ask the members c1, c2, c3 to join the
> > > > > group again (in their heartBeatResponse) as the first step of the
> > > > > rebalance. Now let's say that c1, c2 immediately send a joinGroupRequest
> > > > > but c3 is delayed. At this stage, if we are not "waiting" on any join
> > > > > group request, a few things can happen:
> > > > >
> > > > >    - c4's partitions are distributed only among c1, c2. c3 maintains its
> > > > >    original assignment. c1, c2 will start processing the newly assigned
> > > > >    partitions.
> > > > >
> > > > > OR
> > > > >
> > > > >    - c4's partitions are distributed among c1, c2, c3. c1 and c2 start
> > > > >    processing the newly assigned partitions. c3 gets to know about the
> > > > >    newly assigned partitions later when it sends the JoinGroupRequest
> > > > >    (which was delayed).
> > > > >
> > > > > OR
> > > > >
> > > > >    - Will the rebalance do a complete reassignment, where c1, c2, c3 have
> > > > >    to give up their partitions and all the partitions belonging to c1, c2,
> > > > >    c3, c4 will be redistributed among c1, c2, c3? If this is the case, the
> > > > >    GroupCoordinator needs to give some buffer time for c1, c2, c3 to
> > > > >    revoke their partitions and rejoin the group.
> > > > >
> > > > > This is as per my understanding of how the KIP would work without
> > > > > changing the underlying group coordination workflow. Please correct me
> > > > > if I misunderstood something here.
> > > > >
> > > >
> > > >
> > > > - When we say that we would use invokeConsumerRebalance(groupId) to down
> > > > scale, with the example in the above question, how will the
> > > > GroupCoordinator know that c4 should be kicked out of the group since we
> > > > are trying to invoke rebalance proactively without waiting for c4's
> > > > session timeout to expire. Should there be a way of telling the
> > > > GroupCoordinator that consumer c4 has been kicked out of the
> > > > groupId = "GroupA"?
> > > >
> > > > - Also it looks like the statement "If the `member.id` uses
> > > > UNKNOWN_MEMBER_NAME, we shall always generate a new member id and replace
> > > > the one within current map, if `group.member.name` is known. Also once we
> > > > are done with KIP-394
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request>,
> > > > all the join group requests are requiring `member.id` to physically enter
> > > > the consumer group. This way the latest joined " is incomplete. Can you
> > > > take a look at this?
> > > > Also when we say "all the join group requests are requiring `member.id` to
> > > > physically enter the consumer group." because a newly started consumer
> > > > will not have a "member.id", I assume you mean, once the GroupCoordinator
> > > > assigns a member.id to the newly started consumer, it has to use it for
> > > > any future JoinGroupRequests. Is my understanding correct?
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > > On Mon, Nov 26, 2018 at 9:20 PM Boyang Chen <bche...@outlook.com>
> > wrote:
> > > >
> > > > > Thanks Mayuresh and Jason for your follow-ups! Let me try to answer
> > > both
> > > > > in this reply.
> > > > >
> > > > >
> > > > > >    1. Do you intend to have member.id as a static config like
> > > > > >    member.name after KIP-345 and KIP-394?
> > > > >
> > > > > No, we shall only rely on the broker to allocate member.id for the
> > > > > consumer instances. FYI, I already started the discussion thread for
> > > > > KIP-394 😊
> > > > >
> > > > > >    2. Regarding "On client side, we add a new config called MEMBER_NAME
> > > > > >    in ConsumerConfig. On consumer service init, if the MEMBER_NAME config
> > > > > >    is set, we will put it in the initial join group request to identify
> > > > > >    itself as a static member (static membership); otherwise, we will
> > > > > >    still send UNKNOWN_MEMBER_ID to ask broker for allocating a new random
> > > > > >    ID (dynamic membership)."
> > > > > >       - What is the value of member_id sent in the first
> > > > > >       JoinGroupRequest when member_name is set (using static
> > > > > >       rebalance)? Is it UNKNOWN_MEMBER_ID?
> > > > >
> > > > > Yes, we could only use unknown member id. Actually this part of the
> > > > > proposal is outdated, let me do another audit of the whole doc.
> > > > > Basically, it is currently impossible to send `member.id` when the
> > > > > consumer has restarted. Sorry for the confusion!
> > > > >
> > > > > >    3. Regarding "we are requiring member.id (if not unknown) to match the
> > > > > >    value stored in cache, otherwise reply MEMBER_ID_MISMATCH. The edge
> > > > > >    case that if we could have members with the same `member.name` (for
> > > > > >    example mis-configured instances with a valid member.id but added a
> > > > > >    used member name on runtime). When member name has duplicates, we
> > > > > >    could refuse join request from members with an outdated `member.id`
> > > > > >    (since we update the mapping upon each join group request). In an edge
> > > > > >    case where the client hits this exception in the response, it is
> > > > > >    suggesting that some other consumer takes its spot."
> > > > > >       - The part of "some other consumer takes the spot" would be
> > > > > >       intentional, right? Also when you say "The edge case that if we
> > > > > >       could have members with the same `member.name` (for example
> > > > > >       mis-configured instances *with a valid member.id* but
> > > > > >       added a used member name on runtime).", what do you mean by
> > > > > >       *valid member id* here? Does it mean that there exists a mapping
> > > > > >       of member.name to member.id like *MemberA -> id1* on the
> > > > > >       GroupCoordinator and this consumer is trying to join with
> > > > > >       *member.name = MemberB and member.id = id1*?
> > > > >
> > > > > I would take Jason's advice that each time we have an unknown member
> > > > > joining the group, the broker will always assign a new and unique id to
> > > > > track its identity. In this way, a consumer with a duplicate member name
> > > > > will be fenced.
> > > > >
> > > > > >    4. Depending on your explanation for point 2 and the point 3 above
> > > > > >    regarding returning back MEMBER_ID_MISMATCH on having a matching
> > > > > >    member_name but unknown member_id, if the consumer sends
> > > > > >    "UNKNOWN_MEMBER_ID" on the first JoinGroupRequest and relies on the
> > > > > >    GroupCoordinator to give it a member_id, is the consumer supposed to
> > > > > >    remember member_id for joinGroupRequests? If yes, how are restarts
> > > > > >    handled?
> > > > >
> > > > > As explained above, we shall not materialize the member.id. Instead we
> > > > > need to rely on the broker to allocate a unique id for the consumer just
> > > > > like what we have now.
> > > > >
> > > > > >    5. Regarding "So in summary, *the member will only be removed due to
> > > > > >    session timeout*. We shall remove it from both in-memory static member
> > > > > >    name mapping and member list."
> > > > > >       - If the rebalance is invoked manually using the admin apis, how
> > > > > >       long should the group coordinator wait for the members of the
> > > > > >       group to send a JoinGroupRequest for participating in the
> > > > > >       rebalance? How is a lagging consumer handled?
> > > > >
> > > > > The plan is to disable member kick out when rebalance.timeout is reached,
> > > > > so basically we are not "waiting" for any join group request from existing
> > > > > members; we shall just rebalance based on what we currently have within
> > > > > the group metadata. A lagging consumer will trigger a rebalance later if
> > > > > session timeout > rebalance timeout.
> > > > >
> > > > > >    6. Another detail to take care is that we need to automatically take
> > > > > >    the hash of group id so that we know which broker to send this request
> > > > > >    to.
> > > > > >       - I assume this should be same as the way we find the
> > > > > >       coordinator, today right? If yes, should we specify it in the KIP ?
> > > > >
> > > > > Yep, it is. Add FindCoordinatorRequest logic to the script.
> > > > >
> > > > > >    7. Are there any specific failure scenarios when you say "other
> > > > > >    potential failure cases."? It would be good to mention them
> > > > > >    explicitly, if you think there are any.
> > > > >
> > > > > Nah, I'm gonna remove it because it seems to cause more confusion than
> > > > > it makes my assumption clear, which is
> > > > > "there could be other failure cases that I can't enumerate now" 😊
> > > > >
> > > > > >    8. It would be good to have a rollback plan as you have for roll
> > > > > >    forward in the KIP.
> > > > >
> > > > > Great suggestion! Added a simple rollback plan.
> > > > >
> > > > >
> > > > > Next is answering Jason's suggestions:
> > > > >
> > > > > > 1. This may be the same thing that Mayuresh is asking about. I
> > > > > > think the suggestion in the KIP is that if a consumer sends
> > > > > > JoinGroup with a member name, but no member id, then we will
> > > > > > return the current member id associated with that name. It seems
> > > > > > in this case that we wouldn't be able to protect from having two
> > > > > > consumers active with the same configured member.name? For
> > > > > > example, imagine that we had a consumer with member.name=A which
> > > > > > is assigned member.id=1. Suppose it becomes a zombie and a new
> > > > > > instance starts up with member.name=A. If it is also assigned
> > > > > > member.id=1, then how can we detect the zombie if it comes back
> > > > > > to life? Both instances will have the same member.id.
> > > > > >
> > > > > > The goal is to avoid a rebalance on a rolling restart, but we
> > > > > > still need to fence previous members. I am wondering if we can
> > > > > > generate a new member.id every time we receive a request from a
> > > > > > static member with an unknown member id. If the old instance with
> > > > > > the same member.name attempts any operation, then it will be
> > > > > > fenced with an UNKNOWN_MEMBER_ID error. As long as the
> > > > > > subscription of the new instance hasn't changed, then we can skip
> > > > > > the rebalance and return the current assignment without forcing a
> > > > > > rebalance.
> > > > > >
> > > > > > The trick to making this work is in the error handling of the
> > > > > > zombie consumer. If the zombie simply resets its member.id and
> > > > > > rejoins to get a new one upon receiving the UNKNOWN_MEMBER_ID
> > > > > > error, then it would end up fencing the new member. We want to
> > > > > > avoid this. There needs to be an expectation for static members
> > > > > > that the member.id of a static member will not be changed except
> > > > > > when a new member with the same member.name joins the group. Then
> > > > > > we can treat UNKNOWN_MEMBER_ID as a fatal error for consumers
> > > > > > with static member names.
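
A minimal sketch of the fencing idea quoted above, for illustration only. It assumes a coordinator-side map from member.name to the most recently issued member.id; the class and method names (`StaticMemberRegistry`, `joinWithUnknownId`, `validate`) are hypothetical and are not part of the KIP or of any Kafka API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical coordinator-side bookkeeping; all names are illustrative.
final class StaticMemberRegistry {
    enum Result { OK, UNKNOWN_MEMBER_ID }

    // member.name -> most recently issued member.id
    private final Map<String, String> currentIdByName = new HashMap<>();

    // JoinGroup from a static member that carries no member.id:
    // always issue a fresh id, which implicitly fences the previous holder.
    String joinWithUnknownId(String memberName) {
        String freshId = memberName + "-" + UUID.randomUUID();
        currentIdByName.put(memberName, freshId);
        return freshId;
    }

    // Any later request (heartbeat, offset commit, ...) must carry the
    // latest id issued for that name; a zombie still holds the old one.
    Result validate(String memberName, String memberId) {
        return memberId.equals(currentIdByName.get(memberName))
                ? Result.OK
                : Result.UNKNOWN_MEMBER_ID;
    }
}
```

In this sketch a static consumer that receives UNKNOWN_MEMBER_ID would have to stop rather than rejoin, since rejoining would call `joinWithUnknownId` again and fence the replacement instance.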
> > > > >
> > > > > > Yep, I like this idea! Handing out a fresh member.id whenever we
> > > > > > receive a join request with an unknown member id will definitely
> > > > > > prevent processing bugs caused by duplicate consumers. However, I
> > > > > > don't think I fully understand the third paragraph, where you
> > > > > > mentioned "There needs to be an expectation for static members
> > > > > > that the member.id of a static member will not be changed except
> > > > > > when a new member with the same member.name joins the group."
> > > > > > How do you plan to know whether this member is a new member or an
> > > > > > old one? I feel that even if the zombie consumer takes the
> > > > > > ownership, it should be detected very quickly (the MISMATCH_ID
> > > > > > exception causes the original consumer instance to die), and the
> > > > > > end user will start to fix it right away. Is there any similar
> > > > > > logic that we apply when fencing a duplicate `transactional.id`?
> > > > >
> > > > > > 2. The mechanics of the ConsumerRebalance API seem unclear to me.
> > > > > > As far as I understand it, it is used for scaling down a consumer
> > > > > > group and somehow bypasses normal session timeout expiration. I
> > > > > > am wondering how critical this piece is and whether we can leave
> > > > > > it for future work. If not, then it would be helpful to elaborate
> > > > > > on its implementation. How would the coordinator know which
> > > > > > members to kick out of the group?
> > > > >
> > > > > > This API is needed when we want to trigger a rebalance
> > > > > > immediately instead of waiting for the session timeout or the
> > > > > > rebalance timeout (urgent scale up/down). It is very much needed
> > > > > > for management purposes, because users can choose when to trigger
> > > > > > a rebalance quite freely, which gives them more client-side
> > > > > > control.
> > > > > >
> > > > > > In the meantime, I see your point that we actually need the
> > > > > > ability to quickly kick out the members that we plan to scale
> > > > > > down (since the rebalance timeout no longer kicks any offline
> > > > > > member out of the group). I will think about adding an optional
> > > > > > list of members that are ready to be removed.
> > > > > >
> > > > > > Another idea is to let static members send a `LeaveGroupRequest`
> > > > > > when they are going offline (either scaling down or bouncing),
> > > > > > and the broker will cache this information as "OfflineMembers"
> > > > > > without triggering a rebalance. When handling a
> > > > > > ConsumerRebalanceRequest, the broker will kick out the static
> > > > > > members that are currently offline and trigger a rebalance
> > > > > > immediately. How does this plan sound?
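
A rough sketch of the "OfflineMembers" idea described above, under stated assumptions: the class `OfflineMemberCache` and its methods are hypothetical names for illustration, group membership is tracked by member name only, and a rejoining member is simply taken off the offline list. It is not how the broker is or would be implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical broker-side cache; all names are illustrative.
final class OfflineMemberCache {
    private final Set<String> activeMembers = new LinkedHashSet<>();
    private final Set<String> offlineMembers = new LinkedHashSet<>();
    private int rebalanceCount = 0;

    // A (re)joining static member is no longer considered offline.
    void join(String memberName) {
        offlineMembers.remove(memberName);
        activeMembers.add(memberName);
    }

    // LeaveGroupRequest from a static member: remember it, do not rebalance.
    void handleLeaveGroup(String memberName) {
        if (activeMembers.contains(memberName)) {
            offlineMembers.add(memberName);
        }
    }

    // ConsumerRebalanceRequest: evict cached offline members, rebalance once.
    List<String> handleConsumerRebalance() {
        List<String> evicted = new ArrayList<>(offlineMembers);
        activeMembers.removeAll(offlineMembers);
        offlineMembers.clear();
        rebalanceCount++;
        return evicted;
    }

    Set<String> members() { return activeMembers; }

    int rebalanceCount() { return rebalanceCount; }
}
```

With this shape, a bouncing member that leaves and rejoins before the admin request arrives keeps its place, while a member that left for good is removed by the next ConsumerRebalanceRequest.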
> > > > >
> > > > > > 3. I've been holding back on mentioning this, but I think we
> > > > > > should reconsider the name `member.name`. I think we want
> > > > > > something that suggests its expectation of uniqueness in the
> > > > > > group. How about `group.instance.id` to go along with `group.id`?
> > > > >
> > > > > > Yeah, Dong and Stanislav also raised this naming issue. I
> > > > > > personally buy into the namespace idea, and since we already use
> > > > > > `member.name` in a lot of contexts, I decided to rename the
> > > > > > config to `group.member.name`, which should be sufficient to
> > > > > > address all the concerns we have now. Sounds good?
> > > > >
> > > > >
> > > > > > Thank you for your great suggestions! Let me know if my reply
> > > > > > makes sense here.
> > > > >
> > > > >
> > > > > Best,
> > > > >
> > > > > Boyang
> > > > >
> > > > > ________________________________
> > > > > From: Jason Gustafson <ja...@confluent.io>
> > > > > Sent: Tuesday, November 27, 2018 7:51 AM
> > > > > To: dev
> > > > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> > > > > > rebalances by specifying member id
> > > > >
> > > > > Hi Boyang,
> > > > >
> > > > > > Thanks for the updates. Looks like we're headed in the right
> > > > > > direction and clearly the interest that this KIP is receiving
> > > > > > shows how strong the motivation is!
> > > > >
> > > > > I have a few questions:
> > > > >
> > > > > > 1. This may be the same thing that Mayuresh is asking about. I
> > > > > > think the suggestion in the KIP is that if a consumer sends
> > > > > > JoinGroup with a member name, but no member id, then we will
> > > > > > return the current member id associated with that name. It seems
> > > > > > in this case that we wouldn't be able to protect from having two
> > > > > > consumers active with the same configured member.name? For
> > > > > > example, imagine that we had a consumer with member.name=A which
> > > > > > is assigned member.id=1. Suppose it becomes a zombie and a new
> > > > > > instance starts up with member.name=A. If it is also assigned
> > > > > > member.id=1, then how can we detect the zombie if it comes back
> > > > > > to life? Both instances will have the same member.id.
> > > > > >
> > > > > > The goal is to avoid a rebalance on a rolling restart, but we
> > > > > > still need to fence previous members. I am wondering if we can
> > > > > > generate a new member.id every time we receive a request from a
> > > > > > static member with an unknown member id. If the old instance with
> > > > > > the same member.name attempts any operation, then it will be
> > > > > > fenced with an UNKNOWN_MEMBER_ID error. As long as the
> > > > > > subscription of the new instance hasn't changed, then we can skip
> > > > > > the rebalance and return the current assignment without forcing a
> > > > > > rebalance.
> > > > > >
> > > > > > The trick to making this work is in the error handling of the
> > > > > > zombie consumer. If the zombie simply resets its member.id and
> > > > > > rejoins to get a new one upon receiving the UNKNOWN_MEMBER_ID
> > > > > > error, then it would end up fencing the new member. We want to
> > > > > > avoid this. There needs to be an expectation for static members
> > > > > > that the member.id of a static member will not be changed except
> > > > > > when a new member with the same member.name joins the group. Then
> > > > > > we can treat UNKNOWN_MEMBER_ID as a fatal error for consumers
> > > > > > with static member names.
> > > > > >
> > > > > > 2. The mechanics of the ConsumerRebalance API seem unclear to me.
> > > > > > As far as I understand it, it is used for scaling down a consumer
> > > > > > group and somehow bypasses normal session timeout expiration. I
> > > > > > am wondering how critical this piece is and whether we can leave
> > > > > > it for future work. If not, then it would be helpful to elaborate
> > > > > > on its implementation. How would the coordinator know which
> > > > > > members to kick out of the group?
> > > > > >
> > > > > > 3. I've been holding back on mentioning this, but I think we
> > > > > > should reconsider the name `member.name`. I think we want
> > > > > > something that suggests its expectation of uniqueness in the
> > > > > > group. How about `group.instance.id` to go along with `group.id`?
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > >
> > > > >
> > > > > > On Mon, Nov 26, 2018 at 10:18 AM Mayuresh Gharat
> > > > > > <gharatmayures...@gmail.com> wrote:
> > > > >
> > > > > > Hi Boyang,
> > > > > >
> > > > > > > Thanks a lot for replying to all the queries and discussions
> > > > > > > here, so patiently.
> > > > > > > Really appreciate it.
> > > > > > >
> > > > > > > Had a few questions and suggestions after rereading the
> > > > > > > current version of the KIP:
> > > > > >
> > > > > >
> > > > > > >    1. Do you intend member.id to be a static config like
> > > > > > >    member.name after KIP-345 and KIP-394?
> > > > > > >    2. Regarding "On client side, we add a new config called
> > > > > > >    MEMBER_NAME in ConsumerConfig. On consumer service init, if
> > > > > > >    the MEMBER_NAME config is set, we will put it in the initial
> > > > > > >    join group request to identify itself as a static member
> > > > > > >    (static membership); otherwise, we will still send
> > > > > > >    UNKNOWN_MEMBER_ID to ask broker for allocating a new random
> > > > > > >    ID (dynamic membership)."
> > > > > > >       - What is the value of member_id sent in the first
> > > > > > >       JoinGroupRequest when member_name is set (using static
> > > > > > >       rebalance)? Is it UNKNOWN_MEMBER_ID?
> > > > > > >    3. Regarding "we are requiring member.id (if not unknown) to
> > > > > > >    match the value stored in cache, otherwise reply
> > > > > > >    MEMBER_ID_MISMATCH. The edge case that if we could have
> > > > > > >    members with the same `member.name` (for example
> > > > > > >    mis-configured instances with a valid member.id but added a
> > > > > > >    used member name on runtime). When member name has
> > > > > > >    duplicates, we could refuse join request from members with an
> > > > > > >    outdated `member.id` (since we update the mapping upon each
> > > > > > >    join group request). In an edge case where the client hits
> > > > > > >    this exception in the response, it is suggesting that some
> > > > > > >    other consumer takes its spot."
> > > > > > >       - The part of "some other consumer takes the spot" would
> > > > > > >       be intentional, right? Also when you say "The edge case
> > > > > > >       that if we could have members with the same `member.name`
> > > > > > >       (for example mis-configured instances *with a valid
> > > > > > >       member.id* but added a used member name on runtime).",
> > > > > > >       what do you mean by *valid member id* here? Does it mean
> > > > > > >       that there exists a mapping of member.name to member.id
> > > > > > >       like *MemberA -> id1* on the GroupCoordinator and this
> > > > > > >       consumer is trying to join with *member.name = MemberB and
> > > > > > >       member.id = id1*?
> > > > > > >    4. Depending on your explanation for point 2 and point 3
> > > > > > >    above regarding returning MEMBER_ID_MISMATCH on having a
> > > > > > >    matching member_name but unknown member_id, if the consumer
> > > > > > >    sends "UNKNOWN_MEMBER_ID" on the first JoinGroupRequest and
> > > > > > >    relies on the GroupCoordinator to give it a member_id, is the
> > > > > > >    consumer supposed to remember member_id for
> > > > > > >    JoinGroupRequests? If yes, how are restarts handled?
> > > > > > >    5. Regarding "So in summary, *the member will only be removed
> > > > > > >    due to session timeout*. We shall remove it from both
> > > > > > >    in-memory static member name mapping and member list."
> > > > > > >       - If the rebalance is invoked manually using the admin
> > > > > > >       APIs, how long should the group coordinator wait for the
> > > > > > >       members of the group to send a JoinGroupRequest for
> > > > > > >       participating in the rebalance? How is a lagging consumer
> > > > > > >       handled?
> > > > > > >    6. Another detail to take care is that we need to
> > > > > > >    automatically take the hash of group id so that we know which
> > > > > > >    broker to send this request to.
> > > > > > >       - I assume this should be the same as the way we find the
> > > > > > >       coordinator today, right? If yes, should we specify it in
> > > > > > >       the KIP?
> > > > > > >    7. Are there any specific failure scenarios when you say
> > > > > > >    "other potential failure cases."? It would be good to
> > > > > > >    mention them explicitly, if you think there are any.
> > > > > > >    8. It would be good to have a rollback plan as you have for
> > > > > > >    roll forward in the KIP.
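
For point 6 above, a small sketch of how a group id can be mapped to a partition of the internal offsets topic, whose leader then acts as the group coordinator. To my understanding this mirrors the existing coordinator lookup, but treat it as an illustration rather than the broker's actual code; the class name `CoordinatorLookup` is hypothetical and the partition count is passed in as a parameter.

```java
// Hypothetical helper; illustrative only.
final class CoordinatorLookup {
    // Maps a group id to a partition index of the offsets topic.
    // The broker leading that partition is the group's coordinator.
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        int hash = groupId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so handle it explicitly.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % offsetsTopicPartitions;
    }
}
```

Because the mapping depends only on the group id and the partition count, an admin request for a group can be routed to the same broker that a consumer in that group would find.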
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Mayuresh
> > > > > >
> > > > > > > On Mon, Nov 26, 2018 at 8:17 AM Mayuresh Gharat
> > > > > > > <gharatmayures...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Boyang,
> > > > > > > >
> > > > > > > > Do you have a discuss thread for KIP-394 that you mentioned
> > > > > > > > here?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Mayuresh
> > > > > > >
> > > > > > > > On Mon, Nov 26, 2018 at 4:52 AM Boyang Chen
> > > > > > > > <bche...@outlook.com> wrote:
> > > > > > >
> > > > > > >> Hey Dong, thanks for the follow-up here!
> > > > > > >>
> > > > > > >>
> > > > > > > >> 1) It is not very clear to the user what is the difference
> > > > > > > >> between member.name and client.id as both seem to be used
> > > > > > > >> to identify the consumer. I am wondering if it would be more
> > > > > > > >> intuitive to name it group.member.name (preferred choice
> > > > > > > >> since it matches the current group.id config name) or
> > > > > > > >> rebalance.member.name to explicitly show that the id is
> > > > > > > >> solely used for rebalance.
> > > > > > > >> Great question. I feel `member.name` is self-explanatory,
> > > > > > > >> and making the config name longer does not seem very
> > > > > > > >> helpful. Using `name` rather than `id` gives users the
> > > > > > > >> impression that they control the value through their own
> > > > > > > >> rule, rather than it being decided by the library.
> > > > > > >>
> > > > > > > >> 2) In the interface change section it is said that
> > > > > > > >> GroupMaxSessionTimeoutMs will be changed to 30 minutes. It
> > > > > > > >> seems to suggest that we will change the default value of
> > > > > > > >> this config. It does not seem necessary to increase the time
> > > > > > > >> of consumer failure detection when user doesn't use static
> > > > > > > >> membership. Also, say static membership is enabled, then
> > > > > > > >> this default config change will cause a partition to be
> > > > > > > >> unavailable for consumption for 30 minutes if there is hard
> > > > > > > >> consumer failure, which seems to be worse experience than
> > > > > > > >> having unnecessary rebalance (when this timeout is small),
> > > > > > > >> particularly for new users of Kafka. Could you explain more
> > > > > > > >> why we should make this change?
> > > > > > > >> We are not changing the default session timeout value. We
> > > > > > > >> are only changing the cap that we enforce on the maximum
> > > > > > > >> session timeout. So this change does not affect which kind
> > > > > > > >> of membership the end user is using, and loosening the cap
> > > > > > > >> gives end users more flexibility in the trade-off between
> > > > > > > >> liveness and stability.
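
To illustrate the distinction between the default and the cap drawn above, here is a small sketch. The class name and the concrete numbers for the minimum and the previous cap are assumptions chosen for the example; only the 30-minute figure comes from this thread.

```java
// Hypothetical broker-side check; names and most values are illustrative.
final class SessionTimeoutCap {
    static final int GROUP_MIN_SESSION_TIMEOUT_MS = 6_000;           // assumed minimum
    static final int OLD_GROUP_MAX_SESSION_TIMEOUT_MS = 5 * 60_000;  // assumed previous cap
    static final int NEW_GROUP_MAX_SESSION_TIMEOUT_MS = 30 * 60_000; // proposed 30-minute cap

    // A join request is accepted only if the consumer's requested session
    // timeout lies inside the range the broker allows.
    static boolean accepted(int requestedMs, int capMs) {
        return requestedMs >= GROUP_MIN_SESSION_TIMEOUT_MS && requestedMs <= capMs;
    }
}
```

A consumer that keeps a short session timeout, say 10 seconds, is accepted under both caps and sees no change in failure detection time; only a consumer that explicitly asks for a longer timeout, say 20 minutes, is affected by raising the cap.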
> > > > > > >>
> > > > > > > >> 3) Could we just combine MEMBER_ID_MISMATCH and
> > > > > > > >> DUPLICATE_STATIC_MEMBER into one error? It seems that these
> > > > > > > >> two errors are currently handled by the consumer in the same
> > > > > > > >> way. And we also don't expect MEMBER_ID_MISMATCH to happen.
> > > > > > > >> Thus it is not clear what is the benefit of having two
> > > > > > > >> errors.
> > > > > > > >> I agree that we should remove the DUPLICATE_STATIC_MEMBER
> > > > > > > >> error, because with KIP-394
> > > > > > > >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member+id+for+initial+join+group+request>
> > > > > > > >> we will automatically fence all join requests with
> > > > > > > >> UNKNOWN_MEMBER_ID.
> > > > > > > >>
> > > > > > > >> 4) The doc for DUPLICATE_STATIC_MEMBER says that "The join
> > > > > > > >> group contains member name which is already in the consumer
> > > > > > > >> group, however the member id was missing". After a consumer
> > > > > > > >> is restarted, it will send a JoinGroupRequest with an
> > > > > > > >> existing memberName (as the coordinator has not expired this
> > > > > > > >> member from the memory) and memberId =
> > > > > > > >> JoinGroupRequest.UNKNOWN_MEMBER_ID (since memberId is not
> > > > > > > >> persisted across consumer restart in the consumer side).
> > > > > > > >> Does it mean that JoinGroupRequest from a newly restarted
> > > > > > > >> consumer will always be rejected until the sessionTimeoutMs
> > > > > > > >> has passed?
> > > > > > > >> Same answer as question 3). This part of the logic shall be
> > > > > > > >> removed from the proposal.
> > > > > > > >>
> > > > > > > >> 5) It seems that we always add two methods to the interface
> > > > > > > >> org.apache.kafka.clients.admin.AdminClient.java, one with
> > > > > > > >> options and the other without option. Could this be
> > > > > > > >> specified in the interface change section?
> > > > > > > >> Sounds good! Added both methods.
> > > > > > > >>
> > > > > > > >> 6) Do we plan to have off-the-shelf command line tool for
> > > > > > > >> SRE to trigger rebalance? If so, we probably want to specify
> > > > > > > >> the command line tool interface similar to
> > > > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-Scripts
> > > > > > > >> .
> > > > > > > >> Added the script.
> > > > > > > >>
> > > > > > > >> 7) Would it be simpler to replace name "forceStaticRebalance"
> > > > > > > >> with "invokeConsumerRebalance"? It is not very clear what is
> > > > > > > >> the extra meaning of word "force" as compared to "trigger"
> > > > > > > >> or "invoke". And it seems simpler to allow this API to
> > > > > > > >> trigger rebalance regardless of whether consumer is
> > > > > > > >> configured with memberName.
> > > > > > > >> Sounds good. Right now I feel that, for both static and
> > > > > > > >> dynamic membership, it is more manageable to introduce the
> > > > > > > >> consumer rebalance method through the admin client API.
> > > > > > > >>
> > > > > > > >> 8) It is not very clear how the newly added AdminClient API
> > > > > > > >> trigger rebalance. For example, does it send request? Can
> > > > > > > >> this be explained in the KIP?
> > > > > > >>
> > > > > > >> Sure, I will add more details to the API.
> > > > > > >>
> > > > > > >>
> > > > > > >> Thanks again for the helpful suggestions!
> > > > > > >>
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Boyang
> > > > > > >>
> > > > > > >> ________________________________
> > > > > > >> From: Dong Lin <lindon...@gmail.com>
> > > > > > >> Sent: Saturday, November 24, 2018 2:54 PM
> > > > > > >> To: dev
> > > > > > > >> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> > > > > > > >> rebalances by specifying member id
> > > > > > >>
> > > > > > >> Hey Boyang,
> > > > > > >>
> > > > > > >> Thanks for the update! Here are some followup comments:
> > > > > > >>
> > > > > > > >> 1) It is not very clear to the user what is the difference
> > > > > > > >> between member.name and client.id as both seem to be used
> > > > > > > >> to identify the consumer. I am wondering if it would be more
> > > > > > > >> intuitive to name it group.member.name (preferred choice
> > > > > > > >> since it matches the current group.id config name) or
> > > > > > > >> rebalance.member.name to explicitly show that the id is
> > > > > > > >> solely used for rebalance.
> > > > > > > >>
> > > > > > > >> 2) In the interface change section it is said that
> > > > > > > >> GroupMaxSessionTimeoutMs will be changed to 30 minutes. It
> > > > > > > >> seems to suggest that we will change the default value of
> > > > > > > >> this config. It does not seem necessary to increase the time
> > > > > > > >> of consumer failure detection when user doesn't use static
> > > > > > > >> membership. Also, say static membership is enabled, then
> > > > > > > >> this default config change will cause a partition to be
> > > > > > > >> unavailable for consumption for 30 minutes if there is hard
> > > > > > > >> consumer failure, which seems to be worse experience than
> > > > > > > >> having unnecessary rebalance (when this timeout is small),
> > > > > > > >> particularly for new users of Kafka. Could you explain more
> > > > > > > >> why we should make this change?
> > > > > > > >>
> > > > > > > >> 3) Could we just combine MEMBER_ID_MISMATCH and
> > > > > > > >> DUPLICATE_STATIC_MEMBER into one error? It seems that these
> > > > > > > >> two errors are currently handled by the consumer in the same
> > > > > > > >> way. And we also don't expect MEMBER_ID_MISMATCH to happen.
> > > > > > > >> Thus it is not clear what is the benefit of having two
> > > > > > > >> errors.
> > > > > > > >>
> > > > > > > >> 4) The doc for DUPLICATE_STATIC_MEMBER says that "The join
> > > > > > > >> group contains member name which is already in the consumer
> > > > > > > >> group, however the member id was missing". After a consumer
> > > > > > > >> is restarted, it will send a JoinGroupRequest with an
> > > > > > > >> existing memberName (as the coordinator has not expired this
> > > > > > > >> member from the memory) and memberId =
> > > > > > > >> JoinGroupRequest.UNKNOWN_MEMBER_ID (since memberId is not
> > > > > > > >> persisted across consumer restart in the consumer side).
> > > > > > > >> Does it mean that JoinGroupRequest from a newly restarted
> > > > > > > >> consumer will always be rejected until the sessionTimeoutMs
> > > > > > > >> has passed?
> > > > > > > >>
> > > > > > > >> 5) It seems that we always add two methods to the interface
> > > > > > > >> org.apache.kafka.clients.admin.AdminClient.java, one with
> > > > > > > >> options and the other without option. Could this be
> > > > > > > >> specified in the interface change section?
> > > > > > > >>
> > > > > > > >> 6) Do we plan to have off-the-shelf command line tool for
> > > > > > > >> SRE to trigger rebalance? If so, we probably want to specify
> > > > > > > >> the command line tool interface similar to
> > > > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-Scripts
> > > > > > > >> .
> > > > > > > >>
> > > > > > > >> 7) Would it be simpler to replace name "forceStaticRebalance"
> > > > > > > >> with "invokeConsumerRebalance"? It is not very clear what is
> > > > > > > >> the extra meaning of word "force" as compared to "trigger"
> > > > > > > >> or "invoke". And it seems simpler to allow this API to
> > > > > > > >> trigger rebalance regardless of whether consumer is
> > > > > > > >> configured with memberName.
> > > > > > > >>
> > > > > > > >> 8) It is not very clear how the newly added AdminClient API
> > > > > > > >> trigger rebalance. For example, does it send request? Can
> > > > > > > >> this be explained in the KIP?
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >> Dong
> > > > > > >>
> > > > > > >>
> > > > > > > >> On Thu, Nov 22, 2018 at 6:37 AM Boyang Chen
> > > > > > > >> <bche...@outlook.com> wrote:
> > > > > > >>
> > > > > > >> > Hey Mayuresh,
> > > > > > >> >
> > > > > > >> >
> > > > > > > >> > Thanks for your feedback! I will try to go through another
> > > > > > > >> > checklist here.
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > > By this you mean, even if the application has not called
> > > > > > > >> > > KafkaConsumer.poll() within session timeout, it will not
> > > > > > > >> > > be sending the LeaveGroup request, right?
> > > > > > > >> >
> > > > > > > >> > Yep, that's true. We will prevent clients from sending a
> > > > > > > >> > leave group request when they are configured with
> > > > > > > >> > `member.name`.
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > > When is the member.name removed from this map?
> > > > > > > >> > Good question. Within static membership, we will only kick
> > > > > > > >> > a member out due to session timeout. Let me update the KIP
> > > > > > > >> > to state that clearly.
> > > > > > > >> >
> > > > > > > >> > > How is this case (missing member id) handled on the
> > > > > > > >> > > client side? What is the application that is using the
> > > > > > > >> > > KafkaConsumer supposed to do in this scenario?
> > > > > > > >> > I have added the two exceptions to join group response V4.
> > > > > > > >> > Basically, I define the corresponding action for both to be
> > > > > > > >> > failing the client application immediately, because so far
> > > > > > > >> > it is unknown what kind of client issue could trigger them.
> > > > > > > >> > After the first version, we will keep enhancing the error
> > > > > > > >> > handling logic!
> > > > > > > >> >
> > > > > > > >> > > This would mean that it might take more time to detect
> > > > > > > >> > > unowned topic partitions and may cause delay for
> > > > > > > >> > > applications that perform data mirroring tasks. I
> > > > > > > >> > > discussed this with our sre and we have a suggestion to
> > > > > > > >> > > make here as listed below separately.
> > > > > > > >> > The goal of extending the session timeout cap is to serve
> > > > > > > >> > users with good client-side monitoring tools that can
> > > > > > > >> > auto-heal dead consumers very fast. So extending the
> > > > > > > >> > session timeout to a reasonable number is optional (and a
> > > > > > > >> > per-user choice), depending on the client scenario.
> > > > > > >> >
> > > > > > >> > > you meant remove unjoined members of the group, right ?
> > > > > > >> > Yep, there is a typo. Thanks for catching this!
> > > > > > >> >
> > > > > > > >> > > What do you mean by " Internally we would optimize this
> > > > > > > >> > > logic by having rebalance timeout only in charge of
> > > > > > > >> > > stopping prepare rebalance stage, without removing
> > > > > > > >> > > non-responsive members immediately." There would not be
> > > > > > > >> > > a full rebalance if the lagging consumer sent a JoinGroup
> > > > > > > >> > > request later, right ? If yes, can you highlight this in
> > > > > > > >> > > the KIP ?
> > > > > > > >> > No, there won't be. We want to limit the rebalance timeout
> > > > > > > >> > to serve only as a timer that ends the prepare rebalance
> > > > > > > >> > stage. This way, late-joining static members will not
> > > > > > > >> > trigger a further rebalance as long as they are within the
> > > > > > > >> > session timeout. I added your highlight to the KIP!
> > > > > > > >> >
> > > > > > > >> > > The KIP talks about scale up scenario but its not quite
> > > > > > > >> > > clear how we handle it. Are we adding a separate
> > > > > > > >> > > "expansion.timeout" or we adding status "learner" ?. Can
> > > > > > > >> > > you shed more light on how this is handled in the KIP, if
> > > > > > > >> > > its handled?
> > > > > > > >> > Updated the KIP: we shall not cover the scale-up case in
> > > > > > > >> > KIP-345, because we believe the client side could better
> > > > > > > >> > handle this logic.
> > > > > > >> >
> > > > > > > >> > > I think Jason had brought this up earlier about having a
> > > > > > > >> > > way to say how many members/consumer hosts are you
> > > > > > > >> > > choosing to be in the consumer group. If we can do this,
> > > > > > > >> > > then in case of mirroring applications we can do this:
> > > > > > > >> > > Lets say we have a mirroring application that consumes
> > > > > > > >> > > from Kafka cluster A and produces to Kafka cluster B.
> > > > > > > >> > > Depending on the data and the Kafka cluster
> > > > > > > >> > > configuration, Kafka service providers can set a
> > > > > > > >> > > mirroring group saying that it will take, for example
> > > > > > > >> > > 300 consumer hosts/members to achieve the desired
> > > > > > > >> > > throughput and latency for mirroring and can have
> > > > > > > >> > > additional 10 consumer hosts as spare in the same group.
> > > > > > > >> > > So the first 300 members/consumers to join the group will
> > > > > > > >> > > start mirroring the data from Kafka cluster A to Kafka
> > > > > > > >> > > cluster B.
> > > > > > > >> > > The remaining 10 consumer members can sit idle.
> > > > > > > >> > > The moment one of the consumers (for example: consumer
> > > > > > > >> > > number 54) from the first 300 members goes out of the
> > > > > > > >> > > group (crossed session timeout), it (the
> > > > > > > >> > > groupCoordinator) can just assign the topicPartitions
> > > > > > > >> > > from the consumer member 54 to one of the spare hosts.
> > > > > > > >> > > Once the consumer member 54 comes back up, it can start
> > > > > > > >> > > as being a part of the spare pool.
> > > > > > > >> > > This enables us to have lower session timeouts and low
> > > > > > > >> > > latency mirroring, in cases where the service providers
> > > > > > > >> > > are OK with having spare hosts.
> > > > > > > >> > > This would mean that we would tolerate n consumer members
> > > > > > > >> > > leaving and rejoining the group and still provide low
> > > > > > > >> > > latency as long as n <= number of spare consumers.
> > > > > > > >> > > If there are no spare hosts available, we can get back to
> > > > > > > >> > > the idea as described in the KIP.
> > > > > > >> > Great idea! In fact on top of static membership we could
> later
> > > > > > introduce
> > > > > > >> > APIs to set hard-coded
> > > > > > >> > client ids to the group and replace the dead host, or as you
> > > > > proposed
> > > > > > to
> > > > > > >> > define spare host as
> > > > > > >> > what I understood as hot backup. I will put both Jason and
> > your
> > > > > > >> > suggestions into a separate section
> > > > > > >> > called "Future works". Note that this spare host idea may be
> > > also
> > > > > > >> solvable
> > > > > > >> > through rebalance protocol
> > > > > > >> > IMO.
> > > > > > >> >
> > > > > > >> > Thank you again for the great feedback!
> > > > > > >> >
> > > > > > >> > Boyang
> > > > > > >> > ________________________________
> > > > > > >> > From: Boyang Chen <bche...@outlook.com>
> > > > > > >> > Sent: Thursday, November 22, 2018 3:39 PM
> > > > > > >> > To: dev@kafka.apache.org
> > > > > > > >> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > > > > > > >> > specifying member id
> > > > > > >> >
> > > > > > > >> > Hey Dong, sorry for missing your message. I couldn't find your email on my
> > > > > > > >> > thread, so I will just do a checklist here!
> > > > > > >> >
> > > > > > >> >
> > > > > > > >> > 1) The motivation currently explicitly states that the goal is to improve
> > > > > > > >> > performance for heavy state applications. It seems that the motivation can
> > > > > > > >> > be stronger with the following use-case. Currently, for a MirrorMaker cluster
> > > > > > > >> > with e.g. 100 MirrorMaker processes, it will take a long time to rolling
> > > > > > > >> > bounce the entire MirrorMaker cluster. Each MirrorMaker process restart
> > > > > > > >> > will trigger a rebalance, which currently pauses the consumption of all
> > > > > > > >> > partitions of the MirrorMaker cluster. With the change stated in this
> > > > > > > >> > patch, as long as a MirrorMaker can restart within the specified timeout
> > > > > > > >> > (e.g. 2 minutes), we only need a constant number of rebalances (e.g. for
> > > > > > > >> > leader restart) for the entire rolling bounce, which will significantly
> > > > > > > >> > improve the availability of the MirrorMaker pipeline. In my opinion, the
> > > > > > > >> > main benefit of the KIP is to avoid unnecessary rebalances if the consumer
> > > > > > > >> > process can be restarted soon, which helps performance even if the
> > > > > > > >> > overhead of state shuffling for a given process is small.
> > > > > > > >> >
> > > > > > > >> > I just rephrased this part and added it to the KIP. Thanks for making the
> > > > > > > >> > motivation more solid!
> > > > > > >> >
> > > > > > > >> > 2) In order to simplify the KIP reading, can you follow the writeup style
> > > > > > > >> > of other KIPs (e.g. KIP-98) and list the interface changes such as new
> > > > > > > >> > configs (e.g. registration timeout), new request/response, new AdminClient
> > > > > > > >> > API and new error code (e.g. DUPLICATE_STATIC_MEMBER)? Currently some of
> > > > > > > >> > these are specified in the Proposed Change section, which makes it a bit
> > > > > > > >> > inconvenient to understand the new interface that will be exposed to the
> > > > > > > >> > user. Explanation of the current two-phase rebalance protocol probably can
> > > > > > > >> > be moved out of the public interface section.
> > > > > > > >> >
> > > > > > > >> > This is a great suggestion! I just consolidated all the public API
> > > > > > > >> > changes, and the whole KIP looks much more organized!
> > > > > > >> >
> > > > > > > >> > 3) There are currently two versions of JoinGroupRequest in the KIP and only
> > > > > > > >> > one of them has the field memberId. This seems confusing.
> > > > > > > >> >
> > > > > > > >> > Yep, I already found this issue and fixed it.
> > > > > > >> >
> > > > > > > >> > 4) It is mentioned in the KIP that "An admin API to force rebalance could
> > > > > > > >> > be helpful here, but we will make a call once we finished the major
> > > > > > > >> > implementation". So this seems to be still an open question in the current
> > > > > > > >> > design. We probably want to agree on this before voting for the KIP.
> > > > > > > >> >
> > > > > > > >> > We have finalized the idea that this API is needed.
> > > > > > >> >
> > > > > > > >> > 5) The KIP currently adds new config MEMBER_NAME for consumer. Can you
> > > > > > > >> > specify the name of the config key and the default config value? Possible
> > > > > > > >> > default values include empty string or null (similar to transaction.id in
> > > > > > > >> > producer config).
> > > > > > > >> >
> > > > > > > >> > I have defined the `member.name` in "New configuration" section.
> > > > > > >> >
> > > > > > > >> > 6) Regarding the use of the topic "static_member_map" to persist the member
> > > > > > > >> > name map: currently, if the consumer coordinator broker goes offline, a
> > > > > > > >> > rebalance is triggered and consumers will try to connect to the new
> > > > > > > >> > coordinator. If these consumers can connect to the new coordinator within
> > > > > > > >> > max.poll.interval.ms, which by default is 5 minutes, then given that the
> > > > > > > >> > broker can use a deterministic algorithm to determine the partition ->
> > > > > > > >> > member_name mapping, each consumer should get assigned the same set of
> > > > > > > >> > partitions without requiring state shuffling. So it is not clear whether we
> > > > > > > >> > have a strong use-case for this new logic. Can you help clarify what the
> > > > > > > >> > benefit is of using the topic "static_member_map" to persist the member
> > > > > > > >> > name map?
> > > > > > > >> >
> > > > > > > >> > I have discussed with Guozhang offline, and I believe reusing the current
> > > > > > > >> > `__consumer_offsets` topic is a better and unified solution.
> > > > > > >> >
> > > > > > > >> > 7) Regarding the introduction of the expensionTimeoutMs config, it is
> > > > > > > >> > mentioned that "we are using expansion timeout to replace rebalance
> > > > > > > >> > timeout, which is configured by max.poll.intervals from client side, and
> > > > > > > >> > using registration timeout to replace session timeout". Currently the
> > > > > > > >> > default max.poll.interval.ms is configured to be 5 minutes and there will
> > > > > > > >> > be only one rebalance if all new consumers can join within 5 minutes. So it
> > > > > > > >> > is not clear whether we have a strong use-case for this new config. Can you
> > > > > > > >> > explain what the benefit of introducing this new config is?
> > > > > > > >> >
> > > > > > > >> > Previously our goal was to use the expansion timeout as a workaround to
> > > > > > > >> > avoid triggering multiple rebalances when members added during a scale-up
> > > > > > > >> > do not join at the same time. It has been decided that this will be
> > > > > > > >> > addressed by a client-side protocol change, so we will not introduce the
> > > > > > > >> > expansion timeout.
> > > > > > >> >
> > > > > > > >> > 8) It is mentioned that "To distinguish between previous version of
> > > > > > > >> > protocol, we will also increase the join group request version to v4 when
> > > > > > > >> > MEMBER_NAME is set" and "If the broker version is not the latest (< v4),
> > > > > > > >> > the join group request shall be downgraded to v3 without setting the member
> > > > > > > >> > Id". It is probably simpler to just say that this feature is enabled if
> > > > > > > >> > JoinGroupRequest V4 is supported on both client and broker and MEMBER_NAME
> > > > > > > >> > is configured with non-empty string.
> > > > > > > >> >
> > > > > > > >> > Yep, addressed this!
> > > > > > >> >
> > > > > > > >> > 9) It is mentioned that broker may return NO_STATIC_MEMBER_INFO_SET error
> > > > > > > >> > in OffsetCommitResponse for "commit requests under static membership". Can
> > > > > > > >> > you clarify how broker determines whether the commit request is under
> > > > > > > >> > static membership?
> > > > > > > >> >
> > > > > > > >> > We have agreed that commit request shouldn't be affected by the new
> > > > > > > >> > membership, thus removing it here. Thanks for catching this!
> > > > > > >> >
> > > > > > > >> > Let me know if you have further suggestions or concerns. Thank you for
> > > > > > > >> > your valuable feedback to help me design the KIP better! (And I will try to
> > > > > > > >> > address your feedback in the next round, Mayuresh.)
> > > > > > >> >
> > > > > > >> > Best,
> > > > > > >> > Boyang
> > > > > > >> > ________________________________
> > > > > > >> > From: Mayuresh Gharat <gharatmayures...@gmail.com>
> > > > > > >> > Sent: Wednesday, November 21, 2018 7:50 AM
> > > > > > >> > To: dev@kafka.apache.org
> > > > > > > >> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > > > > > > >> > specifying member id
> > > > > > >> >
> > > > > > >> > Hi Boyang,
> > > > > > >> >
> > > > > > > >> > Thanks for updating the KIP. This is a step in a good direction for stateful
> > > > > > > >> > applications and also mirroring applications whose latency is affected due
> > > > > > > >> > to the rebalance issues that we have today.
> > > > > > > >> >
> > > > > > > >> > I had a few questions on the current version of the KIP:
> > > > > > > >> > For the effectiveness of the KIP, consumer with member.name set will *not
> > > > > > > >> > send leave group request* when they go offline
> > > > > > > >> >
> > > > > > > >> > > By this you mean, even if the application has not called
> > > > > > > >> > > KafkaConsumer.poll() within session timeout, it will not be sending the
> > > > > > > >> > > LeaveGroup request, right?
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >> > Broker will maintain an in-memory mapping of {member.name -> member.id} to
> > > > > > > >> > track member uniqueness.
> > > > > > >> >
> > > > > > >> > > When is the member.name removed from this map?
> > > > > > >> > >
> > > > > > >> >
> > > > > > > >> > Member.id must be set if the *member.name* is already
> > > > > > > >> > within the map. Otherwise reply MISSING_MEMBER_ID
> > > > > > >> >
> > > > > > > >> > > How is this case handled on the client side? What is the application that
> > > > > > > >> > > is using the KafkaConsumer supposed to do in this scenario?
> > > > > > >> > >
> > > > > > >> >
> > > > > > > >> > Session timeout is the timeout we will trigger rebalance when a member goes
> > > > > > > >> > offline for too long (not sending heartbeat request). To make static
> > > > > > > >> > membership effective, we should increase the default max session timeout to
> > > > > > > >> > 30 min so that end user could config it freely.
> > > > > > >> >
> > > > > > > >> > > This would mean that it might take more time to detect unowned topic
> > > > > > > >> > > partitions and may cause delay for applications that perform data mirroring
> > > > > > > >> > > tasks. I discussed this with our SRE and we have a suggestion to make here,
> > > > > > > >> > > as listed below separately.
> > > > > > >> > >
> > > > > > >> >
> > > > > > > >> > Currently there is a config called *rebalance timeout* which is configured
> > > > > > > >> > by consumer *max.poll.intervals*. The reason we set it to poll interval is
> > > > > > > >> > because consumer could only send request within the call of poll() and we
> > > > > > > >> > want to wait sufficient time for the join group request. When reaching
> > > > > > > >> > rebalance timeout, the group will move towards completingRebalance stage
> > > > > > > >> > and remove unjoined groups
> > > > > > >> >
> > > > > > >> > > you meant remove unjoined members of the group, right ?
> > > > > > >> > >
> > > > > > >> >
> > > > > > > >> > Currently there is a config called *rebalance timeout* which is configured
> > > > > > > >> > by consumer *max.poll.intervals*. The reason we set it to poll interval is
> > > > > > > >> > because consumer could only send request within the call of poll() and we
> > > > > > > >> > want to wait sufficient time for the join group request. When reaching
> > > > > > > >> > rebalance timeout, the group will move towards completingRebalance stage
> > > > > > > >> > and remove unjoined groups. This is actually conflicting with the design of
> > > > > > > >> > static membership, because those temporarily unavailable members will
> > > > > > > >> > potentially reattempt the join group and trigger extra rebalances.
> > > > > > > >> > Internally we would optimize this logic by having rebalance timeout only in
> > > > > > > >> > charge of stopping prepare rebalance stage, without removing non-responsive
> > > > > > > >> > members immediately.
> > > > > > >> >
> > > > > > > >> > > What do you mean by "Internally we would optimize this logic by having
> > > > > > > >> > > rebalance timeout only in charge of stopping prepare rebalance stage,
> > > > > > > >> > > without removing non-responsive members immediately"? There would not be a
> > > > > > > >> > > full rebalance if the lagging consumer sent a JoinGroup request later,
> > > > > > > >> > > right? If yes, can you highlight this in the KIP?
> > > > > > >> > >
> > > > > > >> >
> > > > > > >> > Scale Up
> > > > > > >> >
> > > > > > > >> > > The KIP talks about the scale-up scenario, but it's not quite clear how we
> > > > > > > >> > > handle it. Are we adding a separate "expansion.timeout", or are we adding a
> > > > > > > >> > > "learner" status? Can you shed more light on how this is handled in the KIP,
> > > > > > > >> > > if it's handled?
> > > > > > >> > >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > *Discussion*
> > > > > > > >> > Larger session timeouts causing latency rise for getting data for un-owned
> > > > > > > >> > topic partitions:
> > > > > > >> >
> > > > > > > >> > > I think Jason had brought this up earlier about having a way to say how
> > > > > > > >> > > many members/consumer hosts you are choosing to be in the consumer group.
> > > > > > > >> > > If we can do this, then in the case of mirroring applications we can do this:
> > > > > > > >> > > Let's say we have a mirroring application that consumes from Kafka cluster
> > > > > > > >> > > A and produces to Kafka cluster B.
> > > > > > > >> > > Depending on the data and the Kafka cluster configuration, Kafka service
> > > > > > > >> > > providers can set a mirroring group saying that it will take, for example,
> > > > > > > >> > > 300 consumer hosts/members to achieve the desired throughput and latency
> > > > > > > >> > > for mirroring, and can have an additional 10 consumer hosts as spares in the
> > > > > > > >> > > same group.
> > > > > > > >> > > So the first 300 members/consumers to join the group will start
> > > > > > > >> > > mirroring the data from Kafka cluster A to Kafka cluster B.
> > > > > > > >> > > The remaining 10 consumer members can sit idle.
> > > > > > > >> > > The moment one of the consumers (for example: consumer number 54) from the
> > > > > > > >> > > first 300 members goes out of the group (crossed session timeout), it (the
> > > > > > > >> > > groupCoordinator) can just assign the topicPartitions from consumer
> > > > > > > >> > > member 54 to one of the spare hosts.
> > > > > > > >> > > Once consumer member 54 comes back up, it can start as a part of
> > > > > > > >> > > the spare pool.
> > > > > > > >> > > This enables us to have lower session timeouts and low-latency mirroring
> > > > > > > >> > > in cases where the service providers are OK with having spare hosts.
> > > > > > > >> > > This would mean that we would tolerate n consumer members leaving and
> > > > > > > >> > > rejoining the group and still provide low latency as long as n <= number of
> > > > > > > >> > > spare consumers.
> > > > > > > >> > > If there are no spare hosts available, we can get back to the idea as
> > > > > > > >> > > described in the KIP.
> > > > > > >> > >
> > > > > > >> >
> > > > > > >> > Thanks,
> > > > > > >> >
> > > > > > >> > Mayuresh
> > > > > > >> >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Tue, Nov 20, 2018 at 10:18 AM Konstantine Karantasis <
> > > > > > >> > konstant...@confluent.io> wrote:
> > > > > > >> >
> > > > > > >> > > Hi Boyang.
> > > > > > >> > >
> > > > > > > >> > > Thanks for preparing this KIP! It is making good progress and will be a
> > > > > > > >> > > great improvement for stateful Kafka applications.
> > > > > > > >> > >
> > > > > > > >> > > Apologies for my late reply, I was away for a while. Lots of great comments
> > > > > > > >> > > so far, so I'll probably second most of them in what I suggest below at
> > > > > > > >> > > this point.
> > > > > > > >> > >
> > > > > > > >> > > When I first read the KIP, I wanted to start at the end with something that
> > > > > > > >> > > wasn't highlighted a lot. That was the topic related to handling duplicate
> > > > > > > >> > > members. I see now that the initial suggestion of handling this situation
> > > > > > > >> > > during offset commit has been removed, and I agree with that. Issues
> > > > > > > >> > > related to membership seem to be handled better when the member joins the
> > > > > > > >> > > group rather than when it tries to commit offsets. This also simplifies how
> > > > > > > >> > > many request types need to change in order to incorporate the new member
> > > > > > > >> > > name field.
> > > > > > > >> > >
> > > > > > > >> > > I also agree with what Jason and Guozhang have said regarding timeouts.
> > > > > > > >> > > Although semantically, it's easier to think of every operation having its
> > > > > > > >> > > own timeout, operationally this can become a burden. Thus, consolidation
> > > > > > > >> > > seems preferable here. The definition of embedded protocols on top of the
> > > > > > > >> > > base group membership protocol for rebalancing gives enough flexibility to
> > > > > > > >> > > address such needs in each client component separately.
> > > > > > > >> > >
> > > > > > > >> > > Finally, some minor comments:
> > > > > > > >> > > In a few places the new/proposed changes are referred to as "current".
> > > > > > > >> > > Which is a bit confusing considering that there is a protocol in place
> > > > > > > >> > > already, and by "current" someone might understand the existing one. I'd
> > > > > > > >> > > recommend using new/proposed or equivalent when referring to changes
> > > > > > > >> > > introduced with KIP-345 and current/existing or equivalent when referring
> > > > > > > >> > > to existing behavior.
> > > > > > > >> > >
> > > > > > > >> > > There's the following sentence in the "Public Interfaces" section:
> > > > > > > >> > > "Since for many stateful consumer/stream applications, the state shuffling
> > > > > > > >> > > is more painful than short time partial unavailability."
> > > > > > > >> > > However, my understanding is that the changes proposed with KIP-345 will
> > > > > > > >> > > not exploit any partial availability. A suggestion for dealing with
> > > > > > > >> > > temporary imbalances has been made in "Incremental Cooperative Rebalancing"
> > > > > > > >> > > which can work well with KIP-345, but here I don't see proposed changes
> > > > > > > >> > > that suggest that some resources (e.g. partitions) will keep being used
> > > > > > > >> > > while others will not be utilized. Thus, you might want to adjust this
> > > > > > > >> > > sentence. Correct me if I'm missing something related to that.
> > > > > > > >> > >
> > > > > > > >> > > In the rejected alternatives, under point 2) I read "we can copy the member
> > > > > > > >> > > id to the config files". I believe it means to say "member name" unless I'm
> > > > > > > >> > > missing something about reusing member ids. Also below I read: "By allowing
> > > > > > > >> > > consumers to optionally specifying a member id" which probably implies
> > > > > > > >> > > "member name" again. In a sense this section highlights a potential
> > > > > > > >> > > confusion between member name and member id. I wonder if we could come up
> > > > > > > >> > > with a better term for the new field. StaticTag, StaticLabel, or even
> > > > > > > >> > > StaticName are some suggestions that could potentially help with confusion
> > > > > > > >> > > between MemberId and MemberName and what corresponds to what. But I
> > > > > > > >> > > wouldn't like to disrupt the discussion with naming conventions too much at
> > > > > > > >> > > this point. I just mention it here as a thought.
> > > > > > > >> > >
> > > > > > > >> > > Looking forward to see the final details of this KIP. Great work so far!
> > > > > > >> > >
> > > > > > >> > > Konstantine
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > On Tue, Nov 20, 2018 at 4:23 AM Boyang Chen <
> > > > bche...@outlook.com>
> > > > > > >> wrote:
> > > > > > >> > >
> > > > > > > >> > > > Thanks Guozhang for the great summary here, and I have been following up
> > > > > > > >> > > > the action items here.
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > >   1.  I already updated the KIP to remove the expansion timeout and
> > > > > > > >> > > > registration timeout. Great to see them being addressed in client side!
> > > > > > > >> > > >   2.  I double checked the design and I believe that it is ok to have both
> > > > > > > >> > > > static member and dynamic member co-exist in the same group. So the upgrade
> > > > > > > >> > > > shouldn't be destructive and we are removing the two membership protocol
> > > > > > > >> > > > switching APIs.
> > > > > > > >> > > >   3.  I only have question about this one. I'm still reading the KafkaApis
> > > > > > > >> > > > code here. Should I just use the same authorization logic for
> > > > > > > >> > > > ForceStaticRebalanceRequest as JoinGroupRequest?
> > > > > > > >> > > >   4.  I'm very excited to see this work with K8! Like you suggested, this
> > > > > > > >> > > > feature could be better addressed in a separate KIP because it is pretty
> > > > > > > >> > > > independent. I could start drafting the KIP once the current proposal is
> > > > > > > >> > > > approved.
> > > > > > > >> > > >   5.  I believe that we don't need fencing in offset commit request, since
> > > > > > > >> > > > duplicate member.name issue could be handled by join group request. We
> > > > > > > >> > > > shall reject join group with known member name but no member id (which
> > > > > > > >> > > > means we already have an active member using this identity).
> > > > > > > >> > > >   6.  I agree to remove that internal config once we move forward with
> > > > > > > >> > > > static membership. And I already removed the entire section from the KIP.
> > > > > > >> > > >
> > > > > > >> > > > Let me know if you have other concerns.
> > > > > > >> > > >
> > > > > > >> > > > Best,
> > > > > > >> > > > Boyang
> > > > > > >> > > > ________________________________
> > > > > > >> > > > From: Guozhang Wang <wangg...@gmail.com>
> > > > > > >> > > > Sent: Tuesday, November 20, 2018 4:21 PM
> > > > > > >> > > > To: dev
> > > > > > > >> > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > > > > > > >> > > > specifying member id
> > > > > > >> > > >
> > > > > > >> > > > Hello Boyang,
> > > > > > >> > > >
> > > > > > > >> > > > Thanks a lot for the KIP! It is a great write-up and I appreciate your
> > > > > > > >> > > > patience answering the feedback from the community. I'd like to add my
> > > > > > > >> > > > 2cents here:
> > > > > > > >> > > >
> > > > > > > >> > > > 1. By introducing another two timeout configs, registration_timeout and
> > > > > > > >> > > > expansion_timeout, we are effectively having four timeout configs: session
> > > > > > > >> > > > timeout, rebalance timeout (configured as "max.poll.interval.ms" on client
> > > > > > > >> > > > side), and these two. Interplaying these timeout configs can be quite hard
> > > > > > > >> > > > for users with such complexity, and hence I'm wondering if we can simplify
> > > > > > > >> > > > the situation with as few timeout configs as possible. Here is a
> > > > > > > >> > > > concrete suggestion I'd like to propose:
> > > > > > > >> > > >
> > > > > > > >> > > > 1.a) Instead of introducing a registration_timeout in addition to the
> > > > > > > >> > > > session_timeout for static members, we can just reuse the session_timeout
> > > > > > > >> > > > and ask users to set it to a larger value when they are upgrading a dynamic
> > > > > > > >> > > > client to a static client by setting the "member.name" at the same time. By
> > > > > > > >> > > > default, the broker-side min.session.timeout is 6 seconds and
> > > > > > > >> > > > max.session.timeout is 5 minutes, which seems reasonable to me (we can of
> > > > > > > >> > > > course modify this broker config to enlarge the valid interval if we want
> > > > > > > >> > > > in practice). And then we should also consider removing the condition for
> > > > > > > >> > > > marking a client as failed if the rebalance timeout has been reached while
> > > > > > > >> > > > the JoinGroup was not received, so that the semantics of session_timeout and
> > > > > > > >> > > > rebalance_timeout are totally separated: the former is only used to
> > > > > > > >> > > > determine if a consumer member of the group should be marked as failed and
> > > > > > > >> > > > kicked out of the group, and the latter is only used to determine the
> > > > > > > >> > > > longest time the coordinator should wait for the PREPARE_REBALANCE phase. In
> > > > > > > >> > > > other words, if a member did not send the JoinGroup within the
> > > > > > > >> > > > rebalance_timeout, we still include it in the new generation of the group
> > > > > > > >> > > > and use its old subscription info to send to leader for assignment. Later
> > > > > > >> > > > if the member came back with HeartBeat request, we can
> > still
> > > > > > follow
> > > > > > >> the
> > > > > > >> > > > normal path to bring it to the latest generation while
> > > > checking
> > > > > > that
> > > > > > >> > its
> > > > > > >> > > > sent JoinGroup request contains the same subscription
> info
> > > as
> > > > we
> > > > > > >> used
> > > > > > >> > to
> > > > > > >> > > > assign the partitions previously (which should be likely
> > the
> > > > > case
> > > > > > in
> > > > > > >> > > > practice). In addition, we should let static members to
> > not
> > > > send
> > > > > > the
> > > > > > >> > > > LeaveGroup request when it is gracefully shutdown, so
> > that a
> > > > > > static
> > > > > > >> > > member
> > > > > > >> > > > can only be leaving the group if its session has timed
> > out,
> > > OR
> > > > > it
> > > > > > >> has
> > > > > > >> > > been
> > > > > > >> > > > indicated to not exist in the group any more (details
> > > below).
> > > > > > >> > > >
> > > > > > > >> > > > 1.b) We have a parallel discussion about Incremental Cooperative
> > > > > > > >> > > > Rebalancing, in which we will encode the "when to rebalance" logic
> > > > > > > >> > > > at the application level, instead of at the protocol level. By doing
> > > > > > > >> > > > this we can also enable a few other optimizations, e.g. at the
> > > > > > > >> > > > Streams level to first build up the state store as standby tasks and
> > > > > > > >> > > > then trigger a second rebalance to actually migrate the active
> > > > > > > >> > > > tasks, while keeping the actual rebalance latency and hence the
> > > > > > > >> > > > unavailability window small
> > > > > > > >> > > > (https://issues.apache.org/jira/browse/KAFKA-6145). I'd propose we
> > > > > > > >> > > > align KIP-345 with this idea, and hence do not add the
> > > > > > > >> > > > expansion_timeout as part of the protocol layer, but only do that at
> > > > > > > >> > > > the application's coordinator / assignor layer (Connect, Streams,
> > > > > > > >> > > > etc). We can still deprecate "group.initial.rebalance.delay.ms" as
> > > > > > > >> > > > part of this KIP, though, since we have discussed its limitations
> > > > > > > >> > > > and think it is actually not a very good design and could be
> > > > > > > >> > > > replaced with the client-side logic above.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > > >> > > > 2. I'd like to see your thoughts on the upgrade path for this KIP.
> > > > > > > >> > > > More specifically, let's say after we have upgraded the broker
> > > > > > > >> > > > version to be able to recognize the new versions of the JoinGroup
> > > > > > > >> > > > request and the admin requests, how should we upgrade the clients
> > > > > > > >> > > > and enable static groups? Off the top of my head, if we do a rolling
> > > > > > > >> > > > bounce in which we set the member.name config as well as optionally
> > > > > > > >> > > > increase the session.timeout config when we bounce each instance,
> > > > > > > >> > > > then during this rolling bounce we will have a group containing both
> > > > > > > >> > > > dynamic members and static members. It means that we should have the
> > > > > > > >> > > > group allow such a scenario (i.e. we cannot reject JoinGroup
> > > > > > > >> > > > requests from dynamic members), and hence the "member.name" ->
> > > > > > > >> > > > "member.id" mapping will only be partial in this scenario. Also,
> > > > > > > >> > > > could you describe whether the upgrade to the first version that
> > > > > > > >> > > > supports this feature would get any benefit, or whether only future
> > > > > > > >> > > > rolling bounces could benefit from this feature?
> > > > > > >> > > >
> > > > > > > >> > > > If that's the case and we will do 1) as suggested above, do we still
> > > > > > > >> > > > need the enableStaticMembership and enableDynamicMembership admin
> > > > > > > >> > > > requests? It seems they are no longer necessary, as we will only
> > > > > > > >> > > > have the notion of "dynamic or static members" that can co-exist in
> > > > > > > >> > > > a group, while there is no notion of "dynamic or static groups".
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > > >> > > > 3. We need to briefly talk about the implications for ACLs as we
> > > > > > > >> > > > introduce new admin requests that are related to a specific
> > > > > > > >> > > > group.id. For example, we need to make sure that whoever created the
> > > > > > > >> > > > group or joined the group can actually send admin requests for the
> > > > > > > >> > > > group. Otherwise the application owners need to bother the Kafka
> > > > > > > >> > > > operators on a multi-tenant cluster every time they want to send any
> > > > > > > >> > > > admin requests for their groups, which would be an operational
> > > > > > > >> > > > nightmare.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > > >> > > > 4. I like Jason's suggestion of adding an optional field for the
> > > > > > > >> > > > list of member names, and I'm wondering if that can be done as part
> > > > > > > >> > > > of the forceStaticRebalance request: i.e. by passing a list of
> > > > > > > >> > > > members, we will enforce a rebalance immediately, since it indicates
> > > > > > > >> > > > that some static members will be officially kicked out of the group
> > > > > > > >> > > > and some new static members may be added. So back to 1.a) above, a
> > > > > > > >> > > > static member can only be kicked out of the group if a) its session
> > > > > > > >> > > > (arguably a long period of time) has timed out, or b) this admin
> > > > > > > >> > > > request explicitly states that it is no longer part of the group. As
> > > > > > > >> > > > for execution, I'm fine with keeping it as future work of this KIP
> > > > > > > >> > > > if you'd like to make its scope smaller.
> > > > > > >> > > >
> > > > > > >> > > > Following are minor comments:
> > > > > > >> > > >
> > > > > > > >> > > > 5. I'm not sure if we need to include "member.name" as part of the
> > > > > > > >> > > > OffsetCommitRequest for fencing purposes, as I think the memberId
> > > > > > > >> > > > plus the generation number should be sufficient for fencing even
> > > > > > > >> > > > with static members.
> > > > > > >> > > >
> > > > > > > >> > > > 6. As mentioned above, if we agree to do 1) we can get rid of the
> > > > > > > >> > > > "LEAVE_GROUP_ON_CLOSE_CONFIG" config.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > Guozhang
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > > >> > > > On Sat, Nov 17, 2018 at 5:53 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > > > >> > > >
> > > > > > >> > > > > Hey Boyang,
> > > > > > >> > > > >
> > > > > > > >> > > > > Thanks for the proposal! This is very useful. I have some comments
> > > > > > > >> > > > > below:
> > > > > > >> > > > >
> > > > > > > >> > > > > 1) The motivation currently explicitly states that the goal is to
> > > > > > > >> > > > > improve performance for heavy-state applications. It seems that
> > > > > > > >> > > > > the motivation can be stronger with the following use-case.
> > > > > > > >> > > > > Currently, for a MirrorMaker cluster with e.g. 100 MirrorMaker
> > > > > > > >> > > > > processes, it will take a long time to rolling bounce the entire
> > > > > > > >> > > > > MirrorMaker cluster. Each MirrorMaker process restart will trigger
> > > > > > > >> > > > > a rebalance, which currently pauses the consumption of all
> > > > > > > >> > > > > partitions of the MirrorMaker cluster. With the change stated in
> > > > > > > >> > > > > this patch, as long as a MirrorMaker can restart within the
> > > > > > > >> > > > > specified timeout (e.g. 2 minutes), we only need a constant
> > > > > > > >> > > > > number of rebalances (e.g. for leader restart) for the entire
> > > > > > > >> > > > > rolling bounce, which will significantly improve the availability
> > > > > > > >> > > > > of the MirrorMaker pipeline. In my opinion, the main benefit of
> > > > > > > >> > > > > the KIP is to avoid unnecessary rebalances if the consumer process
> > > > > > > >> > > > > can be restarted soon, which helps performance even if the
> > > > > > > >> > > > > overhead of state shuffling for a given process is small.
> > > > > > >> > > > >
> > > > > > > >> > > > > 2) In order to make the KIP easier to read, can you follow the
> > > > > > > >> > > > > writeup style of other KIPs (e.g. KIP-98) and list the interface
> > > > > > > >> > > > > changes such as new configs (e.g. registration timeout), new
> > > > > > > >> > > > > request/response, new AdminClient API and new error codes (e.g.
> > > > > > > >> > > > > DUPLICATE_STATIC_MEMBER)? Currently some of these are specified in
> > > > > > > >> > > > > the Proposed Change section, which makes it a bit inconvenient to
> > > > > > > >> > > > > understand the new interface that will be exposed to users. The
> > > > > > > >> > > > > explanation of the current two-phase rebalance protocol can
> > > > > > > >> > > > > probably be moved out of the public interface section.
> > > > > > >> > > > >
> > > > > > > >> > > > > 3) There are currently two versions of JoinGroupRequest in the KIP
> > > > > > > >> > > > > and only one of them has the field memberId. This seems confusing.
> > > > > > >> > > > >
> > > > > > > >> > > > > 4) It is mentioned in the KIP that "An admin API to force
> > > > > > > >> > > > > rebalance could be helpful here, but we will make a call once we
> > > > > > > >> > > > > finished the major implementation". So this seems to be still an
> > > > > > > >> > > > > open question in the current design. We probably want to agree on
> > > > > > > >> > > > > this before voting for the KIP.
> > > > > > >> > > > >
> > > > > > > >> > > > > 5) The KIP currently adds a new config MEMBER_NAME for the
> > > > > > > >> > > > > consumer. Can you specify the name of the config key and the
> > > > > > > >> > > > > default config value? Possible default values include empty string
> > > > > > > >> > > > > or null (similar to transactional.id in the producer config).
> > > > > > >> > > > >
> > > > > > > >> > > > > 6) Regarding the use of the topic "static_member_map" to persist
> > > > > > > >> > > > > the member name map: currently, if the consumer coordinator broker
> > > > > > > >> > > > > goes offline, a rebalance is triggered and consumers will try to
> > > > > > > >> > > > > connect to the new coordinator. If these consumers can connect to
> > > > > > > >> > > > > the new coordinator within max.poll.interval.ms, which by default
> > > > > > > >> > > > > is 5 minutes, then given that the broker can use a deterministic
> > > > > > > >> > > > > algorithm to determine the partition -> member_name mapping, each
> > > > > > > >> > > > > consumer should get assigned the same set of partitions without
> > > > > > > >> > > > > requiring state shuffling. So it is not clear whether we have a
> > > > > > > >> > > > > strong use-case for this new logic. Can you help clarify what the
> > > > > > > >> > > > > benefit is of using the topic "static_member_map" to persist the
> > > > > > > >> > > > > member name map?
> > > > > > >> > > > >
> > > > > > > >> > > > > 7) Regarding the introduction of the expensionTimeoutMs config, it
> > > > > > > >> > > > > is mentioned that "we are using expansion timeout to replace
> > > > > > > >> > > > > rebalance timeout, which is configured by max.poll.intervals from
> > > > > > > >> > > > > client side, and using registration timeout to replace session
> > > > > > > >> > > > > timeout". Currently the default max.poll.interval.ms is configured
> > > > > > > >> > > > > to be 5 minutes, and there will be only one rebalance if all new
> > > > > > > >> > > > > consumers can join within 5 minutes. So it is not clear whether we
> > > > > > > >> > > > > have a strong use-case for this new config. Can you explain what
> > > > > > > >> > > > > the benefit is of introducing this new config?
> > > > > > >> > > > >
> > > > > > > >> > > > > 8) It is mentioned that "To distinguish between previous version
> > > > > > > >> > > > > of protocol, we will also increase the join group request version
> > > > > > > >> > > > > to v4 when MEMBER_NAME is set" and "If the broker version is not
> > > > > > > >> > > > > the latest (< v4), the join group request shall be downgraded to
> > > > > > > >> > > > > v3 without setting the member Id". It is probably simpler to just
> > > > > > > >> > > > > say that this feature is enabled if JoinGroupRequest V4 is
> > > > > > > >> > > > > supported on both client and broker and MEMBER_NAME is configured
> > > > > > > >> > > > > with a non-empty string.
> > > > > > >> > > > >
> > > > > > > >> > > > > 9) It is mentioned that the broker may return a
> > > > > > > >> > > > > NO_STATIC_MEMBER_INFO_SET error in OffsetCommitResponse for
> > > > > > > >> > > > > "commit requests under static membership". Can you clarify how the
> > > > > > > >> > > > > broker determines whether the commit request is under static
> > > > > > > >> > > > > membership?
> > > > > > >> > > > >
> > > > > > >> > > > > Thanks,
> > > > > > >> > > > > Dong
> > > > > > >> > > > >
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > --
> > > > > > >> > > > -- Guozhang
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > --
> > > > > > >> > -Regards,
> > > > > > >> > Mayuresh R. Gharat
> > > > > > >> > (862) 250-7125
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -Regards,
> > > > > > > Mayuresh R. Gharat
> > > > > > > (862) 250-7125
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -Regards,
> > > > > > Mayuresh R. Gharat
> > > > > > (862) 250-7125
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -Regards,
> > > > Mayuresh R. Gharat
> > > > (862) 250-7125
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> > --
> > -- Guozhang
> >
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>


-- 
-- Guozhang
