jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1247156849


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -728,6 +794,81 @@ public void replay(
             }
             consumerGroup.removeMember(memberId);
         }
+
+        updateGroupsByTopics(groupId, oldSubscribedTopicNames, 
consumerGroup.subscribedTopicNames());
+    }
+
+    /**
+     * @return The set of groups subscribed to the topic.
+     */
+    public Set<String> groupsSubscribedToTopic(String topicName) {
+        Set<String> groups = groupsByTopics.get(topicName);
+        return groups != null ? groups : Collections.emptySet();
+    }
+
+    /**
+     * Subscribes a group to a topic.
+     *
+     * @param groupId   The group id.
+     * @param topicName The topic name.
+     */
+    private void subscribeGroupToTopic(
+        String groupId,
+        String topicName
+    ) {
+        groupsByTopics
+            .computeIfAbsent(topicName, __ -> new 
TimelineHashSet<>(snapshotRegistry, 1))
+            .add(groupId);
+    }
+
+    /**
+     * Unsubscribes a group from a topic.
+     *
+     * @param groupId   The group id.
+     * @param topicName The topic name.
+     */
+    private void unsubscribeGroupFromTopic(
+        String groupId,
+        String topicName
+    ) {
+        groupsByTopics.computeIfPresent(topicName, (__, groupIds) -> {
+            groupIds.remove(groupId);
+            return groupIds.isEmpty() ? null : groupIds;
+        });
+    }
+
+    /**
+     * Updates the group by topics mapping.
+     *
+     * @param groupId               The group id.
+     * @param oldSubscribedTopics   The old group subscriptions.
+     * @param newSubscribedTopics   The new group subscriptions.
+     */
+    private void updateGroupsByTopics(

Review Comment:
   do we have any test on this and the above code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to