Thanks, Lucas. Following up on the discussion, something came to mind that
seems worth raising here. As we refine the design, I think it is important
to consider that many of the current answers rely on the plugin to handle
the responsibility. Since the user develops the plugin, this is a
particularly sensitive area. We should not make it easy for users to
accidentally shoot themselves in the foot. What happens if a buggy plugin
causes the application to break or puts excessive pressure on the GC?
Given that risk, should the broker enforce any basic safeguards (e.g., a
per-group in-flight limit), or at least document recommended limits for
plugin implementations?
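To make the question concrete, here is a minimal sketch of what a per-group
in-flight limit on the broker side could look like. All class and method
names here are hypothetical illustrations, not part of the KIP:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical broker-side guard: caps the number of outstanding
// setTopology futures per group, so a buggy plugin that never completes
// its futures cannot pile up unbounded state on the coordinator.
public class PerGroupInFlightLimiter {
    private final int maxInFlightPerGroup;
    private final Map<String, AtomicInteger> inFlight = new ConcurrentHashMap<>();

    public PerGroupInFlightLimiter(int maxInFlightPerGroup) {
        this.maxInFlightPerGroup = maxInFlightPerGroup;
    }

    // Returns true if a new push may be started for the group.
    public boolean tryAcquire(String groupId) {
        AtomicInteger counter = inFlight.computeIfAbsent(groupId, id -> new AtomicInteger());
        while (true) {
            int current = counter.get();
            if (current >= maxInFlightPerGroup) {
                return false; // reject instead of queueing unboundedly
            }
            if (counter.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    // Called when the plugin's future completes (normally or exceptionally).
    public void release(String groupId) {
        AtomicInteger counter = inFlight.get(groupId);
        if (counter != null) {
            counter.decrementAndGet();
        }
    }
}
```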

Thanks,
Alieh

On Mon, May 11, 2026 at 3:11 PM Lucas Brutschy via dev <[email protected]>
wrote:

> Thanks for the plentiful comments! My responses below:
>
> AJS1) Fair point, I had "plugin" in there initially but removed it
> thinking I would better follow conventions. But I see that it is
> confusing. Changed back to
> group.streams.topology.description.plugin.class.
>
> AJS2) Not sure which Java object you mean, but sink-topics are modeled
> in the "Sink" node, so I don't see this missing. Predecessors are
> omitted on the broker-side, since they are not present on the wire and
> reconstruction seems unnecessary.
>
> AJS3) Good point. Added it.
>
> AJS4) I went back-and-forth on this one as well, but if we don't have
> the error code, all we can return is UNKNOWN_SERVER_ERROR when the
> topology description update fails. I don't think we want to do that,
> since it may trigger alerts set up to detect implementation bugs in
> the Kafka broker. So the point of TOPOLOGY_DESCRIPTION_UPDATE_FAILED
> is just letting the client know that the update failed, but there is
> nothing wrong with the Kafka broker. The client would likely log that
> at INFO level, as opposed to WARN or ERROR for UNKNOWN_SERVER_ERROR.
>
> AJS5) Sure, we can do that.
>
> AJS6) It was not supposed to be a user-facing concept. And I think it
> introduced more problems than it solved, so I removed the concept
> altogether.
>
> AJS7) Good point. This, in part, prompted me to bring back a previous
> version of the design, without the topology description ID. Now, the
> topology description is created using only the topologyEpoch and a
> group creation timestamp. Then, the plugin can trivially implement
> concurrency control: The plugin can decide to store all versions,
> which means updates for separate topology epochs are independent. Or
> it can just store the latest one, which means that a topology
> description that comes later in the (groupCreationTimestamp,
> topologyEpoch) order takes preference over any previous update.
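A minimal sketch of the store-latest policy described above, using the
(groupCreationTimestamp, topologyEpoch) ordering. Class and method names
are hypothetical, not from the KIP:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical "store only the latest" plugin policy: a description is
// accepted iff its (groupCreationTimestampMs, topologyEpoch) key is not
// older than the one already stored for the group.
public class LatestTopologyStore {
    public record Version(long groupCreationTimestampMs, int topologyEpoch)
            implements Comparable<Version> {
        @Override
        public int compareTo(Version other) {
            int byTimestamp = Long.compare(groupCreationTimestampMs, other.groupCreationTimestampMs);
            return byTimestamp != 0 ? byTimestamp
                    : Integer.compare(topologyEpoch, other.topologyEpoch);
        }
    }

    private record Entry(Version version, String description) {}

    private final Map<String, Entry> store = new ConcurrentHashMap<>();

    // Returns true if this description became the new latest for the group.
    public boolean put(String groupId, Version version, String description) {
        Entry updated = store.merge(groupId, new Entry(version, description),
                // later (or equal) versions take preference over any previous update
                (old, candidate) -> candidate.version().compareTo(old.version()) >= 0
                        ? candidate : old);
        return updated.version().equals(version) && updated.description().equals(description);
    }

    public String latest(String groupId) {
        Entry e = store.get(groupId);
        return e == null ? null : e.description();
    }
}
```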
>
> AJS8) No, there is no direct relationship between a group epoch and a
> topology description.
>
> BB1) The asymmetry is intentional. Predecessors are redundant with
> successor information. We can reconstruct them at any time. In the
> admin client, which is user-facing, we do reconstruct the predecessors
> for usability. On the broker side, the main purpose is passing the
> data around, so we do not reconstruct the relation. I'll clarify this
> in the javadoc.
>
> BB2) Direct neighbours only — exactly as you described. I'll clarify
> this in the javadoc.
>
> BB3) The design intentionally pushes transient-retry policy entirely
> onto the plugin: when setTopology fails for a recoverable reason, the
> plugin is expected to keep returning true from requiresTopologyPush on
> subsequent heartbeats. I will clarify this in the Plugin
> Implementation section.
>
> MJS1) I dropped the UUID, as it introduced more complexities than it
> solved.
>
> MJS2a) I think we benefit from not modeling this too tightly. A sink
> is a sink, and the sink topic is just a metadata annotation on top
> of it. There are corner cases, which I think would be more difficult
> to add later if we do not include NodeType. One is regular
> expressions. We do not model regular expressions here (because
> KIP-1071 does not support them yet -- and the existing client-side
> describe doesn't support them). We can still add regular expressions
> later, but then source topics will still be empty. Even clients that
> do not know the regular expressions field in the future should
> understand that it's a source node, even if they do not understand
> how the source topics are determined. Similarly for sink nodes with
> topicNameExtractors, or any other nifty features people will cook up
> in the future. There is no defined sink topic for that
> node, but it's still a sink. The NodeType should define the identity,
> not the metadata attached to the node. I actually weakened the
> guarantees in the documentation around this a bit in the KIP, to make
> this clear.
>
> MJS2b) Fair — but it would make the schema a lot less uniform, because
> the corner case of global stores would require its own record types
> with some fields fixed. Let me know if you feel strongly about it,
> otherwise, I'd vote for re-using the generic topologyRecords.
>
> MJS3) There is no Kafka RPC without a response, so we'd break with all
> conventions, and I don't see why we'd even consider it. Apart from
> that, there are useful things to do in reaction to the response - e.g.
> logging that the topology is too large, and honoring throttleTimeMs.
>
> MJS4) The intent here is "this broker doesn't advertise this API in
> ApiVersions, but you sent it anyway, so I'm rejecting". This seems
> like a perfectly fine way to use UNSUPPORTED_VERSION to me.
>
> MJS5) I have the feeling this question comes up in every KIP. There is
> no strong reason to do either, but the preference is to use version
> bumps, because they are more compact on the wire and old versions
> without the field can be retired. I cleaned up some prose around this.
>
> MJS6) I think having the module `group-coordinator-api` is a decent
> place to put it. It's the place for broker-side plugin interfaces
> related to group management. Adding a new module broker-plugins for
> all broker-side plugins seems like scope creep for this KIP, and I am
> not even sure it would make things better.
>
> MJS7) Correct, this is a bit misleading. Exceptions are signaled by
> completing the future exceptionally, never by throwing synchronously
> from setTopology. I will clarify this in the KIP.
>
> MJS8) Yes, similar to ASH01 - the plugin needs to make sure that the
> futures are completed.
>
> MJS9) I agree that 3 POJOs are somewhat awkward, but creating
> dependencies, e.g. between the admin client and a broker-side plugin
> seems even worse to me. Where would we put the common data structure?
>
> MJS10) In principle we are modeling a sum type here (topology
> description | reason of absence), but Java doesn't natively model
> these, so we store two separate fields. Including a status that
> doesn't model the AVAILABLE case because it can be inferred seems more
> of a pitfall than necessary. Avoiding redundancy in APIs is a good
> pattern but not a strict rule, and I think having
> topologyDescription().isPresent() == (status == AVAILABLE) is
> perfectly fine. Also Optional would just add a layer of wrapping what
> can be modeled directly via the enum. But I agree that my distinction
> (omit the AVAILABLE in the RPC and include it in the user-facing API)
> may be unnecessarily confusing, see AJS5.
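A small sketch of the invariant discussed above (a status enum including
AVAILABLE, alongside an Optional description). The class and status names
are hypothetical, not the KIP's actual API:

```java
import java.util.Optional;

// Hypothetical shape of the user-facing result: a status enum that
// includes AVAILABLE, plus an Optional description, maintaining the
// invariant topologyDescription().isPresent() == (status() == AVAILABLE).
public final class DescribedTopology {
    public enum Status { AVAILABLE, NOT_STORED, PLUGIN_DISABLED }

    private final Status status;
    private final String description; // null unless status == AVAILABLE

    private DescribedTopology(Status status, String description) {
        this.status = status;
        this.description = description;
    }

    public static DescribedTopology available(String description) {
        return new DescribedTopology(Status.AVAILABLE, description);
    }

    public static DescribedTopology absent(Status reason) {
        if (reason == Status.AVAILABLE) {
            throw new IllegalArgumentException("AVAILABLE requires a description");
        }
        return new DescribedTopology(reason, null);
    }

    public Status status() { return status; }

    public Optional<String> topologyDescription() { return Optional.ofNullable(description); }
}
```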
>
> MJS11) Personally, I think only the plugin knows if something went
> wrong and why, so the best place to define metrics is inside the
> plugin. Similar reasoning to why I want to keep configs, deduplication
> inside the plugin. If people strongly feel that we want to have AK
> metrics for this plugin, we can add them.
>
> MJS12) If the feature is disabled on the client but the broker plugin
> is enabled, the broker would indeed keep re-soliciting the topology
> from the client with back-off. But the solicitation is very
> light-weight (a single boolean, and requiresTopologyPush is supposed
> to be fast), and clients will just ignore it, so I don't see a problem
> with this.
>
> MJS13) The plugin doesn't (and shouldn't need to) know the heartbeat
> interval — it should throttle based on its own clock plus its own
> in-flight tracking. Concrete strategy: for each (groupId,
> groupCreationTimeMs, topologyEpoch) tuple the plugin tracks (i)
> whether a push has been initiated (set when requiresTopologyPush first
> returns true) and (ii) the last requiresTopologyPush=true time. While
> (i) is set and the push hasn't completed, requiresTopologyPush returns
> false. After the push completes successfully it returns false
> permanently for that tuple. On a transient failure, after a back-off
> window (independent of heartbeat cadence — say, exponential starting
> at 1s) it returns true again. On permanent failure
> (TOPOLOGY_DESCRIPTION_TOO_LARGE or plugin-semantic INVALID_REQUEST) it
> returns false permanently and logs.
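The per-tuple state machine described above could be sketched roughly as
follows. This is an illustrative implementation under the stated
assumptions, with hypothetical class and method names:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the MJS13 strategy: per
// (groupId, groupCreationTimeMs, topologyEpoch) tuple, track whether a
// push is in flight or done, and back off exponentially (starting at 1s)
// on transient failures, independent of the heartbeat cadence.
public class PushThrottle {
    private static final long INITIAL_BACKOFF_MS = 1_000L;

    private enum Phase { IDLE, IN_FLIGHT, BACKING_OFF, DONE }

    private static final class State {
        Phase phase = Phase.IDLE;
        long backoffMs = INITIAL_BACKOFF_MS;
        long retryAtMs = 0L;
    }

    public record Tuple(String groupId, long groupCreationTimeMs, int topologyEpoch) {}

    private final Map<Tuple, State> states = new ConcurrentHashMap<>();

    // Called on every heartbeat; returns true at most once per in-flight window.
    public synchronized boolean requiresTopologyPush(Tuple tuple, long nowMs) {
        State s = states.computeIfAbsent(tuple, t -> new State());
        switch (s.phase) {
            case IDLE -> { s.phase = Phase.IN_FLIGHT; return true; }
            case BACKING_OFF -> {
                if (nowMs >= s.retryAtMs) { s.phase = Phase.IN_FLIGHT; return true; }
                return false;
            }
            default -> { return false; } // IN_FLIGHT or DONE: no repeat solicitation
        }
    }

    public synchronized void onPushSucceeded(Tuple tuple) {
        states.computeIfAbsent(tuple, t -> new State()).phase = Phase.DONE;
    }

    public synchronized void onTransientFailure(Tuple tuple, long nowMs) {
        State s = states.computeIfAbsent(tuple, t -> new State());
        s.phase = Phase.BACKING_OFF;
        s.retryAtMs = nowMs + s.backoffMs;
        s.backoffMs *= 2; // exponential back-off
    }

    public synchronized void onPermanentFailure(Tuple tuple) {
        // e.g. TOPOLOGY_DESCRIPTION_TOO_LARGE: stop soliciting forever.
        states.computeIfAbsent(tuple, t -> new State()).phase = Phase.DONE;
    }
}
```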
>
> Hope that makes sense!
>
> On Sat, May 9, 2026 at 4:43 AM Matthias J. Sax <[email protected]> wrote:
> >
> > Thanks for the KIP Lucas. I made a first pass. Couple of
> comments/questions.
> >
> >
> > MJS1(a): This is a follow up to ASH03. I am not sure if I understand the
> > problem? The KIP says, we call `requiresTopologyPush` on every HB. So
> > after an upgrade, the plugin will be called for every existing group,
> > allowing the group to send its topology? So we can just create a UUID
> > before this call? In the end, we need to somehow cache all currently
> > in-use UUIDs anyway (i.e., one for each active group)?
> >
> >
> > MJS1(b): Related to the above. The KIP does not say that UUIDs would be
> > stored by the GC -- so after a broker bounce, or GC fail-over to a
> > different broker, it seems we would forget all currently in-use UUIDs
> > and generate new ones? This would align to what I did ask about above,
> > and it should be fine from plugin POV to just get new UUIDs if none is
> > cached?
> >
> >
> > MJS1(c): However, I am not even sure about TopologyDescriptionId? Should
> > we use <groupId,topologyEpoch> instead, avoiding this UUID altogether? On
> > the other hand, with regard to ASH03, we might introduce the problem
> > Sanghyeok describes when doing this? Then again, on startup, the
> > GC could also check all existing groups, and just call
> > `requiresTopologyPush` pro-actively for each group?
> >
> >
> > MJS1(d): KIP-1313 proposes that clients create their UUID -- should we
> > do the same to integrate with KIP-1313 (in case we keep UUID, and not
> > move to <groupId,topologyEpoch>), to align the behavior across the
> > board? In the end, the topology can only change during a roll, which
> > aligns to a topology-epoch bump anyway?
> >
> >
> >
> > MJS2(a): The KIP introduces a `NodeType` field for the topology
> > description. I am wondering if we need it? We have sources, processors,
> > sinks. Only sources can have input topics, only sinks can have output
> > topics, and processors never have any input/output topics, so it seems
> > just inspecting whether input/output topics are present tells us what
> > node type we have, and having an explicit type seems redundant?
> >
> >
> > MJS2(b): I am wondering why we model GlobalStores with two
> > TopologyNodes? We know that for this case, there is exactly one source
> > node, and one processor. Should we simplify this?
> >
> >
> >
> > MJS3: Do we actually need to add
> > `UpdateStreamsGroupTopologyDescriptionResponse`, or could we use a "fire
> > and forget" approach? If a topology update failed, and the plugin
> > re-requests the push, the next HB-response would take care of it
> > naturally, it seems? Atm the only value we get is to send back some error
> > code. Is this worth it?
> >
> >
> >
> > MJS4: The KIP add `UNSUPPORTED_VERSION` error.
> >
> > > UNSUPPORTED_VERSION — the coordinator cannot serve this RPC because no
> topology description plugin is configured
> > Is this the right name for this error?
> >
> >
> >
> > MJS5: the KIP proposed to bump the version of both
> > StreamsGroupDescribeRequest and StreamsGroupDescribeResponse. No
> > objection, but wondering why we prefer it over tagged fields?
> >
> >
> >
> > MJS6: Is `org.apache.kafka.coordinator.group.api.streams` the right
> > place for the plugin interface? It seems we add a very heavy dependency
> > for people implementing the plugin. Would it make sense to add a new
> > module `broker-plugins` instead to make it more light weight? Yes,
> > KIP-714 does the same, but it might be a nice improvement for 714, too,
> > to move their plugin into such a new module?
> >
> >
> > MJS7: The KIP says that `setTopology()` may throw
> > TopologyDescriptionTooLargeException? Later in "plugin guidelines" it
> says:
> >
> > > Reject pushes that exceed it by completing the setTopology future with
> TopologyDescriptionTooLargeException
> >
> > Which one is it? We might not want to do it both ways? Also applies to
> > other exceptions.
> >
> >
> >
> > MJS8: It seems the broker will need to cache all not-yet-completed
> > CompletableFutures. What if the plugin has a bug, and never completes
> > its future? Would we get some leak? -- Or is your answer the same as
> > for ASH01?
> >
> >
> >
> > MJS9: Should we move `StreamsGroupTopologyDescription` to the same
> > proposed `broker-plugin` module? I am also wondering about Alieh's
> > question: Do we keep its own class on purpose, or should we unify it
> > with the existing interface? And why do we get a second
> > `StreamsGroupTopologyDescription` for the admin client? -- I understand
> > your argument about "this may evolve independently", but do we need 3
> > copies of the same?
> >
> >
> > MJS10(a): Do we need to expose `StreamsGroupTopologyDescriptionStatus`
> > via `topologyDescriptionStatus()`? I am wondering if
> > `StreamsGroupTopologyDescription` could model this directly? Also not
> > sure if `topologyDescriptionStatus` should return an `Optional` or not?
> >
> >
> >
> > MJS10(b): I can see arguments to ignore what I said in MJS10(a), but if
> > we have an Optional, why do we need status `AVAILABLE`?
> >
> >
> >
> > MJS11: The KIP says we don't add any broker side metrics. KIP-714 did.
> > Wondering why we won't need any for this KIP?
> >
> >
> >
> > MJS12: If this feature is disabled client side, can the broker learn
> > about it? Or might it keep requesting the topology over and over again,
> > and the client would just keep ignoring the request? Would we want some
> > error code the client sends to the broker for this case instead?
> >
> >
> > MJS13: In the "plugin guidelines", the KIP says:
> >
> > > Avoid concurrent or repetitive pushes
> >
> > Does the plugin know the HB interval to implement this in a reasonable
> > way? Any better guidance we can give on how to implement this?
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 5/5/26 6:26 AM, Lucas Brutschy via dev wrote:
> > > Thanks Sanghyeok and Alieh!
> > >
> > > ASH01: Both risks are real. But I would argue that, in any case, the
> > > plugins that Kafka currently defines need to be implemented correctly
> > > for Kafka to work reliably. This is the case here as well - just like
> > > a consumer group assignor needs to be correct and fast,
> > > requiresTopologyPush needs to be implemented correctly and fast. I did
> > > give most of the responsibility to the plugin here, because only the
> > > plugin knows in which situations it requires a new topology push from
> > > the client. I feel like adding extra logic to time out slow
> > > requiresTopologyPush calls, or enforcing minimum intervals, would
> > > require more configuration options and an extra level of deduplication
> > > logic - confusing users that do not use the plugin (configurations are
> > > irrelevant for them) and confusing users that do use the plugin (two
> > > potentially conflicting levels need to be configured).
> > >
> > > ASH02: Good point and this is actually something that I considered.
> > > But it's actually not that easy - in principle, there is only one
> > > current topology description, but there may be an arbitrary number of
> > > stale topologies active in the group. Would we require the plugin to
> > > store all stale topologies? I think this would be a feasible
> > > extension, but would definitely add some complexity. I would propose
> > > adding this as a follow-up item. I can add this to the future work
> > > section.
> > >
> > > ASH03: Good catch. I don't really want to have an "upgrade logic" that
> > > we need to preserve forever in the group coordinator. I think it would
> > > be okay to allow "ZERO_UUID" for any topology that exists when the
> > > broker upgraded.
> > >
> > > ASH04: I noted this in the future work section. In principle we can
> > > detect mismatches between topology descriptions on the client, but we
> > > do not include it in this KIP, since it would complicate things. The
> > > first successfully stored topology is authoritative.
> > >
> > > AS01) Both topologies are derived from the same Topology instance on
> > > the client at a given epoch, so at the source they're consistent by
> > > construction. During a topology update, the topology ID changes, and
> > > initially we will not have a new topology description passed to the
> > > plugin. In this case, we can get an intermediate NOT_STORED response
> > > when we try to describe the streams group, until a client pushes the
> > > new topology description. But the result will be consistent with the
> > > topology information used for assignments. This assumes that all
> > > clients with the same topology epoch use the same topology
> > > description. Mismatch detection is noted as future work.
> > >
> > > AS02) I think your point about retrying with too large descriptions is
> > > valid. It would actually make sense to leave topology size checking to
> > > the plugin as well - it can decide the maximum topology size and stop
> > > returning requiresTopologyPush for topologies that are confirmed to be
> > > too large. I will make this change in the next revision of the KIP.
> > >
> > > AS03) Yes, this is mostly to avoid dependencies between Kafka
> > > packages. Note that we do not necessarily need to keep the two
> > > implementations in sync. The streams-side TopologyDescription may
> > > evolve differently than the admin-side TopologyDescription. The two
> > > are only weakly linked through the RPC definition.
> > >
> > > AS04) Agreed. However, topology has the slowest-changing lifecycle of
> > > the three, so it should be less confusing than assignments and
> > > members.
> > >
> > > AS05) Correct. The plugin is free to forward, persist, fan out, mirror
> > > to multiple sinks, or anything else. The KIP intentionally doesn't
> > > constrain the storage backend.
> > >
> > > On Tue, May 5, 2026 at 1:19 PM Alieh Saeedi <[email protected]>
> wrote:
> > >>
> > >> Thanks Lucas for the KIP. The KIP is already in very good shape and
> covers the edge cases. I still have a few questions and considerations I’d
> like to share.
> > >>
> > >> AS01:  Are Assignment topology (defined in KIP-1071) and the
> Description topology (defined in KIP-1331) guaranteed to be consistent
> views of the same logical topology, or can they drift? Are we guaranteeing
> that every assignment we surface references only nodes/topics present in
> the current description topology, or can operators see combinations that
> don’t line up?
> > >>
> > >> AS02: I'm curious about the rationale or empirical data behind the
> 350 KB default (e.g., based on observed real-world topologies)? Also the
> KIP says the broker measures topology size and rejects oversized payloads
> with TOPOLOGY_DESCRIPTION_TOO_LARGE. Should the Streams client attempt a
> best-effort pre-check of the serialized size to avoid repeated failing
> pushes and log a clearer local error? Or is the intent to keep the client
> simple and rely entirely on the broker response + plugin behavior for this
> case?
> > >>
> > >> AS03: Why do we introduce a separate Admin-side POJO instead of
> reusing TopologyDescription from the Streams API—for dependency/semantic
> reasons? And how do we plan to keep the two representations in sync?
> > >>
> > >> AS04: Somewhat related to AS01.... In practice we’ve seen that
> because members and assignments change so dynamically, a user may see
> different assignments or members over just a few seconds, or a member with
> a specific memberId may disappear entirely. Having the topology visible
> might help users understand what’s going on—but it could also make things
> more confusing, depending on the situation.
> > >>
> > >> AS05: I assume that even with a single plugin, multiple downstream
> systems can still benefit from it (the plugin can of course fan out to
> multiple downstream systems). Am I right?
> > >>
> > >> Thanks,
> > >> Alieh
> > >>
> > >>
> > >>
> > >> On Mon, May 4, 2026 at 11:39 AM Lucas Brutschy via dev <
> [email protected]> wrote:
> > >>>
> > >>> Hi all,
> > >>>
> > >>> I would like to start the discussion on KIP-1331. The idea is to
> > >>> optionally make a topology description available to the broker, in
> the
> > >>> spirit of KIP-714. Looking forward to your feedback!
> > >>>
> > >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1331%3A+Streams+Group+Topology+Description+Plugin
> > >>>
> > >>> Best,
> > >>> Lucas
> >
>
