This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e99561e1f30 KAFKA-17593; [10/N] Remove resolved regular expressions 
when unsubscribed (#17976)
e99561e1f30 is described below

commit e99561e1f30a30f9b151669a78d340f286a3ead6
Author: David Jacot <[email protected]>
AuthorDate: Thu Dec 5 08:41:37 2024 +0100

    KAFKA-17593; [10/N] Remove resolved regular expressions when unsubscribed 
(#17976)
    
    This patch does a few things:
    1) It cleans up resolved regular expressions when they are unsubscribed 
from. It covers the regular leave/fenced paths for the new protocol and it also 
covers the LeaveGroup API as new members could be removed via the admin API.
    2) It ensures that tombstones for  resolved regular expressions are 
generated on the conversion patch from consumer to classic group.
    3) It fixes 
[KAFKA-18116](https://issues.apache.org/jira/browse/KAFKA-18116) because I 
faced the same issue while working on the LeaveGroup API. It adds an 
integration test for this case too.
    
    Reviewers: Dongnuo Lyu <[email protected]>, Jeff Kim <[email protected]>
---
 .../server/GroupCoordinatorBaseRequestTest.scala   |  29 +-
 .../unit/kafka/server/LeaveGroupRequestTest.scala  |  68 +++-
 .../coordinator/group/GroupMetadataManager.java    |  81 ++++-
 .../group/modern/consumer/ConsumerGroup.java       | 107 +++++-
 .../apache/kafka/coordinator/group/Assertions.java |  33 ++
 .../group/GroupMetadataManagerTest.java            | 256 +++++++++++++
 .../group/GroupMetadataManagerTestContext.java     |   4 +
 .../group/modern/consumer/ConsumerGroupTest.java   | 396 ++++++++++++++++++---
 8 files changed, 880 insertions(+), 94 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index 3b375c879c4..7db5258b8e4 100644
--- 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -654,7 +654,7 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
 
   protected def consumerGroupDescribe(
     groupIds: List[String],
-    includeAuthorizedOperations: Boolean,
+    includeAuthorizedOperations: Boolean = false,
     version: Short = 
ApiKeys.CONSUMER_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)
   ): List[ConsumerGroupDescribeResponseData.DescribedGroup] = {
     val consumerGroupDescribeRequest = new 
ConsumerGroupDescribeRequest.Builder(
@@ -782,14 +782,12 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
     )
   }
 
-  protected def leaveGroupWithOldProtocol(
+  protected def classicLeaveGroup(
     groupId: String,
     memberIds: List[String],
     groupInstanceIds: List[String] = null,
-    expectedLeaveGroupError: Errors,
-    expectedMemberErrors: List[Errors],
     version: Short = ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)
-  ): Unit = {
+  ): LeaveGroupResponseData = {
     val leaveGroupRequest = new LeaveGroupRequest.Builder(
       groupId,
       List.tabulate(memberIds.length) { i =>
@@ -799,6 +797,24 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
       }.asJava
     ).build(version)
 
+    connectAndReceive[LeaveGroupResponse](leaveGroupRequest).data
+  }
+
+  protected def leaveGroupWithOldProtocol(
+    groupId: String,
+    memberIds: List[String],
+    groupInstanceIds: List[String] = null,
+    expectedLeaveGroupError: Errors,
+    expectedMemberErrors: List[Errors],
+    version: Short = ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)
+  ): Unit = {
+    val leaveGroupResponse = classicLeaveGroup(
+      groupId,
+      memberIds,
+      groupInstanceIds,
+      version
+    )
+
     val expectedResponseData = new LeaveGroupResponseData()
     if (expectedLeaveGroupError != Errors.NONE) {
       expectedResponseData.setErrorCode(expectedLeaveGroupError.code)
@@ -812,8 +828,7 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
         }.asJava)
     }
 
-    val leaveGroupResponse = 
connectAndReceive[LeaveGroupResponse](leaveGroupRequest)
-    assertEquals(expectedResponseData, leaveGroupResponse.data)
+    assertEquals(expectedResponseData, leaveGroupResponse)
   }
 
   protected def leaveGroup(
diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
index 54164c064e8..84e609fcd92 100644
--- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala
@@ -16,6 +16,8 @@
  */
 package kafka.server
 
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.message.LeaveGroupResponseData
 import org.apache.kafka.common.test.api.ClusterInstance
 import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
 import org.apache.kafka.common.test.api.ClusterTestExtensions
@@ -23,26 +25,70 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.JoinGroupRequest
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState
+import 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.extension.ExtendWith
 
+import scala.jdk.CollectionConverters._
+
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(types = Array(Type.KRAFT))
+@ClusterTestDefaults(types = Array(Type.KRAFT), serverProperties = Array(
+  new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+  new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+))
 class LeaveGroupRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
-  @ClusterTest(serverProperties = Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
-  ))
+  @ClusterTest
+  def testLeaveGroupWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    for (version <- 3 to 
ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)) {
+      val memberId = Uuid.randomUuid().toString
+      assertEquals(Errors.NONE.code, consumerGroupHeartbeat(
+        groupId = "group",
+        memberId = memberId,
+        memberEpoch = 0,
+        instanceId = "instance-id",
+        rebalanceTimeoutMs = 5 * 60 * 1000,
+        subscribedTopicNames = List("foo"),
+        topicPartitions = List.empty,
+      ).errorCode)
+
+      assertEquals(
+        new LeaveGroupResponseData()
+          .setMembers(List(
+            new LeaveGroupResponseData.MemberResponse()
+              .setMemberId(memberId)
+              .setGroupInstanceId("instance-id")
+          ).asJava),
+        classicLeaveGroup(
+          groupId = "group",
+          memberIds = List(JoinGroupRequest.UNKNOWN_MEMBER_ID),
+          groupInstanceIds = List("instance-id"),
+          version = version.toShort
+        )
+      )
+
+      assertEquals(
+        ConsumerGroupState.EMPTY.toString,
+        consumerGroupDescribe(List("group")).head.groupState
+      )
+    }
+  }
+
+  @ClusterTest
   def testLeaveGroupWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
     testLeaveGroup()
   }
 
-  @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), serverProperties = 
Array(
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, value = "false"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
-  ))
+  @ClusterTest
   def testLeaveGroupWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
     testLeaveGroup()
   }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 6aabc76ec25..95fff558d5e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -163,6 +163,7 @@ import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord;
@@ -2220,7 +2221,8 @@ public class GroupMetadataManager {
                     .setPreviousMemberEpoch(0)
                     .build();
 
-                // Generate the records to replace the member.
+                // Generate the records to replace the member. We don't care 
about the regular expression
+                // here because it is taken care of later after the static 
membership replacement.
                 replaceMember(records, group, existingStaticMemberOrNull, 
newMember);
 
                 log.info("[GroupId {}] Static member with instance id {} 
re-joins the consumer group " +
@@ -2332,7 +2334,7 @@ public class GroupMetadataManager {
             if (isNotEmpty(oldSubscribedTopicRegex) && 
group.numSubscribedMembers(oldSubscribedTopicRegex) == 1) {
                 // If the member was the last one subscribed to the regex, we 
delete the
                 // resolved regular expression.
-                
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(
+                records.add(newConsumerGroupRegularExpressionTombstone(
                     groupId,
                     oldSubscribedTopicRegex
                 ));
@@ -2937,6 +2939,7 @@ public class GroupMetadataManager {
             return new CoordinatorResult<>(records, response, null, false);
         } else {
             removeMember(records, group.groupId(), member.memberId());
+            maybeDeleteResolvedRegularExpression(records, group, member);
 
             // We update the subscription metadata without the leaving member.
             Map<String, TopicMetadata> subscriptionMetadata = 
group.computeSubscriptionMetadata(
@@ -2956,6 +2959,7 @@ public class GroupMetadataManager {
             // We bump the group epoch.
             int groupEpoch = group.groupEpoch() + 1;
             records.add(newConsumerGroupEpochRecord(group.groupId(), 
groupEpoch));
+            log.info("[GroupId {}] Bumped group epoch to {}.", 
group.groupId(), groupEpoch);
 
             cancelTimers(group.groupId(), member.memberId());
 
@@ -3039,6 +3043,59 @@ public class GroupMetadataManager {
         ));
     }
 
+    /**
+     * Maybe delete the resolved regular expression associated to the provided 
member if
+     * it was the last subscribed member to it.
+     *
+     * @param records   The record accumulator.
+     * @param group     The group.
+     * @param member    The member removed from the group.
+     */
+    private void maybeDeleteResolvedRegularExpression(
+        List<CoordinatorRecord> records,
+        ConsumerGroup group,
+        ConsumerGroupMember member
+    ) {
+        if (isNotEmpty(member.subscribedTopicRegex()) && 
group.numSubscribedMembers(member.subscribedTopicRegex()) == 1) {
+            records.add(newConsumerGroupRegularExpressionTombstone(
+                group.groupId(),
+                member.subscribedTopicRegex()
+            ));
+        }
+    }
+
+    /**
+     * Maybe delete the resolved regular expressions associated with the 
provided members
+     * if they were the last ones subscribed to them.
+     *
+     * @param records   The record accumulator.
+     * @param group     The group.
+     * @param members   The member removed from the group.
+     * @return The set of deleted regular expressions.
+     */
+    private Set<String> maybeDeleteResolvedRegularExpressions(
+        List<CoordinatorRecord> records,
+        ConsumerGroup group,
+        Set<ConsumerGroupMember> members
+    ) {
+        Map<String, Integer> counts = new HashMap<>();
+        members.forEach(member -> {
+            if (isNotEmpty(member.subscribedTopicRegex())) {
+                counts.compute(member.subscribedTopicRegex(), Utils::incValue);
+            }
+        });
+
+        Set<String> deletedRegexes = new HashSet<>();
+        counts.forEach((regex, count) -> {
+            if (group.numSubscribedMembers(regex) == count) {
+                
records.add(newConsumerGroupRegularExpressionTombstone(group.groupId(), regex));
+                deletedRegexes.add(regex);
+            }
+        });
+
+        return deletedRegexes;
+    }
+
     /**
      * Write tombstones for the member. The order matters here.
      *
@@ -5897,7 +5954,7 @@ public class GroupMetadataManager {
         if (group.type() == CLASSIC) {
             return classicGroupLeaveToClassicGroup((ClassicGroup) group, 
context, request);
         } else if (group.type() == CONSUMER) {
-            return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, 
context, request);
+            return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, 
request);
         } else {
             throw new UnknownMemberIdException(String.format("Group %s not 
found.", request.groupId()));
         }
@@ -5907,14 +5964,12 @@ public class GroupMetadataManager {
      * Handle a classic LeaveGroupRequest to a ConsumerGroup.
      *
      * @param group          The ConsumerGroup.
-     * @param context        The request context.
      * @param request        The actual LeaveGroup request.
      *
      * @return The LeaveGroup response and the records to append.
      */
     private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> 
classicGroupLeaveToConsumerGroup(
         ConsumerGroup group,
-        RequestContext context,
         LeaveGroupRequestData request
     ) throws UnknownMemberIdException {
         String groupId = group.groupId();
@@ -5933,7 +5988,7 @@ public class GroupMetadataManager {
                     member = group.getOrMaybeCreateMember(memberId, false);
                     throwIfMemberDoesNotUseClassicProtocol(member);
 
-                    log.info("[Group {}] Dynamic member {} has left group " +
+                    log.info("[GroupId {}] Dynamic member {} has left group " +
                             "through explicit `LeaveGroup` request; client 
reason: {}",
                         groupId, memberId, reason);
                 } else {
@@ -5943,11 +5998,11 @@ public class GroupMetadataManager {
                     // in which case we expect the MemberId to be undefined.
                     if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
                         throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+                        throwIfMemberDoesNotUseClassicProtocol(member);
                     }
-                    throwIfMemberDoesNotUseClassicProtocol(member);
 
                     memberId = member.memberId();
-                    log.info("[Group {}] Static member {} with instance id {} 
has left group " +
+                    log.info("[GroupId {}] Static member {} with instance id 
{} has left group " +
                             "through explicit `LeaveGroup` request; client 
reason: {}",
                         groupId, memberId, instanceId, reason);
                 }
@@ -5971,9 +6026,16 @@ public class GroupMetadataManager {
         }
 
         if (!records.isEmpty()) {
+            // Check whether resolved regular expressions could be deleted.
+            Set<String> deletedRegexes = maybeDeleteResolvedRegularExpressions(
+                records,
+                group,
+                validLeaveGroupMembers
+            );
+
             // Maybe update the subscription metadata.
             Map<String, TopicMetadata> subscriptionMetadata = 
group.computeSubscriptionMetadata(
-                group.computeSubscribedTopicNames(validLeaveGroupMembers),
+                
group.computeSubscribedTopicNamesWithoutDeletedMembers(validLeaveGroupMembers, 
deletedRegexes),
                 metadataImage.topics(),
                 metadataImage.cluster()
             );
@@ -5986,6 +6048,7 @@ public class GroupMetadataManager {
 
             // Bump the group epoch.
             records.add(newConsumerGroupEpochRecord(groupId, 
group.groupEpoch() + 1));
+            log.info("[GroupId {}] Bumped group epoch to {}.", groupId, 
group.groupEpoch() + 1);
         }
 
         return new CoordinatorResult<>(records, new 
LeaveGroupResponseData().setMembers(memberResponses));
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index ed05f636189..1fdfe5ae87e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -352,6 +352,69 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         }
     }
 
+    /**
+     * Updates the subscription count.
+     *
+     * @param oldMember             The old member.
+     * @param newMember             The new member.
+     *
+     * @return Copy of the map of topics to the count of number of subscribers.
+     */
+    public Map<String, Integer> computeSubscribedTopicNames(
+        ConsumerGroupMember oldMember,
+        ConsumerGroupMember newMember
+    ) {
+        Map<String, Integer> subscribedTopicsNames = 
super.computeSubscribedTopicNames(oldMember, newMember);
+
+        String oldSubscribedTopicRegex = null;
+        if (oldMember != null && oldMember.subscribedTopicRegex() != null && 
!oldMember.subscribedTopicRegex().isEmpty()) {
+            oldSubscribedTopicRegex = oldMember.subscribedTopicRegex();
+        }
+
+        if (oldSubscribedTopicRegex != null) {
+            String newSubscribedTopicRegex = null;
+            if (newMember != null && newMember.subscribedTopicRegex() != null 
&& !newMember.subscribedTopicRegex().isEmpty()) {
+                newSubscribedTopicRegex = newMember.subscribedTopicRegex();
+            }
+
+            // If the old member was the last one subscribed to the regex and 
the new member
+            // is not subscribed to it, we must remove it from the subscribed 
topic names.
+            if (!oldSubscribedTopicRegex.equals(newSubscribedTopicRegex) && 
numSubscribedMembers(oldSubscribedTopicRegex) == 1) {
+                
resolvedRegularExpression(oldSubscribedTopicRegex).ifPresent(resolvedRegularExpression
 ->
+                    resolvedRegularExpression.topics.forEach(topic -> 
subscribedTopicsNames.compute(topic, Utils::decValue))
+                );
+            }
+        }
+
+        return subscribedTopicsNames;
+    }
+
+    /**
+     * Computes an updated copy of the subscribed topic names without the 
provided
+     * removed members and removed regular expressions.
+     *
+     * @param removedMembers    The set of removed members.
+     * @param removedRegexes    The set of removed regular expressions.
+     *
+     * @return Copy of the map of topics to the count of number of subscribers.
+     */
+    public Map<String, Integer> 
computeSubscribedTopicNamesWithoutDeletedMembers(
+        Set<ConsumerGroupMember> removedMembers,
+        Set<String> removedRegexes
+    ) {
+        Map<String, Integer> subscribedTopicsNames = 
super.computeSubscribedTopicNames(removedMembers);
+
+        removedRegexes.forEach(regex ->
+            
resolvedRegularExpression(regex).ifPresent(resolvedRegularExpression ->
+                resolvedRegularExpression.topics.forEach(topic ->
+                    subscribedTopicsNames.compute(topic, Utils::decValue)
+                )
+            )
+        );
+
+        return subscribedTopicsNames;
+    }
+
     /**
      * Update the resolved regular expression.
      *
@@ -631,21 +694,25 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
      */
     @Override
     public void createGroupTombstoneRecords(List<CoordinatorRecord> records) {
-        members().forEach((memberId, member) ->
-            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId(),
 memberId))
+        members.keySet().forEach(memberId ->
+            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId))
         );
 
-        members().forEach((memberId, member) ->
-            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId(),
 memberId))
+        members.keySet().forEach(memberId ->
+            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId))
         );
-        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId()));
+        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId));
 
-        members().forEach((memberId, member) ->
-            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId(),
 memberId))
+        members.keySet().forEach(memberId ->
+            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId))
         );
 
-        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId()));
-        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId()));
+        resolvedRegularExpressions.keySet().forEach(regex ->
+            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 regex))
+        );
+
+        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
+        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId));
     }
 
     /**
@@ -662,24 +729,28 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         String leavingMemberId,
         String joiningMemberId
     ) {
-        members().forEach((memberId, __) -> {
+        members.keySet().forEach(memberId -> {
             String removedMemberId = memberId.equals(leavingMemberId) ? 
joiningMemberId : memberId;
-            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId(),
 removedMemberId));
+            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 removedMemberId));
         });
 
-        members().forEach((memberId, __) -> {
+        members.keySet().forEach(memberId -> {
             String removedMemberId = memberId.equals(leavingMemberId) ? 
joiningMemberId : memberId;
-            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId(),
 removedMemberId));
+            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 removedMemberId));
         });
-        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId()));
+        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId));
 
-        members().forEach((memberId,  __) -> {
+        members.keySet().forEach(memberId -> {
             String removedMemberId = memberId.equals(leavingMemberId) ? 
joiningMemberId : memberId;
-            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId(),
 removedMemberId));
+            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 removedMemberId));
         });
 
-        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId()));
-        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId()));
+        resolvedRegularExpressions.keySet().forEach(regex ->
+            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 regex))
+        );
+
+        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
+        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId));
     }
 
     @Override
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
index 12cd0c1ab9f..15d3cd03f05 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
@@ -42,6 +42,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -100,6 +101,38 @@ public class Assertions {
         }
     }
 
+    public static void assertUnorderedRecordsEquals(
+        List<List<CoordinatorRecord>> expectedRecords,
+        List<CoordinatorRecord> actualRecords
+    ) {
+        try {
+            int i = 0, j = 0;
+            while (i < expectedRecords.size()) {
+                List<CoordinatorRecord> slice = expectedRecords.get(i);
+                assertRecordsEquals(
+                    slice
+                        .stream()
+                        .sorted(Comparator.comparing(Object::toString))
+                        .collect(Collectors.toList()),
+                    actualRecords
+                        .subList(j, j + slice.size())
+                        .stream()
+                        .sorted(Comparator.comparing(Object::toString))
+                        .collect(Collectors.toList())
+                );
+
+                j += slice.size();
+                i++;
+            }
+            assertEquals(j, actualRecords.size());
+        } catch (AssertionFailedError e) {
+            assertionFailure()
+                .expected(expectedRecords)
+                .actual(actualRecords)
+                .buildAndThrow();
+        }
+    }
+
     public static void assertRecordEquals(
         CoordinatorRecord expected,
         CoordinatorRecord actual
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 98fa8452c88..d9609e0f7a5 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -15683,6 +15683,262 @@ public class GroupMetadataManagerTest {
         assertRecordsEquals(expectedRecords.subList(2, 4), 
task.result.records().subList(2, 4));
     }
 
+    @Test
+    public void 
testResolvedRegularExpressionsRemovedWhenMembersLeaveOrFenced() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build(1L))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("foo*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("bar*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(barTopicId, 0, 1, 2)))
+                    .build())
+                .withResolvedRegularExpression("foo*", new 
ResolvedRegularExpression(
+                    Set.of(fooTopicName), 0L, 0L))
+                .withResolvedRegularExpression("bar*", new 
ResolvedRegularExpression(
+                    Set.of(barTopicName), 0L, 0L))
+                .withSubscriptionMetadata(Map.of(
+                    fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
6),
+                    barTopicName, new TopicMetadata(barTopicId, barTopicName, 
3)))
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(barTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Setup the timers.
+        context.onLoaded();
+
+        // Member 1 leaves the group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId1)
+                .setMemberEpoch(-1));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(-1),
+            result.response()
+        );
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 "foo*"),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
+                Map.of(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3))
+            ),
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+
+        // Member 2 is fenced due to reaching the session timeout.
+        context.assertSessionTimeout(groupId, memberId2, 45000);
+        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = 
context.sleep(45000 + 1);
+
+        // Verify the expired timeout.
+        assertEquals(
+            Collections.singletonList(new ExpiredTimeout<Void, 
CoordinatorRecord>(
+                groupSessionTimeoutKey(groupId, memberId2),
+                new CoordinatorResult<>(
+                    List.of(
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId2),
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId2),
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId2),
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 "bar*"),
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 Collections.emptyMap()),
+                        
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12)
+                    )
+                )
+            )),
+            timeouts
+        );
+    }
+
+    @Test
+    public void 
testResolvedRegularExpressionsRemovedWhenConsumerMembersRemovedByAdminApi() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+        String memberId4 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build(1L))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setInstanceId(memberId1)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("foo*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setInstanceId(memberId2)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("foo*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId3)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setInstanceId(memberId3)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("bar*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId4)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setInstanceId(memberId4)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("bar*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(barTopicId, 2)))
+                    .build())
+                .withResolvedRegularExpression("foo*", new 
ResolvedRegularExpression(
+                    Set.of(fooTopicName), 0L, 0L))
+                .withResolvedRegularExpression("bar*", new 
ResolvedRegularExpression(
+                    Set.of(barTopicName), 0L, 0L))
+                .withSubscriptionMetadata(Map.of(
+                    fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
6),
+                    barTopicName, new TopicMetadata(barTopicId, barTopicName, 
3)
+                ))
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(barTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Remove members.
+        CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> result = 
context.sendClassicGroupLeave(
+            new LeaveGroupRequestData()
+                .setGroupId(groupId)
+                .setMembers(List.of(
+                    new MemberIdentity().setGroupInstanceId(memberId1),
+                    new MemberIdentity().setGroupInstanceId(memberId2),
+                    new MemberIdentity().setGroupInstanceId(memberId3)
+                ))
+        );
+
+        assertEquals(
+            new LeaveGroupResponseData()
+                .setMembers(List.of(
+                    new LeaveGroupResponseData.MemberResponse()
+                        .setMemberId(memberId1)
+                        .setGroupInstanceId(memberId1),
+                    new LeaveGroupResponseData.MemberResponse()
+                        .setMemberId(memberId2)
+                        .setGroupInstanceId(memberId2),
+                    new LeaveGroupResponseData.MemberResponse()
+                        .setMemberId(memberId3)
+                        .setGroupInstanceId(memberId3)
+                )),
+            result.response()
+        );
+
+        assertRecordsEquals(
+            List.of(
+                // Remove member 1.
+                
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId1),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId1),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId1),
+                // Remove member 2.
+                
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId2),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId2),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId2),
+                // Remove member 3.
+                
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId3),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId3),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId3),
+                // Remove regex.
+                
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 "foo*"),
+                // Updated subscription metadata.
+                
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 Map.of(
+                    barTopicName, new TopicMetadata(barTopicId, barTopicName, 
3)
+                )),
+                // Bumped epoch.
+                
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
+            ),
+            result.records()
+        );
+    }
+
     private static void checkJoinGroupResponse(
         JoinGroupResponseData expectedResponse,
         JoinGroupResponseData actualResponse,
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 13ced4b6dab..1caa54325e4 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -1577,6 +1577,10 @@ public class GroupMetadataManagerTestContext {
         snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
     }
 
+    void onLoaded() {
+        groupMetadataManager.onLoaded();
+    }
+
     void onUnloaded() {
         groupMetadataManager.onUnloaded();
     }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 9c829630f09..eacb4fca4d5 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -66,9 +66,7 @@ import java.util.Set;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals;
-import static 
org.apache.kafka.coordinator.group.Assertions.assertRecordsEquals;
-import static 
org.apache.kafka.coordinator.group.Assertions.assertUnorderedListEquals;
+import static 
org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
 import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
@@ -1457,52 +1455,6 @@ public class ConsumerGroupTest {
         assertEquals(1, consumerGroup.numClassicProtocolMembers());
     }
 
-    @Test
-    public void testCreateGroupTombstoneRecordsWithReplacedMember() {
-        String groupId = "group";
-        String memberId1 = "member-1";
-        String memberId2 = "member-2";
-        String newMemberId2 = "new-member-2";
-
-        ConsumerGroup consumerGroup = createConsumerGroup(groupId);
-        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = new 
ArrayList<>();
-        protocols.add(new ConsumerGroupMemberMetadataValue.ClassicProtocol()
-            .setName("range")
-            .setMetadata(new byte[0]));
-
-        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
-            .setClassicMemberMetadata(new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
-                .setSupportedProtocols(protocols))
-            .build();
-        consumerGroup.updateMember(member1);
-
-        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
-            .setInstanceId("instance-id-2")
-            .build();
-        consumerGroup.updateMember(member2);
-
-        List<CoordinatorRecord> records = new ArrayList<>();
-        consumerGroup.createGroupTombstoneRecordsWithReplacedMember(records, 
memberId2, newMemberId2);
-
-        List<CoordinatorRecord> expectedRecords = Arrays.asList(
-            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId1),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 newMemberId2),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId1),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 newMemberId2),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId1),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 newMemberId2),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)
-        );
-        assertEquals(expectedRecords.size(), records.size());
-        assertUnorderedListEquals(expectedRecords.subList(0, 2), 
records.subList(0, 2));
-        assertUnorderedListEquals(expectedRecords.subList(2, 4), 
records.subList(2, 4));
-        assertRecordEquals(expectedRecords.get(4), records.get(4));
-        assertUnorderedListEquals(expectedRecords.subList(5, 7), 
records.subList(5, 7));
-        assertRecordsEquals(expectedRecords.subList(7, 9), records.subList(7, 
9));
-    }
-
     @Test
     public void testFromClassicGroup() {
         MockTime time = new MockTime();
@@ -1799,4 +1751,350 @@ public class ConsumerGroupTest {
             consumerGroup.subscribedTopicNames()
         );
     }
+
+    @Test
+    public void testComputeSubscribedTopicNamesWithoutDeletedMembers() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder("member1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+            .build();
+        consumerGroup.updateMember(member1);
+
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder("member2")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .build();
+        consumerGroup.updateMember(member2);
+
+        ConsumerGroupMember member3 = new 
ConsumerGroupMember.Builder("member3")
+            .setSubscribedTopicRegex("foo*")
+            .build();
+        consumerGroup.updateMember(member3);
+
+        ConsumerGroupMember member4 = new 
ConsumerGroupMember.Builder("member4")
+            .setSubscribedTopicRegex("foo*")
+            .build();
+        consumerGroup.updateMember(member4);
+
+        ConsumerGroupMember member5 = new 
ConsumerGroupMember.Builder("member5")
+            .setSubscribedTopicRegex("bar*")
+            .build();
+        consumerGroup.updateMember(member5);
+
+        ConsumerGroupMember member6 = new 
ConsumerGroupMember.Builder("member6")
+            .setSubscribedTopicRegex("bar*")
+            .build();
+        consumerGroup.updateMember(member6);
+
+        consumerGroup.updateResolvedRegularExpression(
+            "foo*",
+            new ResolvedRegularExpression(
+                Set.of("foo", "fooo"),
+                10L,
+                12345L
+            )
+        );
+
+        consumerGroup.updateResolvedRegularExpression(
+            "bar*",
+            new ResolvedRegularExpression(
+                Set.of("bar", "barr"),
+                10L,
+                12345L
+            )
+        );
+
+        // Verify initial state.
+        assertEquals(
+            Map.of(
+                "foo", 3,
+                "fooo", 1,
+                "bar", 3,
+                "barr", 1,
+                "zar", 1
+            ),
+            consumerGroup.subscribedTopicNames()
+        );
+
+        // Compute with removed members and regexes.
+        assertEquals(
+            Map.of(
+                "foo", 1,
+                "bar", 2,
+                "barr", 1,
+                "zar", 1
+            ),
+            consumerGroup.computeSubscribedTopicNamesWithoutDeletedMembers(
+                Set.of(member2, member3, member4, member5),
+                Set.of("foo*")
+            )
+        );
+    }
+
+    @Test
+    public void testComputeSubscribedTopicNames() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder("member1")
+            .setSubscribedTopicNames(List.of("foo", "bar", "zar"))
+            .build();
+        consumerGroup.updateMember(member1);
+
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder("member2")
+            .setSubscribedTopicNames(List.of("foo", "bar"))
+            .build();
+        consumerGroup.updateMember(member2);
+
+        ConsumerGroupMember member3 = new 
ConsumerGroupMember.Builder("member3")
+            .setSubscribedTopicNames(List.of("foo"))
+            .setSubscribedTopicRegex("foo*")
+            .build();
+        consumerGroup.updateMember(member3);
+
+        consumerGroup.updateResolvedRegularExpression(
+            "foo*",
+            new ResolvedRegularExpression(
+                Set.of("foo", "fooo"),
+                10L,
+                12345L
+            )
+        );
+
+        // Verify initial state.
+        assertEquals(
+            Map.of(
+                "foo", 4,
+                "fooo", 1,
+                "bar", 2,
+                "zar", 1
+            ),
+            consumerGroup.subscribedTopicNames()
+        );
+
+        // Compute subscribed topic names without changing the regex.
+        assertEquals(
+            Map.of(
+                "foo", 4,
+                "fooo", 1,
+                "bar", 2,
+                "zar", 1
+            ),
+            consumerGroup.computeSubscribedTopicNames(member3, member3)
+        );
+
+        // Compute subscribed topic names with removing the regex.
+        assertEquals(
+            Map.of(
+                "foo", 3,
+                "bar", 2,
+                "zar", 1
+            ),
+            consumerGroup.computeSubscribedTopicNames(
+                member3,
+                new ConsumerGroupMember.Builder(member3)
+                    .setSubscribedTopicRegex("")
+                    .build()
+            )
+        );
+
+        // Compute subscribed topic names with removing the names.
+        assertEquals(
+            Map.of(
+                "foo", 3,
+                "fooo", 1,
+                "bar", 2,
+                "zar", 1
+            ),
+            consumerGroup.computeSubscribedTopicNames(
+                member3,
+                new ConsumerGroupMember.Builder(member3)
+                    .setSubscribedTopicNames(Collections.emptyList())
+                    .build()
+            )
+        );
+
+        // Compute subscribed topic names with removing both.
+        assertEquals(
+            Map.of(
+                "foo", 2,
+                "bar", 2,
+                "zar", 1
+            ),
+            consumerGroup.computeSubscribedTopicNames(
+                member3,
+                new ConsumerGroupMember.Builder(member3)
+                    .setSubscribedTopicNames(Collections.emptyList())
+                    .setSubscribedTopicRegex("")
+                    .build()
+            )
+        );
+    }
+
+    @Test
+    public void testCreateGroupTombstoneRecords() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+        consumerGroup.setGroupEpoch(10);
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder("member1")
+            .setMemberEpoch(10)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+            
.setAssignedPartitions(mkAssignment(mkTopicAssignment(Uuid.randomUuid(), 0, 1, 
2)))
+            .build();
+        consumerGroup.updateMember(member1);
+
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder("member2")
+            .setMemberEpoch(10)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            
.setAssignedPartitions(mkAssignment(mkTopicAssignment(Uuid.randomUuid(), 0, 1, 
2)))
+            .build();
+        consumerGroup.updateMember(member2);
+
+        ConsumerGroupMember member3 = new 
ConsumerGroupMember.Builder("member3")
+            .setMemberEpoch(10)
+            .setSubscribedTopicRegex("foo*")
+            
.setAssignedPartitions(mkAssignment(mkTopicAssignment(Uuid.randomUuid(), 0, 1, 
2)))
+            .build();
+        consumerGroup.updateMember(member3);
+
+        consumerGroup.updateResolvedRegularExpression(
+            "foo*",
+            new ResolvedRegularExpression(
+                Set.of("foo", "fooo"),
+                10L,
+                12345L
+            )
+        );
+
+        consumerGroup.updateTargetAssignment("member1", new 
Assignment(mkAssignment(
+            mkTopicAssignment(Uuid.randomUuid(), 0, 1, 2))
+        ));
+
+        consumerGroup.updateTargetAssignment("member2", new 
Assignment(mkAssignment(
+            mkTopicAssignment(Uuid.randomUuid(), 0, 1, 2))
+        ));
+
+        consumerGroup.updateTargetAssignment("member3", new 
Assignment(mkAssignment(
+            mkTopicAssignment(Uuid.randomUuid(), 0, 1, 2))
+        ));
+
+        List<CoordinatorRecord> records = new ArrayList<>();
+        consumerGroup.createGroupTombstoneRecords(records);
+
+        assertUnorderedRecordsEquals(
+            List.of(
+                List.of(
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo",
 "member1"),
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo",
 "member2"),
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo",
 "member3")
+                ),
+                List.of(
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord("foo",
 "member1"),
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord("foo",
 "member2"),
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord("foo",
 "member3")
+                ),
+                List.of(
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("foo")
+                ),
+                List.of(
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo",
 "member1"),
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo",
 "member2"),
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo",
 "member3")
+                ),
+                List.of(
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone("foo", 
"foo*")
+                ),
+                List.of(
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord("foo")
+                ),
+                List.of(
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("foo")
+                )
+            ),
+            records
+        );
+    }
+
+    @Test
+    public void testCreateGroupTombstoneRecordsWithReplacedMember() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+        consumerGroup.setGroupEpoch(10);
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder("member1")
+            .setMemberEpoch(10)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+            
.setAssignedPartitions(mkAssignment(mkTopicAssignment(Uuid.randomUuid(), 0, 1, 
2)))
+            .build();
+        consumerGroup.updateMember(member1);
+
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder("member2")
+            .setMemberEpoch(10)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            
.setAssignedPartitions(mkAssignment(mkTopicAssignment(Uuid.randomUuid(), 0, 1, 
2)))
+            .build();
+        consumerGroup.updateMember(member2);
+
+        ConsumerGroupMember member3 = new 
ConsumerGroupMember.Builder("member3")
+            .setMemberEpoch(10)
+            .setSubscribedTopicRegex("foo*")
+            
.setAssignedPartitions(mkAssignment(mkTopicAssignment(Uuid.randomUuid(), 0, 1, 
2)))
+            .build();
+        consumerGroup.updateMember(member3);
+
+        consumerGroup.updateResolvedRegularExpression(
+            "foo*",
+            new ResolvedRegularExpression(
+                Set.of("foo", "fooo"),
+                10L,
+                12345L
+            )
+        );
+
+        consumerGroup.updateTargetAssignment("member1", new 
Assignment(mkAssignment(
+            mkTopicAssignment(Uuid.randomUuid(), 0, 1, 2))
+        ));
+
+        consumerGroup.updateTargetAssignment("member2", new 
Assignment(mkAssignment(
+            mkTopicAssignment(Uuid.randomUuid(), 0, 1, 2))
+        ));
+
+        consumerGroup.updateTargetAssignment("member3", new 
Assignment(mkAssignment(
+            mkTopicAssignment(Uuid.randomUuid(), 0, 1, 2))
+        ));
+
+        List<CoordinatorRecord> records = new ArrayList<>();
+        consumerGroup.createGroupTombstoneRecordsWithReplacedMember(records, 
"member3", "member4");
+
+        assertUnorderedRecordsEquals(
+            List.of(
+                List.of(
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo",
 "member1"),
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo",
 "member2"),
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord("foo",
 "member4")
+                ),
+                List.of(
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord("foo",
 "member1"),
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord("foo",
 "member2"),
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord("foo",
 "member4")
+                ),
+                List.of(
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord("foo")
+                ),
+                List.of(
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo",
 "member1"),
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo",
 "member2"),
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo",
 "member4")
+                ),
+                List.of(
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone("foo", 
"foo*")
+                ),
+                List.of(
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord("foo")
+                ),
+                List.of(
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord("foo")
+                )
+            ),
+            records
+        );
+    }
 }

Reply via email to