Regarding the version, what I was thinking is that in the HB request, the
"serverAssignor" field, instead of being a single string field, could be
a structure that includes: name, minimumVersion, maximumVersion, where
minimumVersion/maximumVersion denote the range of server assignor versions
that the client works best with. That being said, I agree with you that
such information may also be inferred elsewhere, e.g. by looking at the
"rackId" field and seeing whether it contains a hyphen or not. All I was
wondering is whether such version information would be useful for the
server assignors to determine their actual assignment logic. I do not feel
very strongly about this one though --- even if we do not add it now, we
can potentially add it later; it's just that changing a single string field
to a structure would be hard for compatibility, and we'd then probably have
to add top-level fields instead.
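
To make the idea a bit more concrete, here is a rough sketch of what I have
in mind. Everything in it is made up for illustration: the `ServerAssignor`
class, the `highestCommonVersion` helper and the "rack-aware" name are not
part of the KIP, and the "pick the highest version every member supports"
rule is just one possible policy a broker-side assignor could apply.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalInt;

class ServerAssignorVersionSketch {

    // Hypothetical structured form of the "serverAssignor" HB field.
    static final class ServerAssignor {
        final String name;
        final int minimumVersion;
        final int maximumVersion;

        ServerAssignor(String name, int minimumVersion, int maximumVersion) {
            this.name = name;
            this.minimumVersion = minimumVersion;
            this.maximumVersion = maximumVersion;
        }
    }

    // Returns the highest version inside every member's [min, max] range,
    // or empty if the list is empty or the ranges do not overlap.
    static OptionalInt highestCommonVersion(List<ServerAssignor> members) {
        if (members.isEmpty()) {
            return OptionalInt.empty();
        }
        int low = Integer.MIN_VALUE;
        int high = Integer.MAX_VALUE;
        for (ServerAssignor member : members) {
            low = Math.max(low, member.minimumVersion);
            high = Math.min(high, member.maximumVersion);
        }
        return low <= high ? OptionalInt.of(high) : OptionalInt.empty();
    }

    public static void main(String[] args) {
        List<ServerAssignor> members = Arrays.asList(
            new ServerAssignor("rack-aware", 0, 2),   // upgraded client
            new ServerAssignor("rack-aware", 0, 1));  // older client
        // The assignor would fall back to the logic of the printed version.
        System.out.println(highestCommonVersion(members));
    }
}
```

With a mixed group like the one in `main`, the assignor would use the older
logic until every member has been upgraded.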

Regarding the `onAssignment` logic, again my train of thought is that, if
users want to know exactly when a partition is assigned / revoked, they
would be leveraging the rebalance callbacks, as that's what people should
rely on to determine "when" partitions are assigned. The `onAssignment`
should be used for getting "why" such a partition assignment decision was
made, and hence returning `combined partitions` would be okay. Streams, for
example, implements both the rebalance callbacks and the assignor, and it
gets the "when" from the former (and creates/closes active tasks
accordingly) and the "why" from the latter (and updates its global info
bookkeeping as well as standby maintenance accordingly). Most users would
be interested only in the rebalance callbacks, i.e. only in the "when", and
would not implement their own assignor at all if they do not care about the
"why", as they trust the server assignors to take good care of that. So if
we were building these two types of APIs from scratch, I'd indeed feel that
providing only the metadata, and not the partitions, in `onAssignment` may
be less confusing and would push users to separate the usage of the two
more clearly, but since we already introduced partitions in `onAssignment`
for compatibility, I'm less keen on removing them.
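
As a rough illustration of the "when" / "why" split, here is a small
sketch. The two interfaces are simplified stand-ins that I made up for this
email (they are not the real consumer or KIP interfaces), partitions are
plain strings, and the `Member` class only loosely mimics what Streams does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class WhenVsWhySketch {

    // Hypothetical stand-in for the rebalance listener: answers "when".
    interface RebalanceListener {
        void onPartitionsRevoked(List<String> partitions);
        void onPartitionsAssigned(List<String> partitions);
    }

    // Hypothetical stand-in for the assignor callback: answers "why".
    interface AssignmentCallback {
        void onAssignment(int epoch, String metadata, List<String> combinedPartitions);
    }

    // A Streams-like member that implements both callbacks.
    static final class Member implements RebalanceListener, AssignmentCallback {
        final List<String> activePartitions = new ArrayList<>();
        int lastEpoch = -1;
        String lastMetadata = null;

        @Override
        public void onPartitionsRevoked(List<String> partitions) {
            // "when": close active tasks for partitions that are really gone.
            activePartitions.removeAll(partitions);
        }

        @Override
        public void onPartitionsAssigned(List<String> partitions) {
            // "when": create active tasks for partitions that are really usable.
            activePartitions.addAll(partitions);
        }

        @Override
        public void onAssignment(int epoch, String metadata, List<String> combinedPartitions) {
            // "why": bookkeeping only. The combined partitions may include
            // ones not yet usable, so no active task is created here.
            lastEpoch = epoch;
            lastMetadata = metadata;
        }
    }

    public static void main(String[] args) {
        Member member = new Member();
        // Once per rebalance: the decision and its metadata.
        member.onAssignment(1, "standby=[t-2]", Arrays.asList("t-0", "t-1"));
        // Possibly several times: partitions as they actually become usable.
        member.onPartitionsAssigned(Arrays.asList("t-0"));
        member.onPartitionsAssigned(Arrays.asList("t-1"));
        System.out.println(member.lastEpoch + " " + member.lastMetadata + " " + member.activePartitions);
    }
}
```

The point of the sketch is only that `onAssignment` never touches the
active partitions, so it does not matter much whether the partitions it
receives are "combined" or not.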


Guozhang

On Mon, Sep 26, 2022 at 6:55 AM David Jacot <dja...@confluent.io.invalid>
wrote:

> Hi Guozhang,
>
> Regarding the version, my understanding is that the version would be
> either the client software version or the request version, is this
> correct? If so, we could indeed pass this information down to the
> assignor via the interface. One way would be to pass a "server
> context" to the assignor and that context would include that
> information (and perhaps more). Is this what you are looking for?
>
> Regarding the onAssignment, I think that I understand your point. I
> suppose that the assignor could also be clever and keep track of the
> last metadata to decide whether it has to do something or not. One
> question that is still not clear to me is whether the assignor needs
> to know all the assigned partitions upfront regardless of whether they
> are already revoked or not. Do you think that we need this as well?
>
> From an API perspective, we could have something like
> onAssignment(Metadata(version, reason, metadata, assigned partitions,
> pending partitions)), where the assigned partitions are the partitions
> ready to be used and the pending partitions are the ones assigned to
> the member but not revoked yet. I find it a bit weird that this method
> would be called only once because the assignor would not know when the
> pending partitions change. That does not look like a clean API. An
> alternative would be to use onAssignment(Metadata(version, reason,
> metadata, combined partitions)) but this seems error prone because it
> is not clear whether a partition is usable or not. Or do you think
> that we should not provide the partitions but only the metadata?
>
> Best,
> David
>
> On Fri, Sep 23, 2022 at 9:40 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > Hello David,
> >
> > On Fri, Sep 23, 2022 at 2:00 AM David Jacot <dja...@confluent.io.invalid
> >
> > wrote:
> >
> > > Hey,
> > >
> > > > Just to clarify I was asking about the `version` of the assignor
> (i.e. up
> > > to what version that the client would support), and I do agree we
> would not
> > > need metadata. What I have in mind is that, for some specific built-in
> > > broker-assignors, e.g. rack-aware assignors, if it's possible that in a
> > > newer version we would have a hierarchical rack ID string format, like
> > > "tier1-tier2" etc, but if some client has not upgraded their rack ID
> > > would still be in old format. In this case, the broker then needs to
> choose
> > > the old versioned assignor. I'm probably making something up here for
> rack
> > > aware assignors, but I'm wondering if in general such an
> "auto-downgrade"
> > > behavior would be needed still for broker-side assignor, and if yes
> would
> > > "version" still be useful.
> > >
> > > Got it. That's an interesting thought. I think that the issue is that
> > > the client will never tell you which version of the server-side
> > > assignor should be used. Do you think that the coordinator would
> > > downgrade the version if the assignment fails with a higher version? I
> > > tend to believe that this should be handled within the assignor
> > > itself. In the example that you mentioned, the assignor would have to
> > > handle all the cases. I am not really convinced that we need this at
> > > the moment.
> > >
> > The version from the client side would not tell the broker which
> > version to use, but rather which version the client would "work best
> > with". Such a "version" field would not be settable by users, since it
> > would be hard-coded and bumped whenever the Kafka byte code version is
> > bumped.
> > Back to the rack-aware assignor example: if an older-versioned client
> > does not have a hierarchical rack ID, but the assignment returned to it
> > assumes a hierarchical rack structure, it may not reflect the best
> > workload balance among the new and old versioned clients. That means,
> > when receiving the members' subscriptions at the server side, if the
> > versions from these members differ, the broker's assignor may need to
> > consider using the lower-version logic to do the assignment. So yes,
> > the assignor would indeed have to handle all such cases, but it needs
> > to do so such that if there are clients that would not work with
> > certain new logic, it handles such cases automatically, e.g. by still
> > using an older-versioned logic.
> >
> >
> >
> > > > Okay, my understanding is that the calling ordering of these
> callbacks
> > > would be like the following:
> > >
> > > Yes, your examples look right.
> > >
> > > > I'm wondering if we would still call onAssignment just once, that
> encodes
> > > all the assignment for this rebalance, including all the partitions
> that
> > > should be assigned to the member but not yet assigned since they have
> not
> > > been revoked by others. In that case the call ordering would be:
> > >
> > > Interesting. Is there a case for Streams where having the full
> > > assignment is beneficial? For instance, I can think of the following
> > > case. When a standby task is promoted to an active task, the metadata
> > > would not contain the standby task anymore and the assignment may not
> > > have the partition yet. In this case, Streams would stop the standby
> > > tasks but not have the active task yet if my understanding of Streams
> > > is correct. So knowing the full assignment could be helpful here.
> > >
> > > If we want to do this, we could structure the assignment given to the
> > > member as follow: version, error, metadata, assigned partitions,
> > > pending partitions, where the pending partitions would be the one
> > > assigned to this member but not yet available. What do you think?
> > >
> > > Regarding onAssignment being called only once, I am not sure to fully
> > > grasp the benefit yet. Does the assignor really care about this? In
> > > the end, the epoch does not really matter for the assignor because it
> > > has to converge its state to the desired state anyway.
> > >
> > Here's my rationale (maybe rephrased a bit :P ): the implementers of
> > rebalance listeners and assignors are two groups of people, and most
> > users fall into the former group, while only very few people fall into
> > the latter group. Rebalance listener implementers just want to know
> > when a partition is actually revoked from or assigned to the consumer
> > and react to it; for this purpose, `onPartitionsRevoked` and
> > `onPartitionsAssigned` would be triggered in an interleaved manner upon
> > `poll` calls across rebalances. The usual logic for such rebalance
> > listeners is metrics reporting, committing offsets (if they do not use
> > Kafka for that), etc. They would not care which calls are from which
> > rebalances --- in the past with eager rebalance, it may be that each
> > rebalance is associated with exactly one `onPartitionsRevoked` first
> > and then one `onPartitionsAssigned`, but that would no longer be the
> > case now.
> >
> > The implementers of the assignor, though, would care about "how the
> > assignment was made", which includes from which rebalance a certain
> > revoke/assign decision was made, based on what metadata such an
> > assignment was made, etc. And that's the whole point of the
> > `onAssignment` function, since otherwise they can just rely on the
> > listeners. The usual implementation logic of this callback is to e.g.
> > bookkeep the assignment decision's driving factors a.k.a. the metadata,
> > global information that needs to be propagated to all members, etc.
> > Take Streams as an example: the active processing tasks go along with
> > the assigned partitions, and we can always just incrementally create /
> > close them upon each rebalance listener trigger, when certain
> > partitions are revoked or assigned together; standby tasks, however,
> > are encoded in the metadata, and we can only know which standby tasks
> > we should get / drop based on the `onAssignment` function, and in fact
> > the creation of such tasks as a result of the metadata bookkeeping does
> > not need to wait until all the partitions that are yet to be assigned
> > have been completely assigned to the member. Such information may not
> > always be updatable in an incremental manner like the
> > partitions-revoked / partitions-assigned. In such a case, it's better
> > to just trigger this function "once per decision made", i.e. once per
> > rebalance generation.
> >
> >
> > Guozhang
> >
> >
> >
> > > Best,
> > > David
> > >
> > > On Thu, Sep 22, 2022 at 6:01 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> > > >
> > > > Hi David, thanks for all the detailed explanations. I think they all
> make
> > > > sense. Just want to have a couple follow-ups here:
> > > >
> > > > > I don't really see the benefits here because server side assignors
> > > > don't have metadata at all. They only assign topic-partitions. They
> > > > are not supposed to generate metadata nor to receive metadata from
> the
> > > > members.
> > > >
> > > > Just to clarify I was asking about the `version` of the assignor
> (i.e. up
> > > > to what version that the client would support), and I do agree we
> would
> > > not
> > > > need metadata. What I have in mind is that, for some specific
> built-in
> > > > broker-assignors, e.g. rack-aware assignors, if it's possible that
> in a
> > > > newer version we would have a hierarchical rack ID string format,
> like
> > > > "tier1-tier2" etc, but if some client has not upgraded their rack ID
> > > > would still be in old format. In this case, the broker then needs to
> > > choose
> > > > the old versioned assignor. I'm probably making something up here for
> > > rack
> > > > aware assignors, but I'm wondering if in general such an
> "auto-downgrade"
> > > > behavior would be needed still for broker-side assignor, and if yes
> would
> > > > "version" still be useful.
> > > >
> > > > > Yeah, that's right. Within a rebalance, `onAssignment` is called
> once
> > > > when the member transitions to a new epoch. This one contains the
> full
> > > > metadata provided by the client side assignor. Then, `onAssignment`
> > > > can be called max N times where N is the number of partitions pending
> > > > revocation by other members. Let me try to clarify this in the KIP.
> > > >
> > > > Okay, my understanding is that the calling ordering of these
> callbacks
> > > > would be like the following:
> > > >
> > > > ----------------------------------------
> > > > > onPartitionsRevoked();    // just once, since we do not really
> > > > >                           // need to revoke incrementally.
> > > > >
> > > > > onAssignment();           // the first call, with epoch incremented
> > > > > onPartitionsAssigned();   // paired with the onAssignment
> > > > >
> > > > > onAssignment();           // the first onAssignment would bump up
> > > > >                           // the epoch, and the metadata reflected.
> > > > > onPartitionsAssigned();   // each time we get an additional
> > > > >                           // assignment, we call onAssignment and
> > > > >                           // then pair it with an
> > > > >                           // onPartitionsAssigned
> > > > > ...
> > > > > onAssignment();
> > > > > onPartitionsAssigned();   // on each of the onAssignment calls, the
> > > > >                           // encoded metadata would not change, only
> > > > >                           // the incrementally added partitions are
> > > > >                           // reflected
> > > >
> > > > Is that the case?
> > > >
> > > > I'm wondering if we would still call onAssignment just once, that
> encodes
> > > > all the assignment for this rebalance, including all the partitions
> that
> > > > should be assigned to the member but not yet assigned since they
> have not
> > > > been revoked by others. In that case the call ordering would be:
> > > >
> > > > ----------------------------------------
> > > > > onPartitionsRevoked();    // just once
> > > > > onAssignment();           // just once, with epoch incremented and
> > > > >                           // the encoded metadata changed; the
> > > > >                           // "assignment" field also reflects the
> > > > >                           // final target assignment
> > > > > onPartitionsAssigned();   // multiple times, each representing
> > > > >                           // incrementally added partitions
> > > > > ...
> > > > > onPartitionsAssigned();
> > > >
> > > > > The motivation for this is that most users would only implement the
> > > > > rebalance callback listeners, and hence we'd definitely need to make
> > > > > sure their semantics do not change much: the time of
> > > > > `onPartitionsAssigned` indicates when the partitions are actually
> > > > > assigned to the member. For assignors, `onAssignment` is used to
> > > > > indicate what decision was made for this member, i.e. when the
> > > > > partitions are decided to be given to it, which does not necessarily
> > > > > mean that they have been given, since that time is determined by
> > > > > `onPartitionsAssigned`. The benefit would be that assignor
> > > > > implementers would not need to reason about which `onAssignment` is
> > > > > the last one for this epoch.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Thu, Sep 22, 2022 at 2:20 AM David Jacot
> <dja...@confluent.io.invalid
> > > >
> > > > wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > > > 1) The client-side "PartitionAssignor#Assignment" has an Error
> byte
> > > > > field,
> > > > > > while the broker-side "PartitionAssignor#Assignment" does not.
> And
> > > the
> > > > > > rationale seems to be assuming that we should always be able to
> do
> > > the
> > > > > > assignment at the broker-side assignor without errors.
> Personally I
> > > think
> > > > > > it's still potentially beneficial to add the Error field even for
> > > > > > broker-side assignors, e.g. for some edge cases where some
> subscribed
> > > > > > topics are not recognized with the current broker's metadata.
> What
> > > do you
> > > > > > think?
> > > > >
> > > > > Yeah, that seems reasonable. However, I wonder if it would be
> better
> > > > > to use an exception on the server side. This is what we usually do
> for
> > > > > server side plugins. On the client side, we use a field because the
> > > > > errors are not defined in advance.
> > > > >
> > > > > Your comment also makes me think about what we should do when the
> > > > > server side assignor fails. I suppose that we have to keep the
> current
> > > > > assignment until a new event occurs. For instance, in your example,
> > > > > the coordinator would have to trigger a rebalance when unrecognized
> > > > > topics are available. This would be part of the metadata
> monitoring.
> > > > >
> > > > > > 2) The client-side "GroupMember" has three additional fields
> > > > > > reason/version/metadata compared with the broker-side
> GroupMember. I
> > > > > agree
> > > > > > that broker-side assignor would not need reason/metadata since
> they
> > > are
> > > > > > blackbox strings/bytes to the assignor, but what about version?
> E.g.
> > > is
> > > > > it
> > > > > > possible that we evolve our broker-side built-in assignor but
> the old
> > > > > > versioned clients would not be able to work with the new
> version, in
> > > > > which
> > > > > > case we need to let the broker being aware of this and upgrade
> its
> > > > > behavior
> > > > > > to cooperate with the clients?
> > > > >
> > > > > I don't really see the benefits here because server side assignors
> > > > > don't have metadata at all. They only assign topic-partitions. They
> > > > > are not supposed to generate metadata nor to receive metadata from
> the
> > > > > members.
> > > > >
> > > > > > 3) Also related to 2) above, for the client-side "GroupMember",
> > > instead
> > > > > of
> > > > > > including these three fields, what about just adding the
> "Metadata"
> > > field
> > > > > > class which has these three fields? Also, there are two
> "Metadata"
> > > > > > currently in the APIs, the first is a class that encodes
> > > > > > reason/version/metadata, and the second is just the encoded
> metadata
> > > > > bytes.
> > > > > > I'm wondering what about just naming the first as memberMetadata,
> > > which
> > > > > > then has a bytebuffer field Metadata, or instead naming the
> second
> > > > > > bytebuffer field as metadataBytes?
> > > > >
> > > > > That's a good point. Let me try to rationalize this interface
> based on
> > > > > your suggestions.
> > > > >
> > > > > Best,
> > > > > David
> > > > >
> > > > > On Tue, Sep 13, 2022 at 9:21 PM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > Hello David,
> > > > > >
> > > > > > Just had a few more nit questions about the public APIs:
> > > > > >
> > > > > > 1) The client-side "PartitionAssignor#Assignment" has an Error
> byte
> > > > > field,
> > > > > > while the broker-side "PartitionAssignor#Assignment" does not.
> And
> > > the
> > > > > > rationale seems to be assuming that we should always be able to
> do
> > > the
> > > > > > assignment at the broker-side assignor without errors.
> Personally I
> > > think
> > > > > > it's still potentially beneficial to add the Error field even for
> > > > > > broker-side assignors, e.g. for some edge cases where some
> subscribed
> > > > > > topics are not recognized with the current broker's metadata.
> What
> > > do you
> > > > > > think?
> > > > > >
> > > > > > 2) The client-side "GroupMember" has three additional fields
> > > > > > reason/version/metadata compared with the broker-side
> GroupMember. I
> > > > > agree
> > > > > > that broker-side assignor would not need reason/metadata since
> they
> > > are
> > > > > > blackbox strings/bytes to the assignor, but what about version?
> E.g.
> > > is
> > > > > it
> > > > > > possible that we evolve our broker-side built-in assignor but
> the old
> > > > > > versioned clients would not be able to work with the new
> version, in
> > > > > which
> > > > > > case we need to let the broker being aware of this and upgrade
> its
> > > > > behavior
> > > > > > to cooperate with the clients?
> > > > > >
> > > > > > 3) Also related to 2) above, for the client-side "GroupMember",
> > > instead
> > > > > of
> > > > > > including these three fields, what about just adding the
> "Metadata"
> > > field
> > > > > > class which has these three fields? Also, there are two
> "Metadata"
> > > > > > currently in the APIs, the first is a class that encodes
> > > > > > reason/version/metadata, and the second is just the encoded
> metadata
> > > > > bytes.
> > > > > > I'm wondering what about just naming the first as memberMetadata,
> > > which
> > > > > > then has a bytebuffer field Metadata, or instead naming the
> second
> > > > > > bytebuffer field as metadataBytes?
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, Sep 13, 2022 at 12:08 PM Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hello David,
> > > > > > >
> > > > > > > Thanks for bringing this question up. I think the main
> benefits as
> > > you
> > > > > > > listed is 2) above if it stays; just to clarify, we would only
> be
> > > able
> > > > > to
> > > > > > > save one round trip if the rebalance is still triggered by the
> > > broker;
> > > > > if
> > > > > > > the rebalance is triggered by the client then the
> num.round.trips
> > > are
> > > > > the
> > > > > > > same:
> > > > > > >
> > > > > > > 1) With GroupPrepareAssignment:
> > > > > > >
> > > > > > > T0: client decides to do a new assignment, suppose it has
> already
> > > sent
> > > > > a
> > > > > > > HB and hence has to wait for it to return first since only one
> > > request
> > > > > /
> > > > > > > response can be inflight with the coordinator's socket.
> > > > > > > T1: client receives the HB response, and then sends the
> > > > > > > GroupPrepareAssignment request.
> > > > > > > T2: the GroupPrepareAssignment response is returned.
> > > > > > > T3: it calculates the new assignment, and sends a
> > > > > GroupInstallAssignment
> > > > > > > request.
> > > > > > >
> > > > > > > In total, two round trips.
> > > > > > >
> > > > > > > 2) Without GroupPrepareAssignment:
> > > > > > >
> > > > > > > T0: client decides to do a new assignment, suppose it has
> already
> > > sent
> > > > > a
> > > > > > > HB and hence has to wait for it to return first since only one
> > > request
> > > > > /
> > > > > > > response can be inflight with the coordinator's socket.
> > > > > > > T1: client receives the HB response, and then sends the new HB
> > > request
> > > > > > > > with the flag indicating that a new rebalance is needed.
> > > > > > > T2: the HB response with the optional member metadata map is
> > > returned.
> > > > > > > T3: it calculates the new assignment, and sends a
> > > > > GroupInstallAssignment
> > > > > > > request.
> > > > > > >
> > > > > > > In total, two round trips as well.
> > > > > > >
> > > > > > > -----------------------------
> > > > > > >
> > > > > > > So to complete the full picture here, we'd need to modify both
> HB
> > > > > request
> > > > > > > and response so that the client can also indicate a new
> rebalance
> > > via
> > > > > the
> > > > > > > HB request as well, right?
> > > > > > >
> > > > > > > Assuming all above is true, I think it's okay to merge the
> > > > > > > GroupPrepareAssignment into HB given that we can make the
> > > additional
> > > > > fields
> > > > > > > encoding the full member (subscription) metadata and topic
> > > metadata as
> > > > > > > optional fields.
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Sep 12, 2022 at 5:22 AM David Jacot
> > > > > <dja...@confluent.io.invalid>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi all,
> > > > > > >>
> > > > > > >> During an offline conversation, someone asked why we need the
> > > > > > >> ConsumerGroupPrepareAssignment API and suggested that we could
> > > instead
> > > > > > >> provide the group state in the heartbeat response. This has a
> few
> > > > > > >> advantages: 1) it does not require using a special error code
> to
> > > > > > >> signal that a new assignment is required as the signal would
> be
> > > the
> > > > > > >> provided group state; 2) it removes one round trip when a
> client
> > > side
> > > > > > >> assignor is used. The downside is that it makes the heartbeat
> > > > > > >> response's definition quite large. I recall that I went with
> the
> > > > > > >> current approach due to this.
> > > > > > >>
> > > > > > >> Providing the group state in the heartbeat response is
> appealing.
> > > What
> > > > > > >> do you guys think?
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> David
> > > > > > >>
> > > > > > >> On Mon, Sep 12, 2022 at 2:17 PM David Jacot <
> dja...@confluent.io>
> > > > > wrote:
> > > > > > >> >
> > > > > > >> > Hi Guozhang,
> > > > > > >> >
> > > > > > >> > 1. I have added a reference to the relevant chapter instead
> of
> > > > > > >> > repeating the whole thing. Does that work for you?
> > > > > > >> >
> > > > > > >> > 2. The "Rebalance Triggers" section you are referring to is
> > > about
> > > > > when
> > > > > > >> > a rebalance should be triggered for the non-upgraded members
> > > using
> > > > > the
> > > > > > >> > old protocol. The section mentions that a rebalance must be
> > > > > triggered
> > > > > > >> > when a new assignment is installed. This implies that the
> group
> > > > > epoch
> > > > > > >> > was updated either by a native member or a non-upgraded
> member.
> > > For
> > > > > > >> > the latter, the JoinGroup request would be the trigger. I
> have
> > > > > added a
> > > > > > >> > reference to the relevant chapter in the "JoinGroup
> Handling"
> > > > > section
> > > > > > >> > as well. Does that make sense?
> > > > > > >> >
> > > > > > >> > Thanks,
> > > > > > >> > David
> > > > > > >> >
> > > > > > >> > On Fri, Sep 9, 2022 at 10:35 PM Guozhang Wang <
> > > wangg...@gmail.com>
> > > > > > >> wrote:
> > > > > > >> > >
> > > > > > >> > > Hello David,
> > > > > > >> > >
> > > > > > >> > > Alright I think that's sufficient. Just to make that
> clear in
> > > the
> > > > > doc,
> > > > > > >> > > could we update:
> > > > > > >> > >
> > > > > > >> > > 1) the heartbeat request handling section, stating when
> > > > > coordinator
> > > > > > >> will
> > > > > > >> > > trigger rebalance based on the HB's member metadata /
> reason?
> > > > > > >> > > 2) the "Rebalance Triggers" section to include what we
> > > described
> > > > > in
> > > > > > >> "Group
> > > > > > >> > > Epoch - Trigger a rebalance" section as well?
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > Guozhang
> > > > > > >> > >
> > > > > > >> > > On Fri, Sep 9, 2022 at 1:28 AM David Jacot
> > > > > > >> <dja...@confluent.io.invalid>
> > > > > > >> > > wrote:
> > > > > > >> > >
> > > > > > >> > > > Hi Guozhang,
> > > > > > >> > > >
> > > > > > >> > > > I thought that the assignor will always be consulted
> when
> > > the
> > > > > next
> > > > > > >> > > > heartbeat request is constructed. In other words,
> > > > > > >> > > > `PartitionAssignor#metadata` will be called for every
> > > heartbeat.
> > > > > > >> This
> > > > > > >> > > > gives the opportunity for the assignor to enforce a
> > > rebalance by
> > > > > > >> > > > setting the reason to a non-zero value or by changing
> the
> > > > > bytes. Do
> > > > > > >> > > > you think that this is not sufficient? Are you
> concerned by
> > > the
> > > > > > >> delay?
> > > > > > >> > > >
> > > > > > >> > > > Best,
> > > > > > >> > > > David
> > > > > > >> > > >
> > > > > > >> > > > On Fri, Sep 9, 2022 at 7:10 AM Guozhang Wang <
> > > > > wangg...@gmail.com>
> > > > > > >> wrote:
> > > > > > >> > > > >
> > > > > > >> > > > > Hello David,
> > > > > > >> > > > >
> > > > > > > >> > > > > One of Jun's comments got me thinking:
> > > > > > >> > > > >
> > > > > > >> > > > > ```
> > > > > > >> > > > > In this case, a new assignment is triggered by the
> client
> > > side
> > > > > > >> > > > > assignor. When constructing the HB, the consumer will
> > > always
> > > > > > >> consult
> > > > > > >> > > > > the client side assignor and propagate the
> information to
> > > the
> > > > > > >> group
> > > > > > >> > > > > coordinator. In other words, we don't expect users to
> call
> > > > > > >> > > > > Consumer#enforceRebalance anymore.
> > > > > > >> > > > > ```
> > > > > > >> > > > >
> > > > > > >> > > > > As I looked at the current PartitionAssignor's
> interface,
> > > we
> > > > > > >> actually do
> > > > > > >> > > > > not have a way yet to instruct how to construct the
> next
> > > HB
> > > > > > >> request, e.g.
> > > > > > >> > > > > when the assignor wants to enforce a new rebalance
> with a
> > > new
> > > > > > >> assignment,
> > > > > > >> > > > > we'd need some customizable APIs inside the
> > > PartitionAssignor
> > > > > to
> > > > > > >> indicate
> > > > > > >> > > > > the next HB telling broker about so. WDYT about adding
> > > such an
> > > > > > >> API on the
> > > > > > >> > > > > PartitionAssignor?
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > Guozhang
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > On Tue, Sep 6, 2022 at 6:09 AM David Jacot
> > > > > > >> <dja...@confluent.io.invalid>
> > > > > > >> > > > > wrote:
> > > > > > >> > > > >
> > > > > > >> > > > > > Hi Jun,
> > > > > > >> > > > > >
> > > > > > >> > > > > > I have updated the KIP to include your feedback. I
> have
> > > also
> > > > > > >> tried to
> > > > > > > >> > > > > > clarify the parts which were not clear.
> > > > > > >> > > > > >
> > > > > > >> > > > > > Best,
> > > > > > >> > > > > > David
> > > > > > >> > > > > >
> > > > > > >> > > > > > On Fri, Sep 2, 2022 at 4:18 PM David Jacot <
> > > > > dja...@confluent.io
> > > > > > >> >
> > > > > > >> > > > wrote:
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Hi Jun,
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Thanks for your feedback. Let me start by
> answering
> > > your
> > > > > > >> questions
> > > > > > >> > > > > > > inline and I will update the KIP next week.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > Thanks for the KIP. Overall, the main benefits
> of
> > > the
> > > > > KIP
> > > > > > >> seem to
> > > > > > >> > > > be
> > > > > > >> > > > > > fewer
> > > > > > >> > > > > > > > RPCs during rebalance and more efficient
> support of
> > > > > > >> wildcard. A few
> > > > > > >> > > > > > > > comments below.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > I would also add that the KIP removes the global
> sync
> > > > > barrier
> > > > > > >> in the
> > > > > > >> > > > > > > protocol which is essential to improve group
> > > stability and
> > > > > > >> > > > > > > scalability, and the KIP also simplifies the
> client by
> > > > > moving
> > > > > > >> most of
> > > > > > >> > > > > > > the logic to the server side.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > 30. ConsumerGroupHeartbeatRequest
> > > > > > >> > > > > > > > 30.1 ServerAssignor is a singleton. Do we plan
> to
> > > > > support
> > > > > > >> rolling
> > > > > > >> > > > > > changing
> > > > > > >> > > > > > > > of the partition assignor in the consumers?
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Definitely. The group coordinator will use the
> > > assignor
> > > > > used
> > > > > > >> by a
> > > > > > >> > > > > > > majority of the members. This allows the group to
> move
> > > > > from
> > > > > > >> one
> > > > > > >> > > > > > > assignor to another by a roll. This is explained
> in
> > > the
> > > > > > >> Assignor
> > > > > > >> > > > > > > Selection chapter.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > 30.2 For each field, could you explain whether
> it's
> > > > > > >> required in
> > > > > > >> > > > every
> > > > > > >> > > > > > > > request or the scenarios when it needs to be
> > > filled? For
> > > > > > >> example,
> > > > > > >> > > > it's
> > > > > > >> > > > > > not
> > > > > > >> > > > > > > > clear to me when TopicPartitions needs to be
> filled.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > The client is expected to set those fields in
> case of
> > > a
> > > > > > >> connection
> > > > > > >> > > > > > > issue (e.g. timeout) or when the fields have
> changed
> > > since
> > > > > > >> the last
> > > > > > >> > > > > > > HB. The server populates those fields as long as
> the
> > > > > member
> > > > > > >> is not
> > > > > > >> > > > > > > fully reconciled - the member should acknowledge
> that
> > > it
> > > > > has
> > > > > > >> the
> > > > > > >> > > > > > > expected epoch and assignment. I will clarify
> this in
> > > the
> > > > > KIP.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > 31. In the current consumer protocol, the rack
> > > affinity
> > > > > > >> between the
> > > > > > >> > > > > > client
> > > > > > >> > > > > > > > and the broker is only considered during
> fetching,
> > > but
> > > > > not
> > > > > > >> during
> > > > > > >> > > > > > assigning
> > > > > > >> > > > > > > > partitions to consumers. Sometimes, once the
> > > assignment
> > > > > is
> > > > > > >> made,
> > > > > > >> > > > there
> > > > > > >> > > > > > is
> > > > > > >> > > > > > > > no opportunity for read affinity because no
> > > replicas of
> > > > > > >> assigned
> > > > > > >> > > > > > partitions
> > > > > > >> > > > > > > > are close to the member. I am wondering if we
> > > should use
> > > > > > >> this
> > > > > > >> > > > > > opportunity
> > > > > > >> > > > > > > > to address this by including rack in
> GroupMember.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > That's an interesting idea. I don't see any issue
> with
> > > > > adding
> > > > > > >> the
> > > > > > >> > > > rack
> > > > > > >> > > > > > > to the members. I will do so.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > 32. On the metric side, often, it's useful to
> know
> > > how
> > > > > busy
> > > > > > >> a group
> > > > > > >> > > > > > > > coordinator is. By moving to the event loop model,
> it
> > > seems
> > > > > > >> that we
> > > > > > >> > > > could
> > > > > > >> > > > > > add
> > > > > > >> > > > > > > > a metric that tracks the fraction of the time
> the
> > > event
> > > > > > >> loop is
> > > > > > >> > > > doing
> > > > > > >> > > > > > the
> > > > > > >> > > > > > > > actual work.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > That's a great idea. I will add it. Thanks.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > 33. Could we add a section on coordinator
> failover
> > > > > > >> handling? For
> > > > > > >> > > > > > example,
> > > > > > >> > > > > > > > does it need to trigger the check if any group
> with
> > > the
> > > > > > >> wildcard
> > > > > > >> > > > > > > > subscription now has a new matching topic?
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Sure. When the new group coordinator takes over,
> it
> > > has
> > > > > to:
> > > > > > >> > > > > > > * Set up the session timeouts.
> > > > > > >> > > > > > > * Trigger a new assignment if a client side
> assignor
> > > is
> > > > > used.
> > > > > > >> We
> > > > > > >> > > > don't
> > > > > > >> > > > > > > store the information about the member selected to
> > > run the
> > > > > > >> assignment
> > > > > > >> > > > > > > so we have to start a new one.
> > > > > > >> > > > > > > * Update the topics metadata, verify the wildcard
> > > > > > >> subscriptions, and
> > > > > > >> > > > > > > trigger a rebalance if needed.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > 34. ConsumerGroupMetadataValue,
> > > > > > >> > > > ConsumerGroupPartitionMetadataValue,
> > > > > > >> > > > > > > > ConsumerGroupMemberMetadataValue: Could we
> document
> > > what
> > > > > > >> the epoch
> > > > > > >> > > > > > field
> > > > > > >> > > > > > > > reflects? For example, does the epoch in
> > > > > > >> ConsumerGroupMetadataValue
> > > > > > >> > > > > > reflect
> > > > > > >> > > > > > > > the latest group epoch? What about the one in
> > > > > > >> > > > > > > > ConsumerGroupPartitionMetadataValue and
> > > > > > >> > > > > > ConsumerGroupMemberMetadataValue?
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Sure. I will clarify that but it is always the
> latest
> > > > > group
> > > > > > >> epoch.
> > > > > > >> > > > > > > When the group state is updated, the group epoch
> is
> > > > > bumped so
> > > > > > >> we use
> > > > > > >> > > > > > > that one for all the change records related to the
> > > update.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > 35. "the group coordinator will ensure that the
> > > > > following
> > > > > > >> > > > invariants
> > > > > > >> > > > > > are
> > > > > > >> > > > > > > > met: ... All members exists." It's possible for
> a
> > > member
> > > > > > >> not to
> > > > > > >> > > > get any
> > > > > > >> > > > > > > > assigned partitions, right?
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > That's right. Here I meant that the members
> provided
> > > by
> > > > > the
> > > > > > >> assignor
> > > > > > >> > > > > > > in the assignment must exist in the group. The
> > > assignor
> > > > > can
> > > > > > >> not make
> > > > > > >> > > > > > > up new member ids.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > 36. "He can rejoins the group with a member
> epoch
> > > > > equals to
> > > > > > >> 0":
> > > > > > >> > > > When
> > > > > > >> > > > > > would
> > > > > > >> > > > > > > > a consumer rejoin and what member id would be
> used?
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > A member is expected to abandon all its
> partitions and
> > > > > > >> rejoin when
> > > > > > >> > > > it
> > > > > > >> > > > > > > receives the FENCED_MEMBER_EPOCH error. In this
> case,
> > > the
> > > > > > >> group
> > > > > > >> > > > > > > coordinator will have removed the member from the
> > > group.
> > > > > The
> > > > > > >> member
> > > > > > >> > > > > > > can rejoin the group with the same member id but
> with
> > > 0 as
> > > > > > >> epoch. Let
> > > > > > >> > > > > > > me see if I can clarify this in the KIP.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > 37. "Instead, power users will have the ability
> to
> > > > > trigger a
> > > > > > >> > > > > > reassignment
> > > > > > >> > > > > > > > by either providing a non-zero reason or by
> > > updating the
> > > > > > >> assignor
> > > > > > >> > > > > > > > metadata." Hmm, this seems to be conflicting
> with
> > > the
> > > > > > >> deprecation
> > > > > > >> > > > of
> > > > > > >> > > > > > > > Consumer#enforceRebalance.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > In this case, a new assignment is triggered by the
> > > client
> > > > > side
> > > > > > >> > > > > > > assignor. When constructing the HB, the consumer
> will
> > > > > always
> > > > > > >> consult
> > > > > > >> > > > > > > the client side assignor and propagate the
> > > information to
> > > > > the
> > > > > > >> group
> > > > > > >> > > > > > > coordinator. In other words, we don't expect
> users to
> > > call
> > > > > > >> > > > > > > Consumer#enforceRebalance anymore.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > 38. The reassignment examples are nice. But the
> > > section
> > > > > > >> seems to
> > > > > > >> > > > have
> > > > > > >> > > > > > > > multiple typos.
> > > > > > >> > > > > > > > 38.1 When the group transitions to epoch 2, B
> > > > > immediately
> > > > > > >> gets into
> > > > > > >> > > > > > > > "epoch=1, partitions=[foo-2]", which seems
> > > incorrect.
> > > > > > >> > > > > > > > 38.2 When the group transitions to epoch 3, C
> seems
> > > to
> > > > > get
> > > > > > >> into
> > > > > > >> > > > > > epoch=3,
> > > > > > >> > > > > > > > partitions=[foo-1] too early.
> > > > > > >> > > > > > > > 38.3 After A transitions to epoch 3, C still
> has A -
> > > > > > >> epoch=2,
> > > > > > >> > > > > > > > partitions=[foo-0].
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Sorry for that! I will revise them.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > 39. Rolling upgrade of consumers: Do we support
> the
> > > > > upgrade
> > > > > > >> from
> > > > > > >> > > > any
> > > > > > >> > > > > > old
> > > > > > >> > > > > > > > version to new one?
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > We will support upgrading from the consumer
> protocol
> > > > > version
> > > > > > >> 3,
> > > > > > >> > > > > > > introduced in KIP-792. KIP-792 is not implemented
> yet
> > > so
> > > > > the
> > > > > > >> earliest
> > > > > > >> > > > > > > version is unknown at the moment. This is
> explained
> > > in the
> > > > > > >> migration
> > > > > > >> > > > > > > plan chapter.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Thanks again for your feedback, Jun. I will update
> > > the KIP
> > > > > > >> based on
> > > > > > >> > > > it
> > > > > > >> > > > > > > next week.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Best,
> > > > > > >> > > > > > > David
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > On Thu, Sep 1, 2022 at 9:07 PM Jun Rao
> > > > > > >> <j...@confluent.io.invalid>
> > > > > > >> > > > wrote:
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > Hi, David,
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > Thanks for the KIP. Overall, the main benefits
> of
> > > the
> > > > > KIP
> > > > > > >> seem to
> > > > > > >> > > > be
> > > > > > >> > > > > > fewer
> > > > > > >> > > > > > > > RPCs during rebalance and more efficient
> support of
> > > > > > >> wildcard. A few
> > > > > > >> > > > > > > > comments below.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > 30. ConsumerGroupHeartbeatRequest
> > > > > > >> > > > > > > > 30.1 ServerAssignor is a singleton. Do we plan
> to
> > > > > support
> > > > > > >> rolling
> > > > > > >> > > > > > changing
> > > > > > >> > > > > > > > of the partition assignor in the consumers?
> > > > > > >> > > > > > > > 30.2 For each field, could you explain whether
> it's
> > > > > > >> required in
> > > > > > >> > > > every
> > > > > > >> > > > > > > > request or the scenarios when it needs to be
> > > filled? For
> > > > > > >> example,
> > > > > > >> > > > it's
> > > > > > >> > > > > > not
> > > > > > >> > > > > > > > clear to me when TopicPartitions needs to be
> filled.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > 31. In the current consumer protocol, the rack
> > > affinity
> > > > > > >> between the
> > > > > > >> > > > > > client
> > > > > > >> > > > > > > > and the broker is only considered during
> fetching,
> > > but
> > > > > not
> > > > > > >> during
> > > > > > >> > > > > > assigning
> > > > > > >> > > > > > > > partitions to consumers. Sometimes, once the
> > > assignment
> > > > > is
> > > > > > >> made,
> > > > > > >> > > > there
> > > > > > >> > > > > > is
> > > > > > >> > > > > > > > no opportunity for read affinity because no
> > > replicas of
> > > > > > >> assigned
> > > > > > >> > > > > > partitions
> > > > > > >> > > > > > > > are close to the member. I am wondering if we
> > > should use
> > > > > > >> this
> > > > > > >> > > > > > opportunity
> > > > > > >> > > > > > > > to address this by including rack in
> GroupMember.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > 32. On the metric side, often, it's useful to
> know
> > > how
> > > > > busy
> > > > > > >> a group
> > > > > > >> > > > > > > > coordinator is. By moving to the event loop model,
> it
> > > seems
> > > > > > >> that we
> > > > > > >> > > > could
> > > > > > >> > > > > > add
> > > > > > >> > > > > > > > a metric that tracks the fraction of the time
> the
> > > event
> > > > > > >> loop is
> > > > > > >> > > > doing
> > > > > > >> > > > > > the
> > > > > > >> > > > > > > > actual work.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > 33. Could we add a section on coordinator
> failover
> > > > > > >> handling? For
> > > > > > >> > > > > > example,
> > > > > > >> > > > > > > > does it need to trigger the check if any group
> with
> > > the
> > > > > > >> wildcard
> > > > > > >> > > > > > > > subscription now has a new matching topic?
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > 34. ConsumerGroupMetadataValue,
> > > > > > >> > > > ConsumerGroupPartitionMetadataValue,
> > > > > > >> > > > > > > > ConsumerGroupMemberMetadataValue: Could we
> document
> > > what
> > > > > > >> the epoch
> > > > > > >> > > > > > field
> > > > > > >> > > > > > > > reflects? For example, does the epoch in
> > > > > > >> ConsumerGroupMetadataValue
> > > > > > >> > > > > > reflect
> > > > > > >> > > > > > > > the latest group epoch? What about the one in
> > > > > > >> > > > > > > > ConsumerGroupPartitionMetadataValue and
> > > > > > >> > > > > > ConsumerGroupMemberMetadataValue?
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > 35. "the group coordinator will ensure that the
> > > > > following
> > > > > > >> > > > invariants
> > > > > > >> > > > > > are
> > > > > > >> > > > > > > > met: ... All members exists." It's possible for
> a
> > > member
> > > > > > >> not to
> > > > > > >> > > > get any
> > > > > > >> > > > > > > > assigned partitions, right?
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > 36. "He can rejoins the group with a member
> epoch
> > > > > equals to
> > > > > > >> 0":
> > > > > > >> > > > When
> > > > > > >> > > > > > would
> > > > > > >> > > > > > > > a consumer rejoin and what member id would be
> used?
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > 37. "Instead, power users will have the ability
> to
> > > > > trigger a
> > > > > > >> > > > > > reassignment
> > > > > > >> > > > > > > > by either providing a non-zero reason or by
> > > updating the
> > > > > > >> assignor
> > > > > > >> > > > > > > > metadata." Hmm, this seems to be conflicting
> with
> > > the
> > > > > > >> deprecation
> > > > > > >> > > > of
> > > > > > >> > > > > > > > Consumer#enforceRebalance.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > 38. The reassignment examples are nice. But the
> > > section
> > > > > > >> seems to
> > > > > > >> > > > have
> > > > > > >> > > > > > > > multiple typos.
> > > > > > >> > > > > > > > 38.1 When the group transitions to epoch 2, B
> > > > > immediately
> > > > > > >> gets into
> > > > > > >> > > > > > > > "epoch=1, partitions=[foo-2]", which seems
> > > incorrect.
> > > > > > >> > > > > > > > 38.2 When the group transitions to epoch 3, C
> seems
> > > to
> > > > > get
> > > > > > >> into
> > > > > > >> > > > > > epoch=3,
> > > > > > >> > > > > > > > partitions=[foo-1] too early.
> > > > > > >> > > > > > > > 38.3 After A transitions to epoch 3, C still
> has A -
> > > > > > >> epoch=2,
> > > > > > >> > > > > > > > partitions=[foo-0].
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > 39. Rolling upgrade of consumers: Do we support
> the
> > > > > upgrade
> > > > > > >> from
> > > > > > >> > > > any
> > > > > > >> > > > > > old
> > > > > > >> > > > > > > > version to new one?
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > Thanks,
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > Jun
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > On Mon, Aug 29, 2022 at 9:20 AM David Jacot
> > > > > > >> > > > > > <dja...@confluent.io.invalid>
> > > > > > >> > > > > > > > wrote:
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > > Hi all,
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > The KIP states that we will re-implement the
> > > > > coordinator
> > > > > > >> in
> > > > > > >> > > > Java. I
> > > > > > >> > > > > > > > > discussed this offline with a few folks and
> folks
> > > are
> > > > > > >> concerned
> > > > > > >> > > > that
> > > > > > >> > > > > > > > > we could introduce many regressions in the old
> > > > > protocol
> > > > > > >> if we do
> > > > > > >> > > > so.
> > > > > > >> > > > > > > > > Therefore, I am going to remove this statement
> > > from
> > > > > the
> > > > > > >> KIP. It
> > > > > > >> > > > is an
> > > > > > >> > > > > > > > > implementation detail after all so it does not
> > > have
> > > > > to be
> > > > > > >> > > > decided at
> > > > > > >> > > > > > > > > this stage. We will likely start by trying to
> > > > > refactor the
> > > > > > >> > > > current
> > > > > > >> > > > > > > > > implementation as a first step.
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > Cheers,
> > > > > > >> > > > > > > > > David
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > On Mon, Aug 29, 2022 at 3:52 PM David Jacot <
> > > > > > >> dja...@confluent.io
> > > > > > >> > > > >
> > > > > > >> > > > > > wrote:
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > Hi Luke,
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > > 1.1. I think the state machines are:
> "Empty,
> > > > > assigning,
> > > > > > >> > > > > > reconciling,
> > > > > > >> > > > > > > > > stable,
> > > > > > >> > > > > > > > > > > dead" mentioned in Consumer Group States
> > > section,
> > > > > > >> right?
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > This sentence does not refer to those group
> > > states
> > > > > but
> > > > > > >> rather
> > > > > > >> > > > to a
> > > > > > >> > > > > > > > > > state machine replication (SMR). This
> refers to
> > > the
> > > > > > >> entire
> > > > > > >> > > > state of
> > > > > > >> > > > > > > > > > the group coordinator, which is replicated via
> the
> > > log
> > > > > > >> layer. I will
> > > > > > >> > > > > > > > > > clarify this in the KIP.
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > > 1.2. What do you mean "each state machine
> is
> > > > > modelled
> > > > > > >> as an
> > > > > > >> > > > event
> > > > > > >> > > > > > > > > loop"?
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > The idea is to follow a model similar to
> the new
> > > > > quorum
> > > > > > >> > > > > > controller. We
> > > > > > >> > > > > > > > > > will have N threads to process events. Each
> > > > > > >> __consumer_offsets
> > > > > > >> > > > > > > > > > partition is assigned to a unique thread and
> > > all the
> > > > > > >> events
> > > > > > >> > > > (e.g.
> > > > > > >> > > > > > > > > > requests, callbacks, etc.) are processed by
> this
> > > > > > >> thread. This
> > > > > simplifies
> > > > > > >> > > > > > > > > > concurrency and will enable us to do
> simulation
> > > > > testing
> > > > > > >> for the
> > > > > > >> > > > > > group
> > > > > > >> > > > > > > > > > coordinator.
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > > 1.3. Why do we need a state machine per
> > > > > > >> *__consumer_offsets*
> > > > > > >> > > > > > > > > partitions?
> > > > > > >> > > > > > > > > > > Not a state machine "per consumer group"
> > > owned by
> > > > > a
> > > > > > >> group
> > > > > > >> > > > > > coordinator?
> > > > > > >> > > > > > > > > For
> > > > > > >> > > > > > > > > > > example, if one group coordinator owns 2
> > > consumer
> > > > > > >> groups, and
> > > > > > >> > > > > > both
> > > > > > >> > > > > > > > > exist in
> > > > > > >> > > > > > > > > > > *__consumer_offsets-0*, will we have 1
> state
> > > > > machine
> > > > > > >> for it,
> > > > > > >> > > > or
> > > > > > >> > > > > > 2?
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > See 1.1. The confusion comes from there, I
> > > think.
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > > 1.4. I know the
> "*group.coordinator.threads*"
> > > is
> > > > > the
> > > > > > >> number
> > > > > > >> > > > of
> > > > > > >> > > > > > threads
> > > > > > >> > > > > > > > > used
> > > > > > >> > > > > > > > > > > to run the state machines. But I'm
> wondering
> > > if
> > > > > the
> > > > > > >> purpose
> > > > > > >> > > > of
> > > > > > >> > > > > > the
> > > > > > >> > > > > > > > > threads
> > > > > > >> > > > > > > > > > > is only to keep the state of each consumer
> > > group
> > > > > (or
> > > > > > >> > > > > > > > > *__consumer_offsets*
> > > > > > >> > > > > > > > > > > partitions?), and no heavy computation,
> why
> > > > > should we
> > > > > > >> need
> > > > > > >> > > > > > > > > multi-threads
> > > > > > >> > > > > > > > > > > here?
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > See 1.2. The idea is to have an ability to
> > > shard the
> > > > > > >> > > > processing as
> > > > > > >> > > > > > the
> > > > > > >> > > > > > > > > > computation could be heavy.
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > > 2.1. The consumer session timeout, why
> does
> > > the
> > > > > > >> default
> > > > > > >> > > > session
> > > > > > >> > > > > > > > > timeout not
> > > > > > >> > > > > > > > > > > fall between min (45s) and max (60s)? I
> > > thought
> > > > > the
> > > > > > >> min/max
> > > > > > >> > > > > > session
> > > > > > >> > > > > > > > > > > timeout is to define lower/upper bound of
> it,
> > > no?
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > group.consumer.session.timeout.ms int
> 30s The
> > > > > > >> timeout to
> > > > > > >> > > > detect
> > > > > > >> > > > > > client
> > > > > > >> > > > > > > > > > > failures when using the consumer group
> > > protocol.
> > > > > > >> > > > > > > > > > > group.consumer.min.session.timeout.ms int
> > > 45s The
> > > > > > >> minimum
> > > > > > >> > > > > > session
> > > > > > >> > > > > > > > > timeout.
> > > > > > >> > > > > > > > > > > group.consumer.max.session.timeout.ms int
> > > 60s The
> > > > > > >> maximum
> > > > > > >> > > > > > session
> > > > > > >> > > > > > > > > timeout.
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > This is indeed a mistake. The default
> session
> > > > > timeout
> > > > > > >> should
> > > > > > >> > > > be 45s
> > > > > > >> > > > > > > > > > (the current default).
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > > 2.2. The default server side assignors are
> > > [range,
> > > > > > >> uniform],
> > > > > > >> > > > > > which means
> > > > > > >> > > > > > > > > > > we'll default to "range" assignor. I'd
> like to
> > > > > know
> > > > > > >> why not
> > > > > > >> > > > > > uniform
> > > > > > >> > > > > > > > > one? I
> > > > > > >> > > > > > > > > > > thought usually users will choose uniform
> > > assignor
> > > > > > >> (former
> > > > > > >> > > > sticky
> > > > > > >> > > > > > > > > assignor)
> > > > > > >> > > > > > > > > > > for a more even distribution. Any other
> > > reason
> > > > > we
> > > > > > >> choose
> > > > > > >> > > > range
> > > > > > >> > > > > > > > > assignor
> > > > > > >> > > > > > > > > > > as default?
> > > > > > >> > > > > > > > > > > group.consumer.assignors List range,
> uniform
> > > The
> > > > > > >> server side
> > > > > > >> > > > > > assignors.
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > The order on the server side has no
> influence
> > > > > because
> > > > > > >> the
> > > > > > >> > > > client
> > > > > > >> > > > > > must
> > > > > > >> > > > > > > > > > choose the assignor that it wants to use.
> There
> > > is no
> > > > > > >> default
> > > > > > >> > > > in the
> > > > > > >> > > > > > > > > > current proposal. If the assignor is not
> > > specified
> > > > > by
> > > > > > >> the
> > > > > > >> > > > client,
> > > > > > >> > > > > > the
> > > > > > >> > > > > > > > > > request is rejected. The default client
> value
> > > for
> > > > > > >> > > > > > > > > > `group.remote.assignor` is `uniform` though.
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > Thanks for your very good comments, Luke. I
> hope
> > > > > that my
> > > > > > >> > > > answers
> > > > > > >> > > > > > help
> > > > > > >> > > > > > > > > > to clarify things. I will update the KIP as
> well
> > > > > based
> > > > > > >> on your
> > > > > > >> > > > > > > > > > feedback.
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > Cheers,
> > > > > > >> > > > > > > > > > David
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > On Mon, Aug 22, 2022 at 9:29 AM Luke Chen <
> > > > > > >> show...@gmail.com>
> > > > > > >> > > > > > wrote:
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > Hi David,
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > Thanks for the update.
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > Some more questions:
> > > > > > >> > > > > > > > > > > 1. In Group Coordinator section, you
> > > mentioned:
> > > > > > >> > > > > > > > > > > > The new group coordinator will have a
> state
> > > > > machine
> > > > > > >> per
> > > > > > >> > > > > > > > > > > *__consumer_offsets* partitions, where
> each
> > > state
> > > > > > >> machine is
> > > > > > >> > > > > > modelled
> > > > > > >> > > > > > > > > as an
> > > > > > >> > > > > > > > > > > event loop. Those state machines will be
> > > executed
> > > > > in
> > > > > > >> > > > > > > > > > > *group.coordinator.threads* threads.
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > 1.1. I think the state machines are:
> "Empty,
> > > > > assigning,
> > > > > > >> > > > > > reconciling,
> > > > > > >> > > > > > > > > stable,
> > > > > > >> > > > > > > > > > > dead" mentioned in Consumer Group States
> > > section,
> > > > > > >> right?
> > > > > > >> > > > > > > > > > > 1.2. What do you mean "each state machine
> is
> > > > > modelled
> > > > > > >> as an
> > > > > > >> > > > event
> > > > > > >> > > > > > > > > loop"?
> > > > > > >> > > > > > > > > > > 1.3. Why do we need a state machine per
> > > > > > >> *__consumer_offsets*
> > > > > > >> > > > > > > > > partitions?
> > > > > > >> > > > > > > > > > > Not a state machine "per consumer group"
> > > owned by
> > > > > a
> > > > > > >> group
> > > > > > >> > > > > > coordinator?
> > > > > > >> > > > > > > > > For
> > > > > > >> > > > > > > > > > > example, if one group coordinator owns 2
> > > consumer
> > > > > > >> groups, and
> > > > > > >> > > > > > both
> > > > > > >> > > > > > > > > exist in
> > > > > > >> > > > > > > > > > > *__consumer_offsets-0*, will we have 1
> state
> > > > > machine
> > > > > > >> for it,
> > > > > > >> > > > or
> > > > > > >> > > > > > 2?
> > > > > > >> > > > > > > > > > > 1.4. I know the
> "*group.coordinator.threads*"
> > > is
> > > > > the
> > > > > > >> number
> > > > > > >> > > > of
> > > > > > >> > > > > > threads
> > > > > > >> > > > > > > > > used
> > > > > > >> > > > > > > > > > > to run the state machines. But I'm
> wondering
> > > if
> > > > > the
> > > > > > >> purpose
> > > > > > >> > > > of
> > > > > > >> > > > > > the
> > > > > > >> > > > > > > > > threads
> > > > > > >> > > > > > > > > > > is only to keep the state of each consumer
> > > group
> > > > > (or
> > > > > > >> > > > > > > > > *__consumer_offsets*
> > > > > > >> > > > > > > > > > > partitions?), and no heavy computation,
> why
> > > > > should we
> > > > > > >> need
> > > > > > >> > > > > > > > > multi-threads
> > > > > > >> > > > > > > > > > > here?
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > 2. For the default value in the new
> configs:
> > > > > > >> > > > > > > > > > > 2.1. The consumer session timeout, why
> does
> > > the
> > > > > > >> default
> > > > > > >> > > > session
> > > > > > >> > > > > > > > > timeout not
> > > > > > >> > > > > > > > > > > fall between min (45s) and max (60s)? I
> > > thought
> > > > > the
> > > > > > >> min/max
> > > > > > >> > > > > > session
> > > > > > >> > > > > > > > > > > timeout is to define lower/upper bound of
> it,
> > > no?
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > group.consumer.session.timeout.ms int
> 30s The
> > > > > > >> timeout to
> > > > > > >> > > > detect
> > > > > > >> > > > > > client
> > > > > > >> > > > > > > > > > > failures when using the consumer group
> > > protocol.
> > > > > > >> > > > > > > > > > > group.consumer.min.session.timeout.ms int
> > > 45s The
> > > > > > >> minimum
> > > > > > >> > > > > > session
> > > > > > >> > > > > > > > > timeout.
> > > > > > >> > > > > > > > > > > group.consumer.max.session.timeout.ms int
> > > 60s The
> > > > > > >> maximum
> > > > > > >> > > > > > session
> > > > > > >> > > > > > > > > timeout.
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > 2.2. The default server side assignors are
> > > [range,
> > > > > > >> uniform],
> > > > > > >> > > > > > which means
> > > > > > >> > > > > > > > > > > we'll default to "range" assignor. I'd
> like to
> > > > > know
> > > > > > >> why not
> > > > > > >> > > > > > uniform
> > > > > > >> > > > > > > > > one? I
> > > > > > >> > > > > > > > > > > thought usually users will choose uniform
> > > assignor
> > > > > > >> (former
> > > > > > >> > > > sticky
> > > > > > >> > > > > > > > > assignor)
> > > > > > >> > > > > > > > > > > for a more even distribution. Any other
> > > reason
> > > > > we
> > > > > > >> choose
> > > > > > >> > > > range
> > > > > > >> > > > > > > > > assignor
> > > > > > >> > > > > > > > > > > as default?
> > > > > > >> > > > > > > > > > > group.consumer.assignors List range,
> uniform
> > > The
> > > > > > >> server side
> > > > > > >> > > > > > assignors.
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > Thank you.
> > > > > > >> > > > > > > > > > > Luke
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > On Mon, Aug 22, 2022 at 2:10 PM Luke Chen
> <
> > > > > > >> show...@gmail.com
> > > > > > >> > > > >
> > > > > > >> > > > > > wrote:
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > Hi Sagar,
> > > > > > >> > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > I have some thoughts about Kafka Connect
> > > > > > >> integrating with
> > > > > > >> > > > > > KIP-848,
> > > > > > >> > > > > > > > > but I
> > > > > > >> > > > > > > > > > > > think we should have a separate
> discussion
> > > > > thread
> > > > > > >> for the
> > > > > > >> > > > Kafka
> > > > > > >> > > > > > > > > Connect
> > > > > > >> > > > > > > > > > > > KIP: Integrating Kafka Connect With New
> > > Consumer
> > > > > > >> Rebalance
> > > > > > >> > > > > > Protocol
> > > > > > >> > > > > > > > > [1],
> > > > > > >> > > > > > > > > > > > and let this discussion thread focus on
> > > consumer
> > > > > > >> rebalance
> > > > > > >> > > > > > protocol,
> > > > > > >> > > > > > > > > WDYT?
> > > > > > >> > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > [1] https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol
> > > > > > >> > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > Thank you.
> > > > > > >> > > > > > > > > > > > Luke
> > > > > > >> > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > > On Fri, Aug 12, 2022 at 9:31 PM Sagar <
> > > > > > >> > > > > > sagarmeansoc...@gmail.com>
> > > > > > >> > > > > > > > > wrote:
> > > > > > >> > > > > > > > > > > >
> > > > > > >> > > > > > > > > > > >> Thank you Guozhang/David for the
> feedback.
> > > > > Looks
> > > > > > >> like
> > > > > > >> > > > there's
> > > > > > >> > > > > > > > > agreement on
> > > > > > >> > > > > > > > > > > >> using separate APIs for Connect. I
> would
> > > > > revisit
> > > > > > >> the doc
> > > > > > >> > > > and
> > > > > > >> > > > > > see
> > > > > > >> > > > > > > > > what
> > > > > > >> > > > > > > > > > > >> changes are to be made.
> > > > > > >> > > > > > > > > > > >>
> > > > > > >> > > > > > > > > > > >> Thanks!
> > > > > > >> > > > > > > > > > > >> Sagar.
> > > > > > >> > > > > > > > > > > >>
> > > > > > >> > > > > > > > > > > >> On Tue, Aug 9, 2022 at 7:11 PM David
> Jacot
> > > > > > >> > > > > > > > > <dja...@confluent.io.invalid>
> > > > > > >> > > > > > > > > > > >> wrote:
> > > > > > >> > > > > > > > > > > >>
> > > > > > >> > > > > > > > > > > >> > Hi Sagar,
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> > Thanks for the feedback and the
> document.
> > > > > That's
> > > > > > >> really
> > > > > > >> > > > > > helpful. I
> > > > > > >> > > > > > > > > > > >> > will take a look at it.
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> > Overall, it seems to me that both
> Connect
> > > > > and the
> > > > > > >> > > > Consumer
> > > > > > >> > > > > > could
> > > > > > >> > > > > > > > > share
> > > > > > >> > > > > > > > > > > >> > the same underlying "engine". The
> main
> > > > > > >> difference is
> > > > > > >> > > > that
> > > > > > >> > > > > > the
> > > > > > >> > > > > > > > > Consumer
> > > > > > >> > > > > > > > > > > >> > assigns topic-partitions to members
> > > whereas
> > > > > > >> Connect
> > > > > > >> > > > assigns
> > > > > > >> > > > > > tasks
> > > > > > >> > > > > > > > > to
> > > > > > >> > > > > > > > > > > >> > workers. I see two ways to move
> forward:
> > > > > > >> > > > > > > > > > > >> > 1) We extend the new proposed APIs to
> > > support
> > > > > > >> different
> > > > > > >> > > > > > resource
> > > > > > >> > > > > > > > > types
> > > > > > >> > > > > > > > > > > >> > (e.g. partitions, tasks, etc.); or
> > > > > > >> > > > > > > > > > > >> > 2) We use new dedicated APIs for
> > > Connect. The
> > > > > > >> dedicated
> > > > > > >> > > > APIs
> > > > > > >> > > > > > > > > would be
> > > > > > >> > > > > > > > > > > >> > similar to the new ones but
> different on
> > > the
> > > > > > >> > > > > > content/resources and
> > > > > > >> > > > > > > > > > > >> > they would rely on the same engine
> on the
> > > > > > >> coordinator
> > > > > > >> > > > side.
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> > I personally lean towards 2) because
> I am
> > > > > not a
> > > > > > >> fan of
> > > > > > >> > > > > > > > > overcharging
> > > > > > >> > > > > > > > > > > >> > APIs to serve different purposes.
> That
> > > being
> > > > > > >> said, I am
> > > > > > >> > > > not
> > > > > > >> > > > > > > > > opposed to
> > > > > > >> > > > > > > > > > > >> > 1) if we can find an elegant way to
> do
> > > it.
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> > I think that we can continue to
> discuss
> > > it
> > > > > here
> > > > > > >> for now
> > > > > > >> > > > in
> > > > > > >> > > > > > order
> > > > > > >> > > > > > > > > to
> > > > > > >> > > > > > > > > > > >> > ensure that this KIP is compatible
> with
> > > what
> > > > > we
> > > > > > >> will do
> > > > > > >> > > > for
> > > > > > >> > > > > > > > > Connect in
> > > > > > >> > > > > > > > > > > >> > the future.
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> > Best,
> > > > > > >> > > > > > > > > > > >> > David
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> > On Mon, Aug 8, 2022 at 2:41 PM David
> > > Jacot <
> > > > > > >> > > > > > dja...@confluent.io>
> > > > > > >> > > > > > > > > wrote:
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > Hi all,
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > I am back from vacation. I will go
> > > through
> > > > > and
> > > > > > >> address
> > > > > > >> > > > > > your
> > > > > > >> > > > > > > > > comments
> > > > > > >> > > > > > > > > > > >> > > in the coming days. Thanks for your
> > > > > feedback.
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > Cheers,
> > > > > > >> > > > > > > > > > > >> > > David
> > > > > > >> > > > > > > > > > > >> > >
> > > > > > >> > > > > > > > > > > >> > > On Wed, Aug 3, 2022 at 10:05 PM
> Gregory
> > > > > Harris
> > > > > > >> <
> > > > > > >> > > > > > > > > gharris1...@gmail.com
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >> > wrote:
> > > > > > >> > > > > > > > > > > >> > > >
> > > > > > >> > > > > > > > > > > >> > > > Hey All!
> > > > > > >> > > > > > > > > > > >> > > >
> > > > > > >> > > > > > > > > > > >> > > > Thanks for the KIP, it's
> wonderful
> > > to see
> > > > > > >> > > > cooperative
> > > > > > >> > > > > > > > > rebalancing
> > > > > > >> > > > > > > > > > > >> > making it
> > > > > > >> > > > > > > > > > > >> > > > down the stack!
> > > > > > >> > > > > > > > > > > >> > > >
> > > > > > >> > > > > > > > > > > >> > > > I had a few questions:
> > > > > > >> > > > > > > > > > > >> > > >
> > > > > > >> > > > > > > > > > > >> > > > 1. The 'Rejected Alternatives'
> > > section
> > > > > > >> describes how
> > > > > > >> > > > > > member
> > > > > > >> > > > > > > > > epoch
> > > > > > >> > > > > > > > > > > >> > should
> > > > > > >> > > > > > > > > > > >> > > > advance in step with the group
> epoch
> > > and
> > > > > > >> assignment
> > > > > > >> > > > > > epoch
> > > > > > >> > > > > > > > > values. I
> > > > > > >> > > > > > > > > > > >> > think
> > > > > > >> > > > > > > > > > > >> > > > that this is a good idea for the
> > > reasons
> > > > > > >> described
> > > > > > >> > > > in
> > > > > > >> > > > > > the
> > > > > > >> > > > > > > > > KIP. When
> > > > > > >> > > > > > > > > > > >> the
> > > > > > >> > > > > > > > > > > >> > > > protocol is incrementally
> assigning
> > > > > > >> partitions to a
> > > > > > >> > > > > > worker,
> > > > > > >> > > > > > > > > what
> > > > > > >> > > > > > > > > > > >> member
> > > > > > >> > > > > > > > > > > >> > > > epoch does each incremental
> > > assignment
> > > > > use?
> > > > > > >> Are
> > > > > > >> > > > member
> > > > > > >> > > > > > epochs
> > > > > > >> > > > > > > > > > > >> re-used,
> > > > > > >> > > > > > > > > > > >> > and
> > > > > > >> > > > > > > > > > > >> > > > a single member epoch can
> correspond
> > > to
> > > > > > >> multiple
> > > > > > >> > > > > > different
> > > > > > >> > > > > > > > > > > >> > (monotonically
> > > > > > >> > > > > > > > > > > >> > > > larger) assignments?
> > > > > > >> > > > > > > > > > > >> > > >
> > > > > > >> > > > > > > > > > > >> > > > 2. Is the Assignor's 'Reason'
> field
> > > > > opaque
> > > > > > >> to the
> > > > > > >> > > > group
> > > > > > >> > > > > > > > > > > >> coordinator? If
> > > > > > >> > > > > > > > > > > >> > > > not, should custom client-side
> > > assignor
> > > > > > >> > > > implementations
> > > > > > >> > > > > > > > > interact
> > > > > > >> > > > > > > > > > > >> with
> > > > > > >> > > > > > > > > > > >> > the
> > > > > > >> > > > > > > > > > > >> > > > Reason field, and how is its
> common
> > > > > meaning
> > > > > > >> agreed
> > > > > > >> > > > > > upon? If
> > > > > > >> > > > > > > > > so, what
> > > > > > >> > > > > > > > > > > >> > is the
> > > > > > >> > > > > > > > > > > >> > > > benefit of a distinct Reason
> field
> > > over
> > > > > > >> including
> > > > > > >> > > > such
> > > > > > >> > > > > > > > > functionality
> > > > > > >> > > > > > > > > > > >> > in the
> > > > > > >> > > > > > > > > > > >> > > > opaque metadata?
> > > > > > >> > > > > > > > > > > >> > > >
> > > > > > >> > > > > > > > > > > >> > > > 3. The following is included in
> the
> > > KIP:
> > > > > > >> "Thanks to
> > > > > > >> > > > > > this, the
> > > > > > >> > > > > > > > > input
> > > > > > >> > > > > > > > > > > >> of
> > > > > > >> > > > > > > > > > > >> > the
> > > > > > >> > > > > > > > > > > >> > > > client side assignor is entirely
> > > driven
> > > > > by
> > > > > > >> the group
> > > > > > >> > > > > > > > > coordinator.
> > > > > > >> > > > > > > > > > > >> The
> > > > > > >> > > > > > > > > > > >> > > > consumer is no longer
> responsible for
> > > > > > >> maintaining
> > > > > > >> > > > any
> > > > > > >> > > > > > state
> > > > > > >> > > > > > > > > besides
> > > > > > >> > > > > > > > > > > >> its
> > > > > > >> > > > > > > > > > > >> > > > assigned partitions." Does this
> mean
> > > > > that the
> > > > > > >> > > > > > client-side
> > > > > > >> > > > > > > > > assignor
> > > > > > >> > > > > > > > > > > >> MAY
> > > > > > >> > > > > > > > > > > >> > > > incorporate additional
> non-Metadata
> > > state
> > > > > > >> (such as
> > > > > > >> > > > > > partition
> > > > > > >> > > > > > > > > > > >> > throughput,
> > > > > > >> > > > > > > > > > > >> > > > cpu/memory metrics, config
> topics,
> > > etc),
> > > > > or
> > > > > > >> that
> > > > > > >> > > > > > additional
> > > > > > >> > > > > > > > > > > >> > non-Metadata
> > > > > > >> > > > > > > > > > > >> > > > state SHOULD NOT be used?
> > > > > > >> > > > > > > > > > > >> > > >
> > > > > > > >> > > > > > > > > > > >> > > > 4. I see that there are separate classes for
> > > > > > > >> > > > > > > > > > > >> > > > org.apache.kafka.server.group.consumer.PartitionAssignor and
> > > > > > > >> > > > > > > > > > > >> > > > org.apache.kafka.clients.consumer.PartitionAssignor that seem to
> > > > > > >> > > > > > > > > > > >> > > > overlap significantly. Is it
> > > possible for
> > > > > > >> these two
> > > > > > >> > > > > > > > > implementations
> > > > > > >> > > > > > > > > > > >> to
> > > > > > >> > > > > > > > > > > >> > be
> > > > > > >> > > > > > > > > > > >> > > > unified? This would serve to
> promote
> > > > > feature
> > > > > > >> parity
> > > > > > >> > > > of
> > > > > > >> > > > > > > > > server-side
> > > > > > >> > > > > > > > > > > >> and
> > > > > > >> > > > > > > > > > > >> > > > client-side assignors, and would
> also
> > > > > > >> facilitate
> > > > > > >> > > > > > operational
> > > > > > >> > > > > > > > > > > >> > flexibility in
> > > > > > >> > > > > > > > > > > >> > > > certain situations. For example,
> if a
> > > > > > >> server-side
> > > > > > >> > > > > > assignor
> > > > > > >> > > > > > > > > has some
> > > > > > >> > > > > > > > > > > >> > poor
> > > > > > >> > > > > > > > > > > >> > > > behavior and needs a patch,
> > > deploying the
> > > > > > >> patched
> > > > > > >> > > > > > assignor to
> > > > > > >> > > > > > > > > the
> > > > > > >> > > > > > > > > > > >> > client
> > > > > > >> > > > > > > > > > > >> > > > and switching one consumer group
> to a
> > > > > > >> client-side
> > > > > > >> > > > > > assignor
> > > > > > >> > > > > > > > > may be
> > > > > > >> > > > > > > > > > > >> > faster
> > > > > > >> > > > > > > > > > > >> > > > and less risky than patching all
> of
> > > the
> > > > > > >> brokers.
> > > > > > >> > > > With
> > > > > > >> > > > > > the
> > > > > > >> > > > > > > > > currently
> > > > > > >> > > > > > > > > > > >> > > > proposed distinct APIs, a
> non-trivial
> > > > > > >> > > > reimplementation
> > > > > > >> > > > > > would
> > > > > > >> > > > > > > > > have
> > > > > > >> > > > > > > > > > > >> to be
> > > > > > >> > > > > > > > > > > >> > > > assembled, and if the two APIs
> have
> > > > > diverged
> > > > > > >> > > > > > significantly,
> > > > > > >> > > > > > > > > then it
> > > > > > >> > > > > > > > > > > >> is
> > > > > > >> > > > > > > > > > > >> > > > possible that a reimplementation
> > > would
> > > > > not be
> > > > > > >> > > > possible.
> > > > > > >> > > > > > > > > > > >> > > >
> > > > > > >> > > > > > > > > > > >> > > > --
> > > > > > >> > > > > > > > > > > >> > > > Greg Harris
> > > > > > >> > > > > > > > > > > >> > > > gharris1...@gmail.com
> > > > > > >> > > > > > > > > > > >> > > > github.com/gharris1727
> > > > > > >> > > > > > > > > > > >> > > >
> > > > > > >> > > > > > > > > > > >> > > > On Wed, Aug 3, 2022 at 8:39 AM
> Sagar
> > > <
> > > > > > >> > > > > > > > > sagarmeansoc...@gmail.com>
> > > > > > >> > > > > > > > > > > >> > wrote:
> > > > > > >> > > > > > > > > > > >> > > >
> > > > > > >> > > > > > > > > > > >> > > > > Hi Guozhang/David,
> > > > > > >> > > > > > > > > > > >> > > > >
> > > > > > >> > > > > > > > > > > >> > > > > I created a confluence page to
> > > discuss
> > > > > how
> > > > > > >> Connect
> > > > > > >> > > > > > would
> > > > > > >> > > > > > > > > need to
> > > > > > >> > > > > > > > > > > >> > change
> > > > > > >> > > > > > > > > > > >> > > > > based on the new rebalance
> > > protocol.
> > > > > > >> Here's the
> > > > > > >> > > > page:
> > > > > > >> > > > > > > > > > > >> > > > >
> > > > > > >> > > > > > > > > > > >> > > > >
> > > > > > >> > > > > > > > > > > >> >
> > > > > > >> > > > > > > > > > > >>
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > >
> > > > > > >>
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/%5BDRAFT%5DIntegrating+Kafka+Connect+With+New+Consumer+Rebalance+Protocol
> > > > > > >> > > > > > > > > > > >> > > > >
> > > > > > >> > > > > > > > > > > >> > > > > It's also pretty longish and I
> have
> > > > > tried
> > > > > > >> to keep
> > > > > > >> > > > a
> > > > > > >> > > > > > format
> > > > > > >> > > > > > > > > > > >> similar to
> > > > > > >> > > > > > > > > > > >> > > > > KIP-848. Let me know what you
> > > think.
> > > > > Also,
> > > > > > >> do you
> > > > > > >> > > > > > think this
> > > > > > >> > > > > > > > > > > >> should
> > > > > > >> > > > > > > > > > > >> > be
> > > > > > >> > > > > > > > > > > >> > > > > moved to a separate discussion
> > > thread
> > > > > or
> > > > > > >> is this
> > > > > > >> > > > one
> > > > > > >> > > > > > fine?
> > > > > > >> > > > > > > > > > > >> > > > >
> > > > > > >> > > > > > > > > > > >> > > > > Thanks!
> > > > > > >> > > > > > > > > > > >> > > > > Sagar.
> > > > > > >> > > > > > > > > > > >> > > > >
> > > > > > >> > > > > > > > > > > >> > > > > On Tue, Jul 26, 2022 at 7:37 AM
> > > Sagar <
> > > > > > >> > > > > > > > > sagarmeansoc...@gmail.com>
> > > > > > >> > > > > > > > > > > >> > wrote:
> > > > > > >> > > > > > > > > > > >> > > > >
> > > > > > >> > > > > > > > > > > >> > > > > > Hello Guozhang,
> > > > > > >> > > > > > > > > > > >> > > > > >
> > > > > > >> > > > > > > > > > > >> > > > > > Thank you so much for the
> doc on
> > > > > Kafka
> > > > > > >> Streams.
> > > > > > >> > > > > > Sure, I
> > > > > > >> > > > > > > > > would do
> > > > > > >> > > > > > > > > > > >> > the
> > > > > > >> > > > > > > > > > > >> > > > > > analysis and come up with
> such a
> > > > > > >> document.
> > > > > > >> > > > > > > > > > > >> > > > > >
> > > > > > >> > > > > > > > > > > >> > > > > > Thanks!
> > > > > > >> > > > > > > > > > > >> > > > > > Sagar.
> > > > > > >> > > > > > > > > > > >> > > > > >
> > > > > > >> > > > > > > > > > > >> > > > > > On Tue, Jul 26, 2022 at 4:47
> AM
> > > > > Guozhang
> > > > > > >> Wang <
> > > > > > >> > > > > > > > > > > >> wangg...@gmail.com>
> > > > > > >> > > > > > > > > > > >> > > > > wrote:
> > > > > > >> > > > > > > > > > > >> > > > > >
> > > > > > >> > > > > > > > > > > >> > > > > >> Hello Sagar,
> > > > > > >> > > > > > > > > > > >> > > > > >>
> > > > > > >> > > > > > > > > > > >> > > > > >> It would be great if you
> could
> > > come
> > > > > > >> back with
> > > > > > >> > > > some
> > > > > > >> > > > > > > > > analysis on
> > > > > > >> > > > > > > > > > > >> > how to
> > > > > > >> > > > > > > > > > > >> > > > > >> implement the Connect side
> > > > > integration
> > > > > > >> with
> > > > > > >> > > > the new
> > > > > > >> > > > > > > > > protocol;
> > > > > > >> > > > > > > > > > > >> so
> > > > > > >> > > > > > > > > > > >> > far
> > > > > > >> > > > > > > > > > > >> > > > > >> besides leveraging on the
> new
> > > > > "protocol
> > > > > > >> type"
> > > > > > >> > > > we
> > > > > > >> > > > > > did not
> > >
> >
> >
> > --
> > -- Guozhang
>


-- 
-- Guozhang
