Hi Gabriella,
Thanks for the KIP. It looks good overall and is consistent with the KIP-848
consumer assignor design.

DJ_03: Could we go a bit further with documenting `configs()`? Does it
always return the full set of config keys and where can a developer writing
a custom assignor find the full list of config keys they need to consider?
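
To make the DJ_03 question concrete, here is a minimal sketch of how a custom
assignor might read one assignment-relevant key. The `GroupSpecSketch`
interface, the `Map<String, String>` return type, and the fall-back-to-default
behaviour are assumptions for illustration; only `configs()` and
`num.standby.replicas` come from this thread.

```java
import java.util.Map;

// Hypothetical stand-in for the KIP's GroupSpec; only configs() is sketched.
interface GroupSpecSketch {
    // Assumed shape: assignment-relevant group configs as raw key/value strings.
    Map<String, String> configs();
}

final class StandbyConfigReader {
    static final String NUM_STANDBY_REPLICAS = "num.standby.replicas";

    // Reads the standby count, falling back to a default when the key is absent.
    static int numStandbyReplicas(GroupSpecSketch spec, int defaultValue) {
        String raw = spec.configs().get(NUM_STANDBY_REPLICAS);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }
}
```

If `configs()` is not guaranteed to contain every key, an assignor author
would need this kind of defensive default for each key they read, which is
where a documented key list would help.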

AS3: Right now the convention for group configs is to prefix the name with
the group type, e.g. we have "consumer.session.timeout.ms" and
"streams.session.timeout.ms" instead of a single session timeout config.

sq01: nit: `MemberSubscription` doesn't contain any subscription info. I
understand the name came from the consumer assignor interface. In other
places, we have chosen to rename interfaces to be more accurate
(MemberAssignmentState). Do we want to do the same or stick to the consumer
assignor names?
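
As a rough illustration of the naming point: the accessors listed for this
type in the quoted thread (processId, instanceId, rackId, clientTags) are all
static member metadata. The return types and the `StaticMemberMetadata`
implementation below are assumptions made to keep the sketch runnable; they
are not taken from the KIP.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Sketch of the accessors named in the thread; return types are assumed.
// Note that none of them describes a subscription.
interface MemberSubscription {
    String processId();
    Optional<String> instanceId();
    Optional<String> rackId();
    Map<String, String> clientTags();
}

// Hypothetical immutable implementation, present only for illustration.
final class StaticMemberMetadata implements MemberSubscription {
    private final String processId;
    private final Optional<String> instanceId;
    private final Optional<String> rackId;
    private final Map<String, String> clientTags;

    StaticMemberMetadata(String processId, String instanceId, String rackId,
                         Map<String, String> clientTags) {
        this.processId = Objects.requireNonNull(processId);
        this.instanceId = Optional.ofNullable(instanceId); // null means "not set"
        this.rackId = Optional.ofNullable(rackId);         // null means "not set"
        this.clientTags = Map.copyOf(clientTags);          // defensive copy
    }

    public String processId() { return processId; }
    public Optional<String> instanceId() { return instanceId; }
    public Optional<String> rackId() { return rackId; }
    public Map<String, String> clientTags() { return clientTags; }
}
```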

Thanks,
Sean

On Wed, Jun 24, 2026 at 2:45 PM Lucas Brutschy via dev <[email protected]>
wrote:

> Hi,
>
> Thanks for the updates Gabby, lgtm!
>
> On AS1, consumer assignors also have no timeout, so I'd stay
> consistent with that. The KIP could note that assignors are responsible
> for bounded execution and that a hung assignor blocks the coordinator
> thread.
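
One way an assignor author could keep execution bounded is a cooperative
time budget checked inside the assignment loop. The sketch below is only an
illustration under assumptions: the class name, the round-robin placeholder
logic, the injected clock, and the choice to throw on overrun are all
hypothetical, and nothing here is enforced by the broker.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

final class BoundedAssignSketch {
    // Round-robin placeholder for real assignment logic, with a time budget.
    // nanoClock is injected so the budget check can be tested deterministically.
    static Map<String, List<Integer>> assign(List<String> members, int numTasks,
                                             Duration budget, LongSupplier nanoClock) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("no members to assign to");
        }
        long deadline = nanoClock.getAsLong() + budget.toNanos();
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String member : members) {
            result.put(member, new ArrayList<>());
        }
        for (int task = 0; task < numTasks; task++) {
            // Check the budget periodically rather than on every iteration.
            if (task % 1024 == 0 && nanoClock.getAsLong() - deadline > 0) {
                throw new IllegalStateException("assignment exceeded its time budget");
            }
            result.get(members.get(task % members.size())).add(task);
        }
        return result;
    }
}
```

In production code the clock would typically be `System::nanoTime`. Whether an
overrun should fail the assignment or fall back to a cheaper strategy is a
design choice left open here.
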
>
> Cheers,
> Lucas
>
> On Wed, Jun 24, 2026 at 1:46 PM Alieh Saeedi via dev
> <[email protected]> wrote:
> >
> > Hey Gabriella,
> >
> > Thanks for putting this KIP together. I think the main concerns have
> > already been raised and addressed well. I just wanted to note a couple of
> > personal questions:
> >
> > AS1: Is there any guardrail for a custom assignor that is slow or gets
> > stuck in a loop, such as a timeout, given that it runs on the group
> > coordinator thread? If not, it may be worth stating in the KIP that
> > assignor authors are responsible for keeping execution bounded, along
> > with the operational impact of a hung assignor. I remember the initial
> > implementation of the sticky assignor being slow, which I think will
> > affect rebalance behavior, so this made me think this might be important
> > to call out.
> >
> > AS2: What happens during downgrade or config removal? For example, if a
> > group has a persisted streams.assignor.name and the broker is later
> > downgraded to a version that does not support this feature, or the
> > assignor is removed from group.streams.assignors, is the persisted
> > per-group config ignored or cleared? Does the group cleanly fall back to
> > the default? I’m not sure whether covering the downgrade path makes sense
> > in this KIP, but it came to mind and I was curious whether you had
> > thought about it.
> >
> > AS3: For the per-group config streams.assignor.name, I think
> > group.assignor.name reads more clearly, but it’s up to you.
> >
> > Nit: There’s also a typo to fix — group.streams.assignors is listed in
> > singular form twice in the KIP.
> >
> > -Alieh
> >
> > On Tue, Jun 23, 2026 at 11:18 PM Gabriella Fu via dev
> > <[email protected]> wrote:
> >
> > > Hi all,
> > >
> > > I updated the KIP based on the discussion. Here are the changes I made:
> > >
> > >    - DJ_01 (configs) / LB / MJS: I collapsed this to a single broker
> > >      config, group.streams.assignors (LIST, default [sticky]), mirroring
> > >      group.consumer.assignors in KIP-848. The first entry is the cluster
> > >      default, and selectable short names are derived from
> > >      TaskAssignor.name() rather than declared separately, so the
> > >      previous group.streams.assignors.names config is removed. I also
> > >      removed the misleading “avoids leaking class names” motivation.
> > >    - DJ_02 / MJS: All input/output types are now interfaces rather than
> > >      records. GroupSpec is now an interface with per-member accessors
> > >      instead of returning a raw Map. AssignmentMemberSpec is split into
> > >      MemberSubscription (static metadata: processId, instanceId, rackId,
> > >      clientTags) and the member’s runtime assignment, matching the
> > >      KIP-848 split.
> > >    - DJ_03: I renamed assignmentConfigs() to configs() and clarified
> > >      that it returns only the assignment-relevant subset of group
> > >      configs (for example, num.standby.replicas), not all group configs.
> > >    - DJ_04: No change.
> > >    - LB1: I clarified that loaded assignors implementing
> > >      org.apache.kafka.common.Configurable have configure() invoked with
> > >      the broker configuration at startup, the same as consumer-side
> > >      assignors. Per-group dynamic settings remain a separate channel via
> > >      GroupSpec.configs().
> > >    - MJS1: All newly public types are now annotated with
> > >      @InterfaceStability.Evolving so we can refine the API in minor
> > >      releases without making a compatibility commitment.
> > >    - MJS2: I dropped warm-up tasks from the assignor input and output.
> > >      MemberAssignment now exposes only active and standby tasks.
> > >
> > > Let me know if you have more concerns.
> > >
> > > Thanks,
> > >
> > > Gabriella
> > >
> > > On Tue, Jun 23, 2026 at 12:23 PM Gabriella Fu <[email protected]> wrote:
> > >
> > > > Hi all, thank you so much for the discussion.
> > > >
> > > > LB1: Yes. Custom TaskAssignor loading does the same as the consumer
> > > > side. Each entry in group.streams.assignors is resolved to an
> > > > instance at broker startup, and if it implements Configurable, the
> > > > broker calls configure(config.originals()) on it. So a custom
> > > > assignor can read broker-level configuration the same way a consumer
> > > > assignor can. Per-group dynamic settings (e.g. num.standby.replicas)
> > > > remain a separate channel via GroupSpec.configs(). I'll make this
> > > > explicit in the KIP.
> > > >
> > > > MJS1: Agreed. I'll mark all newly public-facing interfaces, classes,
> > > > and methods as @InterfaceStability.Evolving so we can refine them in
> > > > minor releases without a compatibility commitment. I'll update the
> > > > KIP to state this for the whole new API surface.
> > > >
> > > > MJS2: Thank you for pointing it out. I will drop the warmup tasks
> > > > from the output MemberAssignment.
> > > >
> > > > Thanks.
> > > > Gabriella
> > > >
> > > >
> > > >
> > > > On Sun, Jun 21, 2026 at 7:03 PM Matthias J. Sax <[email protected]> wrote:
> > > >
> > > >> Thanks for the KIP Gabby.
> > > >>
> > > >> Two comments about the newly added interfaces/records.
> > > >>
> > > >> MJS1: I would like to mark all new public-facing APIs as @Evolving
> > > >> for now. We might not get it right with the first release, and
> > > >> marking them as evolving would indicate that the API is not stable
> > > >> yet and that we can introduce breaking changes in minor releases,
> > > >> allowing us to fix errors quickly.
> > > >>
> > > >> MJS2: The existing (non-public) assignor API returns
> > > >> `GroupAssignment`, which includes warmup tasks. I believe it's an
> > > >> artifact from the "classic" case, in which the HA assignor was
> > > >> required to compute warmups. However, with "streams" we can simplify
> > > >> the assignor so that it only computes active and standby tasks,
> > > >> while only the GC (i.e., the reconciler) uses warmup tasks. Thus, we
> > > >> should change the interface accordingly, dropping warmup tasks from
> > > >> `MemberAssignment`.
> > > >>
> > > >>
> > > >> About DJ_02: I have no experience with evolving Java `records`, but
> > > >> it seems they are more similar to `classes`. In general,
> > > >> `interfaces` provide much higher flexibility than `classes`, so I
> > > >> agree with David that it might be good to _only_ use `interfaces`
> > > >> and no `records`.
> > > >>
> > > >>
> > > >> About the configs (DJ_01): I guess I am fine either way, to either
> > > >> follow the current proposal, or mimic KIP-848 more closely with a
> > > >> single broker config containing a mix of built-in assignor names and
> > > >> fully-qualified class names.
> > > >>
> > > >> If in doubt, I would agree with Lucas: the KIP-848 approach is a
> > > >> little bit cleaner, as having one config seems easier than having
> > > >> two. Let's hear from David about it.
> > > >>
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >>
> > > >> On 6/16/26 1:55 AM, Lucas Brutschy via dev wrote:
> > > >> > Hi all,
> > > >> >
> > > >> > DJ_01: Agree, and I'd push it a little further. On the consumer
> > > >> > side the short name isn't configured at all:
> > > >> > group.consumer.assignors is a single list of built-in names and
> > > >> > class names, each entry is resolved to an instance, and they're
> > > >> > mapped by name(); the first entry is the cluster default. So once
> > > >> > the streams assignors are loaded, their short names already come
> > > >> > from TaskAssignor.name(), which makes a separate
> > > >> > group.streams.assignors.names redundant. A single
> > > >> > group.streams.assignors list (default [sticky]) reproduces the
> > > >> > consumer behavior. That also runs against the "mixed name/class
> > > >> > configuration" rejected alternative, since the consumer config is
> > > >> > exactly that mixed list.
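
The single-list resolution described in the paragraph above (built-in names
and class names in one list, instances keyed by name(), first entry as the
cluster default) can be sketched roughly as follows. Every type here is a
hypothetical stand-in: `NamedAssignor` is not the KIP's `TaskAssignor`, the
built-in lookup table is assumed, and rejecting unknown names is only one
possible answer to the fallback question raised elsewhere in this thread.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical minimal assignor contract; only name() matters for this sketch.
interface NamedAssignor {
    String name();
}

final class StickySketch implements NamedAssignor {
    public String name() { return "sticky"; }
}

final class RoundRobinSketch implements NamedAssignor {
    public String name() { return "round-robin"; }
}

final class AssignorRegistrySketch {
    private final Map<String, NamedAssignor> byName = new LinkedHashMap<>();
    private final String defaultName;

    // Assumed table of built-in short names; the real broker may differ.
    static Map<String, Supplier<NamedAssignor>> builtIns() {
        Map<String, Supplier<NamedAssignor>> table = new LinkedHashMap<>();
        table.put("sticky", StickySketch::new);
        return table;
    }

    // entries: values of a list config such as group.streams.assignors.
    AssignorRegistrySketch(List<String> entries,
                           Map<String, Supplier<NamedAssignor>> builtIns) {
        if (entries.isEmpty()) {
            throw new IllegalArgumentException("at least one assignor is required");
        }
        for (String entry : entries) {
            NamedAssignor assignor = instantiate(entry, builtIns);
            // Short names come from name(), so they must be unique.
            if (byName.putIfAbsent(assignor.name(), assignor) != null) {
                throw new IllegalArgumentException("duplicate assignor name: " + assignor.name());
            }
        }
        // Insertion order is preserved, so the first key is the first entry.
        this.defaultName = byName.keySet().iterator().next();
    }

    // A built-in short name wins; anything else is treated as a class name.
    private static NamedAssignor instantiate(String entry,
                                             Map<String, Supplier<NamedAssignor>> builtIns) {
        Supplier<NamedAssignor> factory = builtIns.get(entry);
        if (factory != null) {
            return factory.get();
        }
        try {
            return (NamedAssignor) Class.forName(entry).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("cannot load assignor: " + entry, e);
        }
    }

    // null selects the cluster default; unknown names are rejected in this sketch.
    NamedAssignor resolve(String requestedName) {
        NamedAssignor assignor = byName.get(requestedName == null ? defaultName : requestedName);
        if (assignor == null) {
            throw new IllegalArgumentException("unknown assignor: " + requestedName);
        }
        return assignor;
    }
}
```
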
> > > >> >
> > > >> > DJ_04: I agree with Gabby; "validate all dynamic group configs on
> > > >> > the broker" seems out of scope for this KIP.
> > > >> >
> > > >> > LB1: When consumer assignors are loaded from
> > > >> > group.consumer.assignors, instances implementing Configurable get
> > > >> > the broker configs passed to them. Will the custom TaskAssignor
> > > >> > loading do the same, so a custom assignor can read broker-level
> > > >> > configuration the way a consumer assignor can?
> > > >> >
> > > >> > Thanks,
> > > >> > Lucas
> > > >> >
> > > >> > On Fri, Jun 12, 2026 at 9:53 PM Gabriella Fu via dev
> > > >> > <[email protected]> wrote:
> > > >> >>
> > > >> >> Hi David,
> > > >> >>
> > > >> >> Thanks a lot for the detailed review!
> > > >> >>
> > > >> >> *DJ_01:* That's a fair point. I'll update the KIP to use
> > > >> >> group.streams.assignors to align with KIP-848 and remove the
> > > >> >> misleading motivation section.
> > > >> >>
> > > >> >> *DJ_02:* Thanks for the suggestion. I went through the KIP-848
> > > >> >> interfaces in org.apache.kafka.coordinator.group.api.assignor as
> > > >> >> well as the existing implementations (UniformAssignor,
> > > >> >> RangeAssignor), and the pattern makes total sense now. I'll
> > > >> >> update the input types in the KIP accordingly:
> > > >> >>
> > > >> >>     - GroupSpec will be changed to an interface with per-member
> > > >> >>       accessors, instead of returning a raw
> > > >> >>       Map<String, AssignmentMemberSpec>.
> > > >> >>     - AssignmentMemberSpec will be replaced by two distinct
> > > >> >>       interfaces: MemberSubscription and MemberAssignment. This
> > > >> >>       will separate the member's static metadata (processId,
> > > >> >>       instanceId, rackId, clientTags) from its runtime task
> > > >> >>       assignment (active/standby/warmup tasks), mirroring the
> > > >> >>       clean split in KIP-848.
> > > >> >>
> > > >> >> *DJ_03:* Thank you for the suggestion. I'll rename it to
> > > >> >> configs() and clarify in the text that it will only return
> > > >> >> configurations relevant to the assignment.
> > > >> >>
> > > >> >> *DJ_04:* Moving all dynamic group config validation to the broker
> > > >> >> makes sense, but that would be a bit beyond the scope of this
> > > >> >> KIP. In this case, maybe we should just keep the config like
> > > >> >> this?
> > > >> >>
> > > >> >> Best,
> > > >> >> Gabriella
> > > >> >>
> > > >> >> On Fri, Jun 12, 2026 at 10:54 AM David Jacot <[email protected]> wrote:
> > > >> >>
> > > >> >>> Hi Gabriella,
> > > >> >>>
> > > >> >>> Thanks for the KIP. I have a few high level comments:
> > > >> >>>
> > > >> >>> DJ_01: The motivation for not using `group.streams.assignors`,
> > > >> >>> i.e. for not following the pattern introduced by KIP-848, is
> > > >> >>> pretty weak in my opinion, especially the "avoids leaking
> > > >> >>> implementation class names into per-group dynamic
> > > >> >>> configuration" part. The class names won't leak into the group
> > > >> >>> config, as the group config requires the name of the assignor.
> > > >> >>> From a user perspective, it is exactly the same concept, so
> > > >> >>> using a different way to express it is wrong in my opinion.
> > > >> >>> Should we just use `group.streams.assignors` to be consistent
> > > >> >>> in our configs?
> > > >> >>>
> > > >> >>> DJ_02: I would suggest reconsidering the interface of the
> > > >> >>> assignor. We started with a similar interface in KIP-848 and we
> > > >> >>> realized during the implementation that using POJOs and
> > > >> >>> returning Maps (e.g. for members()) was really inflexible.
> > > >> >>> Moreover, I am not sure if using records is good from an
> > > >> >>> evolutionary point of view. Using interfaces may be better. It
> > > >> >>> also allows us to wrap internal objects to expose them to the
> > > >> >>> assignor. Have you looked into KIP-848's interface and the
> > > >> >>> various assignors?
> > > >> >>>
> > > >> >>> DJ_03: Regarding `assignmentConfigs()`, should we call it
> > > >> >>> `configs()`, as it is already clear that it is for the
> > > >> >>> assignment? Moreover, I wonder if it is going to return all
> > > >> >>> group configs or only a subset of them. Could you please
> > > >> >>> clarify in the KIP?
> > > >> >>>
> > > >> >>> DJ_04: The dynamic group configs validation makes sense to me.
> > > >> >>> Note that this pattern is not used today. I also wonder whether
> > > >> >>> we should move the validation of all dynamic group configs
> > > >> >>> there. To be discussed.
> > > >> >>>
> > > >> >>> Best,
> > > >> >>> David
> > > >> >>>
> > > >> >>> On Thu, Jun 11, 2026 at 8:37 PM Gabriella Fu via dev
> > > >> >>> <[email protected]> wrote:
> > > >> >>>>
> > > >> >>>> Hi all,
> > > >> >>>>
> > > >> >>>> I’d like to start the discussion for KIP-1357: Add broker side
> > > >> >>>> custom assignors for "streams" groups
> > > >> >>>>
> > > >> >>>> KIP:
> > > >> >>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=430408758
> > > >> >>>> JIRA:
> > > >> >>>> https://issues.apache.org/jira/browse/KAFKA-20683
> > > >> >>>>
> > > >> >>>> Summary:
> > > >> >>>> The Streams Rebalance Protocol (KIP-1071) moves task assignment
> > > >> >>>> from the client to the broker, but unlike the classic protocol
> > > >> >>>> and KIP-848 consumer groups, it offers no way to plug in a
> > > >> >>>> custom assignor. This KIP closes that gap by making the
> > > >> >>>> existing streams task assignor interfaces public API and adding
> > > >> >>>> broker and group configurations so operators can load custom
> > > >> >>>> assignor implementations and select them per group by short
> > > >> >>>> name, with no client-side involvement.
> > > >> >>>>
> > > >> >>>> Please let me know your feedback
> > > >> >>>>
> > > >> >>>> Thanks,
> > > >> >>>> Gabriella Fu
> > > >> >>>
> > > >>
> > > >>
> > >
>
