[
https://issues.apache.org/jira/browse/KAFKA-20624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lucas Brutschy resolved KAFKA-20624.
------------------------------------
Resolution: Fixed
> Wire topology description plugin into GroupCoordinatorService — read path
> (describe) and broker wiring
> ------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-20624
> URL: https://issues.apache.org/jira/browse/KAFKA-20624
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Alieh Saeedi
> Assignee: TengYao Chi
> Priority: Major
> Fix For: 4.4.0
>
>
> streamsGroupDescribe gains an includeTopologyDescription flag.
> attachTopologyDescriptions calls plugin.getTopology only when
> StoredTopologyEpoch == currentTopologyEpoch; otherwise it reports NOT_STORED.
> Status constants: NOT_REQUESTED (0), NOT_STORED (1), ERROR (2), AVAILABLE
> (3). The GroupCoordinator interface is updated accordingly. Broker wiring
> touches KafkaApis.scala (new handler + describe flag plumbing),
> BrokerServer.scala (plugin instantiation + configuration), and
> RequestConvertToJson. Tests and build files touched:
> GroupCoordinatorServiceTest (describe-path tests), KafkaApisTest,
> RequestQuotaTest, AuthorizerIntegrationTest, checkstyle/suppressions.xml.
>
> Wire the streams-group topology description plugin into broker-side request
> handling, completing the KAFKA-20618 broker work that KAFKA-20620 (schema +
> stubs) and KAFKA-20623 (coordinator-side write/heartbeat/delete) left for
> this ticket.
> Two paths are wired:
> 1. Describe path (read). streamsGroupDescribe is extended with an
>    IncludeTopologyDescription request flag. When the flag is set and the
>    persisted StoredDescriptionTopologyEpoch matches the group's current
>    topology epoch, a new
>    StreamsGroupTopologyDescriptionManager.attachTopologyDescriptions
>    building block calls plugin.getTopology(groupId, topologyEpoch) per
>    group and populates the response's topology field. Each DescribedGroup
>    carries a topologyDescriptionStatus byte: NOT_REQUESTED (0) when the
>    client did not ask for it, NOT_STORED (1) when the plugin has no row
>    at the requested epoch (or the epoch mismatched), ERROR (2) when the
>    plugin call failed, AVAILABLE (3) when the topology is attached. Chain
>    assembly lives on GroupCoordinatorService.streamsGroupDescribe; the
>    manager exposes per-group plugin invocation as a building block
>    (mirroring invokeSetTopology / invokeDeleteTopologies).
> 2. Update path (write, KafkaApis wiring).
>    KafkaApis.handleStreamsGroupTopologyDescriptionUpdate is currently a
>    placeholder stub that always returns UNSUPPORTED_VERSION (scaffolding
>    from KAFKA-20620). Replace it with the real handler: gate on
>    isStreamsGroupProtocolEnabled (returning UNSUPPORTED_VERSION when the
>    streams group protocol is disabled), authorize READ on GROUP for the
>    request's groupId (returning GROUP_AUTHORIZATION_FAILED if denied; per
>    KIP-1331, "like offset commits, we don't consider this a modification
>    of the GROUP, which allows deploying apps with READ ACLs"), then call
>    groupCoordinator.streamsGroupTopologyDescriptionUpdate(...) and send
>    the result via requestHelper.sendMaybeThrottle. This has the same shape
>    as the sibling handleStreamsGroupHeartbeat. The coordinator-side method
>    was added in KAFKA-20623 split 2 (#22552); only the KafkaApis side is
>    missing, as flagged on the split 2 review.
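
The ordering of checks in the update-path handler (item 2 above) can be sketched as follows. This is only an illustration in plain Java, not the actual Scala handler in KafkaApis: the class name, the Outcome enum, the canReadGroup predicate, and the forwardToCoordinator callback are all hypothetical stand-ins for the real authorizer and coordinator calls.

```java
import java.util.function.Predicate;

// Hypothetical sketch of the gating order only; none of these names exist in Kafka.
final class TopologyUpdateHandlerSketch {

    // Stand-ins for the error codes named in the ticket, plus a marker for the success path.
    enum Outcome { UNSUPPORTED_VERSION, GROUP_AUTHORIZATION_FAILED, FORWARDED }

    static Outcome handle(
        boolean streamsGroupProtocolEnabled, // assumed result of the isStreamsGroupProtocolEnabled check
        Predicate<String> canReadGroup,      // assumed stand-in for "authorize READ on GROUP"
        String groupId,
        Runnable forwardToCoordinator        // assumed stand-in for the coordinator call
    ) {
        // Gate 1: protocol disabled, reject before doing any other work.
        if (!streamsGroupProtocolEnabled) {
            return Outcome.UNSUPPORTED_VERSION;
        }
        // Gate 2: READ on the group is enough; the update is not treated as a modification of the group.
        if (!canReadGroup.test(groupId)) {
            return Outcome.GROUP_AUTHORIZATION_FAILED;
        }
        // Both gates passed: hand the request to the coordinator.
        forwardToCoordinator.run();
        return Outcome.FORWARDED;
    }
}
```

The point of the ordering is that the coordinator is never invoked when either gate fails.
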
> Broker wiring (shared). BrokerServer.scala instantiates the plugin via
> config.getConfiguredInstance(...) and threads it into the
> GroupCoordinatorService builder (verify the describe path doesn't need
> additional wiring beyond what KAFKA-20623 already added).
> RequestConvertToJson adds entries for
> StreamsGroupTopologyDescriptionUpdateRequest/Response and for the new
> topology-description fields on the describe response so request/response
> logging tools can render them.
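
How the describe path (item 1 above) might arrive at the topologyDescriptionStatus byte can be sketched as below. Only the four status constants and their values come from the ticket. The class, the Described holder, the synchronous BiFunction standing in for plugin.getTopology, and the String topology payload are assumptions made for illustration; the real plugin call may well be asynchronous.

```java
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical sketch; only the status names and values are taken from the ticket.
final class TopologyDescriptionStatusSketch {
    static final byte NOT_REQUESTED = 0;
    static final byte NOT_STORED = 1;
    static final byte ERROR = 2;
    static final byte AVAILABLE = 3;

    // Assumed holder for the per-group result: a status byte plus the topology, if attached.
    static final class Described {
        final byte status;
        final Optional<String> topology;

        Described(byte status, Optional<String> topology) {
            this.status = status;
            this.topology = topology;
        }
    }

    static Described describe(
        boolean includeTopologyDescription,
        int storedEpoch,
        int currentEpoch,
        String groupId,
        BiFunction<String, Integer, Optional<String>> getTopology // assumed synchronous plugin stand-in
    ) {
        // The client did not ask for the topology: never touch the plugin.
        if (!includeTopologyDescription) {
            return new Described(NOT_REQUESTED, Optional.empty());
        }
        // The stored epoch is stale relative to the group's current epoch: skip the plugin call.
        if (storedEpoch != currentEpoch) {
            return new Described(NOT_STORED, Optional.empty());
        }
        try {
            Optional<String> topology = getTopology.apply(groupId, currentEpoch);
            // An empty result models "the plugin has no row at the requested epoch".
            return topology.isPresent()
                ? new Described(AVAILABLE, topology)
                : new Described(NOT_STORED, Optional.empty());
        } catch (RuntimeException e) {
            // Any plugin failure is reported per group rather than failing the whole describe.
            return new Described(ERROR, Optional.empty());
        }
    }
}
```
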
--
This message was sent by Atlassian Jira
(v8.20.10#820010)