jolshan commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1247156849
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -728,6 +794,81 @@ public void replay( } consumerGroup.removeMember(memberId); } + + updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames()); + } + + /** + * @return The set of groups subscribed to the topic. + */ + public Set<String> groupsSubscribedToTopic(String topicName) { + Set<String> groups = groupsByTopics.get(topicName); + return groups != null ? groups : Collections.emptySet(); + } + + /** + * Subscribes a group to a topic. + * + * @param groupId The group id. + * @param topicName The topic name. + */ + private void subscribeGroupToTopic( + String groupId, + String topicName + ) { + groupsByTopics + .computeIfAbsent(topicName, __ -> new TimelineHashSet<>(snapshotRegistry, 1)) + .add(groupId); + } + + /** + * Unsubscribes a group from a topic. + * + * @param groupId The group id. + * @param topicName The topic name. + */ + private void unsubscribeGroupFromTopic( + String groupId, + String topicName + ) { + groupsByTopics.computeIfPresent(topicName, (__, groupIds) -> { + groupIds.remove(groupId); + return groupIds.isEmpty() ? null : groupIds; + }); + } + + /** + * Updates the group by topics mapping. + * + * @param groupId The group id. + * @param oldSubscribedTopics The old group subscriptions. + * @param newSubscribedTopics The new group subscriptions. + */ + private void updateGroupsByTopics( Review Comment: Do we have any tests covering this and the code above? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org