This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5cf91e4cbec KAFKA-17593; [3/N] Track the number of subscribed members 
per regular expression in ConsumerGroup (#17653)
5cf91e4cbec is described below

commit 5cf91e4cbec8d33bd8e938164ca4b3fe172fda55
Author: David Jacot <[email protected]>
AuthorDate: Mon Nov 4 15:39:09 2024 +0100

    KAFKA-17593; [3/N] Track the number of subscribed members per regular 
expression in ConsumerGroup (#17653)
    
    This patch adds a data structure to ConsumerGroup to track the number of 
members subscribed to each regular expressions in the group. This will be 
useful to know whether a regex is new in the group or whether a regex must be 
removed from the group.
    
    Reviewers: Jeff Kim <[email protected]>, Lianet Magrans 
<[email protected]>
---
 .../group/modern/consumer/ConsumerGroup.java       | 38 ++++++++++++-
 .../group/modern/consumer/ConsumerGroupTest.java   | 65 ++++++++++++++++++++++
 2 files changed, 102 insertions(+), 1 deletion(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 26e3e14def2..6dc91a10fad 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -130,6 +130,11 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
      */
     private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> 
currentPartitionEpoch;
 
+    /**
+     * The number of members subscribed to each regular expressions.
+     */
+    private final TimelineHashMap<String, Integer> 
subscribedRegularExpressions;
+
     public ConsumerGroup(
         SnapshotRegistry snapshotRegistry,
         String groupId,
@@ -143,6 +148,7 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         this.numClassicProtocolMembers = new TimelineInteger(snapshotRegistry);
         this.classicProtocolMembersSupportedProtocols = new 
TimelineHashMap<>(snapshotRegistry, 0);
         this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 
0);
+        this.subscribedRegularExpressions = new 
TimelineHashMap<>(snapshotRegistry, 0);
     }
 
     /**
@@ -294,6 +300,7 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, 
newMember);
         maybeUpdateServerAssignors(oldMember, newMember);
         maybeUpdatePartitionEpoch(oldMember, newMember);
+        maybeUpdateSubscribedRegularExpression(oldMember, newMember);
         updateStaticMember(newMember);
         maybeUpdateGroupState();
         maybeUpdateNumClassicProtocolMembers(oldMember, newMember);
@@ -317,6 +324,7 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, 
null);
         maybeUpdateServerAssignors(oldMember, null);
         maybeRemovePartitionEpoch(oldMember);
+        maybeUpdateSubscribedRegularExpression(oldMember, null);
         removeStaticMember(oldMember);
         maybeUpdateGroupState();
         maybeUpdateNumClassicProtocolMembers(oldMember, null);
@@ -334,6 +342,13 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         }
     }
 
+    /**
+     * @return The number of members subscribed to the provided regex.
+     */
+    public int numSubscribedMembers(String regex) {
+        return subscribedRegularExpressions.getOrDefault(regex, 0);
+    }
+
     /**
      * @return The number of members that use the classic protocol.
      */
@@ -365,7 +380,8 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
      * @return The epoch or -1.
      */
     public int currentPartitionEpoch(
-        Uuid topicId, int partitionId
+        Uuid topicId,
+        int partitionId
     ) {
         Map<Integer, Integer> partitions = currentPartitionEpoch.get(topicId);
         if (partitions == null) {
@@ -665,6 +681,26 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         }
     }
 
+    /**
+     * Updates the number of the members that use the regular expression.
+     *
+     * @param oldMember The old member.
+     * @param newMember The new member.
+     */
+    private void maybeUpdateSubscribedRegularExpression(
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        // Decrement the count of the old regex.
+        if (oldMember != null && oldMember.subscribedTopicRegex() != null) {
+            
subscribedRegularExpressions.compute(oldMember.subscribedTopicRegex(), 
Utils::decValue);
+        }
+        // Increment the count of the new regex.
+        if (newMember != null && newMember.subscribedTopicRegex() != null) {
+            
subscribedRegularExpressions.compute(newMember.subscribedTopicRegex(), 
Utils::incValue);
+        }
+    }
+
     /**
      * Updates the number of the members that use the classic protocol.
      *
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index a8a9544a1fb..028cd23cfda 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -1599,4 +1599,69 @@ public class ConsumerGroupTest {
         assertEquals(expectedConsumerGroup.preferredServerAssignor(), 
consumerGroup.preferredServerAssignor());
         assertEquals(expectedConsumerGroup.members(), consumerGroup.members());
     }
+
+    @Test
+    public void testSubscribedRegularExpressionCount() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder("member1")
+            .setSubscribedTopicRegex("regex1")
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder("member2")
+            .setSubscribedTopicRegex("regex2")
+            .build();
+        ConsumerGroupMember member3 = new 
ConsumerGroupMember.Builder("member3")
+            .setSubscribedTopicRegex("regex1")
+            .build();
+
+        // Assert the initial state.
+        assertEquals(0, consumerGroup.numSubscribedMembers("regex1"));
+        assertEquals(0, consumerGroup.numSubscribedMembers("regex2"));
+        assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
+
+        // Add member 1.
+        consumerGroup.updateMember(member1);
+        assertEquals(1, consumerGroup.numSubscribedMembers("regex1"));
+        assertEquals(0, consumerGroup.numSubscribedMembers("regex2"));
+        assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
+
+        // Add member 2.
+        consumerGroup.updateMember(member2);
+        assertEquals(1, consumerGroup.numSubscribedMembers("regex1"));
+        assertEquals(1, consumerGroup.numSubscribedMembers("regex2"));
+        assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
+
+        // Add member 3.
+        consumerGroup.updateMember(member3);
+        assertEquals(2, consumerGroup.numSubscribedMembers("regex1"));
+        assertEquals(1, consumerGroup.numSubscribedMembers("regex2"));
+        assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
+
+        // Update member 3.
+        member3 = new ConsumerGroupMember.Builder(member3)
+            .setSubscribedTopicRegex("regex2")
+            .build();
+        consumerGroup.updateMember(member3);
+        assertEquals(1, consumerGroup.numSubscribedMembers("regex1"));
+        assertEquals(2, consumerGroup.numSubscribedMembers("regex2"));
+        assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
+
+        // Remove member 1.
+        consumerGroup.removeMember(member1.memberId());
+        assertEquals(0, consumerGroup.numSubscribedMembers("regex1"));
+        assertEquals(2, consumerGroup.numSubscribedMembers("regex2"));
+        assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
+
+        // Remove member 2.
+        consumerGroup.removeMember(member2.memberId());
+        assertEquals(0, consumerGroup.numSubscribedMembers("regex1"));
+        assertEquals(1, consumerGroup.numSubscribedMembers("regex2"));
+        assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
+
+        // Remove member 3.
+        consumerGroup.removeMember(member3.memberId());
+        assertEquals(0, consumerGroup.numSubscribedMembers("regex1"));
+        assertEquals(0, consumerGroup.numSubscribedMembers("regex2"));
+        assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
+    }
 }

Reply via email to