This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5cf91e4cbec KAFKA-17593; [3/N] Track the number of subscribed members
per regular expression in ConsumerGroup (#17653)
5cf91e4cbec is described below
commit 5cf91e4cbec8d33bd8e938164ca4b3fe172fda55
Author: David Jacot <[email protected]>
AuthorDate: Mon Nov 4 15:39:09 2024 +0100
KAFKA-17593; [3/N] Track the number of subscribed members per regular
expression in ConsumerGroup (#17653)
This patch adds a data structure to ConsumerGroup to track the number of
members subscribed to each regular expressions in the group. This will be
useful to know whether a regex is new in the group or whether a regex must be
removed from the group.
Reviewers: Jeff Kim <[email protected]>, Lianet Magrans
<[email protected]>
---
.../group/modern/consumer/ConsumerGroup.java | 38 ++++++++++++-
.../group/modern/consumer/ConsumerGroupTest.java | 65 ++++++++++++++++++++++
2 files changed, 102 insertions(+), 1 deletion(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 26e3e14def2..6dc91a10fad 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -130,6 +130,11 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
*/
private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>>
currentPartitionEpoch;
+ /**
+ * The number of members subscribed to each regular expressions.
+ */
+ private final TimelineHashMap<String, Integer>
subscribedRegularExpressions;
+
public ConsumerGroup(
SnapshotRegistry snapshotRegistry,
String groupId,
@@ -143,6 +148,7 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
this.numClassicProtocolMembers = new TimelineInteger(snapshotRegistry);
this.classicProtocolMembersSupportedProtocols = new
TimelineHashMap<>(snapshotRegistry, 0);
this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry,
0);
+ this.subscribedRegularExpressions = new
TimelineHashMap<>(snapshotRegistry, 0);
}
/**
@@ -294,6 +300,7 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember,
newMember);
maybeUpdateServerAssignors(oldMember, newMember);
maybeUpdatePartitionEpoch(oldMember, newMember);
+ maybeUpdateSubscribedRegularExpression(oldMember, newMember);
updateStaticMember(newMember);
maybeUpdateGroupState();
maybeUpdateNumClassicProtocolMembers(oldMember, newMember);
@@ -317,6 +324,7 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember,
null);
maybeUpdateServerAssignors(oldMember, null);
maybeRemovePartitionEpoch(oldMember);
+ maybeUpdateSubscribedRegularExpression(oldMember, null);
removeStaticMember(oldMember);
maybeUpdateGroupState();
maybeUpdateNumClassicProtocolMembers(oldMember, null);
@@ -334,6 +342,13 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
}
}
+ /**
+ * @return The number of members subscribed to the provided regex.
+ */
+ public int numSubscribedMembers(String regex) {
+ return subscribedRegularExpressions.getOrDefault(regex, 0);
+ }
+
/**
* @return The number of members that use the classic protocol.
*/
@@ -365,7 +380,8 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
* @return The epoch or -1.
*/
public int currentPartitionEpoch(
- Uuid topicId, int partitionId
+ Uuid topicId,
+ int partitionId
) {
Map<Integer, Integer> partitions = currentPartitionEpoch.get(topicId);
if (partitions == null) {
@@ -665,6 +681,26 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
}
}
+ /**
+ * Updates the number of the members that use the regular expression.
+ *
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ */
+ private void maybeUpdateSubscribedRegularExpression(
+ ConsumerGroupMember oldMember,
+ ConsumerGroupMember newMember
+ ) {
+ // Decrement the count of the old regex.
+ if (oldMember != null && oldMember.subscribedTopicRegex() != null) {
+
subscribedRegularExpressions.compute(oldMember.subscribedTopicRegex(),
Utils::decValue);
+ }
+ // Increment the count of the new regex.
+ if (newMember != null && newMember.subscribedTopicRegex() != null) {
+
subscribedRegularExpressions.compute(newMember.subscribedTopicRegex(),
Utils::incValue);
+ }
+ }
+
/**
* Updates the number of the members that use the classic protocol.
*
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index a8a9544a1fb..028cd23cfda 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -1599,4 +1599,69 @@ public class ConsumerGroupTest {
assertEquals(expectedConsumerGroup.preferredServerAssignor(),
consumerGroup.preferredServerAssignor());
assertEquals(expectedConsumerGroup.members(), consumerGroup.members());
}
+
+ @Test
+ public void testSubscribedRegularExpressionCount() {
+ ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder("member1")
+ .setSubscribedTopicRegex("regex1")
+ .build();
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder("member2")
+ .setSubscribedTopicRegex("regex2")
+ .build();
+ ConsumerGroupMember member3 = new
ConsumerGroupMember.Builder("member3")
+ .setSubscribedTopicRegex("regex1")
+ .build();
+
+ // Assert the initial state.
+ assertEquals(0, consumerGroup.numSubscribedMembers("regex1"));
+ assertEquals(0, consumerGroup.numSubscribedMembers("regex2"));
+ assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
+
+ // Add member 1.
+ consumerGroup.updateMember(member1);
+ assertEquals(1, consumerGroup.numSubscribedMembers("regex1"));
+ assertEquals(0, consumerGroup.numSubscribedMembers("regex2"));
+ assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
+
+ // Add member 2.
+ consumerGroup.updateMember(member2);
+ assertEquals(1, consumerGroup.numSubscribedMembers("regex1"));
+ assertEquals(1, consumerGroup.numSubscribedMembers("regex2"));
+ assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
+
+ // Add member 3.
+ consumerGroup.updateMember(member3);
+ assertEquals(2, consumerGroup.numSubscribedMembers("regex1"));
+ assertEquals(1, consumerGroup.numSubscribedMembers("regex2"));
+ assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
+
+ // Update member 3.
+ member3 = new ConsumerGroupMember.Builder(member3)
+ .setSubscribedTopicRegex("regex2")
+ .build();
+ consumerGroup.updateMember(member3);
+ assertEquals(1, consumerGroup.numSubscribedMembers("regex1"));
+ assertEquals(2, consumerGroup.numSubscribedMembers("regex2"));
+ assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
+
+ // Remove member 1.
+ consumerGroup.removeMember(member1.memberId());
+ assertEquals(0, consumerGroup.numSubscribedMembers("regex1"));
+ assertEquals(2, consumerGroup.numSubscribedMembers("regex2"));
+ assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
+
+ // Remove member 2.
+ consumerGroup.removeMember(member2.memberId());
+ assertEquals(0, consumerGroup.numSubscribedMembers("regex1"));
+ assertEquals(1, consumerGroup.numSubscribedMembers("regex2"));
+ assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
+
+ // Remove member 3.
+ consumerGroup.removeMember(member3.memberId());
+ assertEquals(0, consumerGroup.numSubscribedMembers("regex1"));
+ assertEquals(0, consumerGroup.numSubscribedMembers("regex2"));
+ assertEquals(0, consumerGroup.numSubscribedMembers("regex3"));
+ }
}