vamossagar12 commented on code in PR #14327: URL: https://github.com/apache/kafka/pull/14327#discussion_r1371564208
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -727,20 +728,42 @@ private void maybeUpdateSubscribedTopicNames( * @param newMember The new member. */ private static void maybeUpdateSubscribedTopicNames( + TopicsImage topics, Map<String, Integer> subscribedTopicCount, ConsumerGroupMember oldMember, ConsumerGroupMember newMember ) { if (oldMember != null) { - oldMember.subscribedTopicNames().forEach(topicName -> - subscribedTopicCount.compute(topicName, ConsumerGroup::decValue) - ); + String oldMemberRegex = oldMember.subscribedTopicRegex(); + if (oldMemberRegex == null || oldMemberRegex.isEmpty()) { + oldMember.subscribedTopicNames().forEach(topicName -> + subscribedTopicCount.compute(topicName, ConsumerGroup::decValue) + ); + } else { + Pattern pattern = Pattern.compile(oldMemberRegex); + oldMember.subscribedTopicNames() Review Comment: Do we need this condition? Ultimately, we need to decrement the counts for all of the old member's subscribed topics. Assuming that topics subscribed via regex-based subscriptions also increment the count, I don't think we need to run pattern matching here. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ########## @@ -556,6 +564,56 @@ public void testUpdateSubscriptionMetadata() { image.cluster() ) ); + + // Removing member1 results in returning bar and zar. + assertEquals( + mkMap( + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) + ), + consumerGroup.computeSubscriptionMetadata( + member1, + null, + image.topics(), + image.cluster() + ) + ); + + // Updating group with removal of member1. 
+ consumerGroup.removeMember(member1.memberId()); + + // Compute while taking into account member 4 + assertEquals( + mkMap( + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))), + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), + mkEntry("food", new TopicMetadata(foodTopicId, "food", 4, mkMapOfPartitionRacks(4))) + ), + consumerGroup.computeSubscriptionMetadata( + null, + member4, + image.topics(), + image.cluster() + ) + ); + + // Updating group with member5. + consumerGroup.updateMember(member5); Review Comment: Don't we need to update member4 as well here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org