This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0f4dbf7fd76 KAFKA-19720: Regex subscription should be empty for classic members joining mixed group (#20626)
0f4dbf7fd76 is described below

commit 0f4dbf7fd76d0fb29fec1e3588399d67628d278f
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Sat Nov 15 06:02:58 2025 -0500

    KAFKA-19720: Regex subscription should be empty for classic members joining mixed group (#20626)
    
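    Previously, the assignment was not recomputed when a static consumer
    member with a regex subscription was replaced by a classic member,
    even though the classic member does not have a regex subscription.
    
    This PR sets the regex subscription to empty during the replacement,
    so that the unused regex is tombstoned and the group epoch is bumped.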
    
    Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |  15 +-
 .../group/GroupMetadataManagerTest.java            | 297 +++++++++++++++++++++
 2 files changed, 311 insertions(+), 1 deletion(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index f31d2c9a06d..946fe7f0f24 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2543,6 +2543,7 @@ public class GroupMetadataManager {
             
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
             .maybeUpdateServerAssignorName(Optional.empty())
             
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+            .setSubscribedTopicRegex("") // Regex subscription is not 
supported for classic member.
             .setClientId(context.clientId())
             .setClientHost(context.clientAddress().toString())
             .setClassicMemberMetadata(
@@ -2551,13 +2552,25 @@ public class GroupMetadataManager {
                     
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(protocols)))
             .build();
 
-        boolean bumpGroupEpoch = hasMemberSubscriptionChanged(
+        boolean hasMemberSubscriptionChanged = hasMemberSubscriptionChanged(
             groupId,
             member,
             updatedMember,
             records
         );
 
+        // Maybe create tombstone for the regex if the joining member replaces 
a static member
+        // with regex subscription.
+        UpdateRegularExpressionsResult updateRegularExpressionsResult = 
maybeUpdateRegularExpressions(
+            context,
+            group,
+            member,
+            updatedMember,
+            records
+        );
+
+        boolean bumpGroupEpoch = hasMemberSubscriptionChanged || 
updateRegularExpressionsResult.regexUpdated();
+
         if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
             // The subscription metadata is updated in two cases:
             // 1) The member has updated its subscriptions;
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 6f0a4048de1..ef90ba0b631 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -21486,6 +21486,303 @@ public class GroupMetadataManagerTest {
         );
     }
 
+    @Test
+    public void 
testConsumerMemberWithRegexReplacedByClassicMemberWithSameSubscription() {
+        String groupId = "fooup";
+        String instanceId = "instance-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addTopic(barTopicId, barTopicName, 1)
+            .buildCoordinatorMetadataImage(12345L);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+
+        // Member 1 is a static member with both regex and topic name 
subscription
+        // Member 2 uses topic name subscription.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(instanceId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("bar*")
+                    .setSubscribedTopicNames(List.of(fooTopicName))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(List.of(fooTopicName))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                .withAssignmentEpoch(10)
+                .withResolvedRegularExpression("bar*", new 
ResolvedRegularExpression(
+                    Set.of(barTopicName), 0L, 0L)))
+            .build();
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        group.setMetadataRefreshDeadline(Long.MAX_VALUE, 10);
+
+        // Member 1 is replaced by a classic member with the same instance id.
+        JoinGroupRequestProtocolCollection joinProtocols = new 
JoinGroupRequestProtocolCollection();
+        joinProtocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+            .setName("range")
+            
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                List.of(fooTopicName)
+            ))))
+        );
+        JoinGroupRequestData joinRequest = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withGroupInstanceId(instanceId)
+            .withRebalanceTimeoutMs(5000)
+            .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .withProtocols(joinProtocols)
+            .build();
+        GroupMetadataManagerTestContext.JoinResult result = 
context.sendClassicGroupJoin(joinRequest);
+
+        ConsumerGroupMember newMember1 = group.staticMember(instanceId);
+
+        ConsumerGroupMember expectedCopiedMember = new 
ConsumerGroupMember.Builder(newMember1.memberId())
+            .setState(MemberState.STABLE)
+            .setInstanceId(instanceId)
+            .setMemberEpoch(0)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicRegex("bar*") // Still uses regex subscription.
+            .setSubscribedTopicNames(List.of(fooTopicName))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0)))
+            .build();
+
+        ConsumerGroupMember expectedMember1 = new 
ConsumerGroupMember.Builder(newMember1.memberId())
+            .setState(MemberState.STABLE)
+            .setInstanceId(instanceId)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicRegex("") // empty regex subscription
+            .setSubscribedTopicNames(List.of(fooTopicName))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(Map.of()) // empty assignment
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(500)
+                    
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(joinRequest.protocols()))
+            )
+            .build();
+
+        List<List<CoordinatorRecord>> expectedRecords = List.of(
+            // The previous member is deleted.
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId1)),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId1)),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId1)),
+            // The previous member is replaced by the new one.
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 expectedCopiedMember)),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 expectedCopiedMember.memberId(), expectedCopiedMember.assignedPartitions())),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedCopiedMember)),
+            // The member subscription is updated.
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 expectedMember1)),
+            // The regex is tombstoned.
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 "bar*")),
+            // The group epoch is bumped.
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 
computeGroupHash(Map.of(
+                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+            )))),
+            // The target assignment is updated.
+            List.of(
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
expectedMember1.memberId(), Map.of()),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, Map.of())
+            ),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11)),
+            // The member assignment is updated.
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedMember1))
+        );
+
+        assertUnorderedRecordsEquals(
+            expectedRecords,
+            result.records
+        );
+    }
+
+    @Test
+    public void 
testConsumerMemberWithRegexReplacedByClassicMemberWithChangedSubscription() {
+        String groupId = "fooup";
+        String instanceId = "instance-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage(12345L);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+
+        // Member 1 is a static member with regex subscription and
+        // Member 2 uses topic name subscription.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(instanceId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicRegex("foo*")
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(List.of(fooTopicName))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                .withAssignmentEpoch(10)
+                .withResolvedRegularExpression("foo*", new 
ResolvedRegularExpression(
+                    Set.of(fooTopicName), 0L, 0L)))
+            .build();
+        ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
+        group.setMetadataRefreshDeadline(Long.MAX_VALUE, 10);
+
+        // Member 1 is replaced by a classic member with the same instance id.
+        JoinGroupRequestProtocolCollection joinProtocols = new 
JoinGroupRequestProtocolCollection();
+        joinProtocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+            .setName("range")
+            
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                List.of()
+            ))))
+        );
+        JoinGroupRequestData joinRequest = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withGroupInstanceId(instanceId)
+            .withRebalanceTimeoutMs(5000)
+            .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .withProtocols(joinProtocols)
+            .build();
+        GroupMetadataManagerTestContext.JoinResult result = 
context.sendClassicGroupJoin(joinRequest);
+
+        ConsumerGroupMember newMember1 = group.staticMember(instanceId);
+
+        ConsumerGroupMember expectedCopiedMember = new 
ConsumerGroupMember.Builder(newMember1.memberId())
+            .setState(MemberState.STABLE)
+            .setInstanceId(instanceId)
+            .setMemberEpoch(0)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicRegex("foo*") // Still uses regex subscription.
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2)))
+            .build();
+
+        ConsumerGroupMember expectedMember1 = new 
ConsumerGroupMember.Builder(newMember1.memberId())
+            .setState(MemberState.STABLE)
+            .setInstanceId(instanceId)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicRegex("") // empty regex subscription
+            .setServerAssignorName("range")
+            .setAssignedPartitions(Map.of()) // empty assignment
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(500)
+                    
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(joinRequest.protocols()))
+            )
+            .build();
+
+        List<List<CoordinatorRecord>> expectedRecords = List.of(
+            // The previous member is deleted.
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId1)),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId1)),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId1)),
+            // The previous member is replaced by the new one.
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 expectedCopiedMember)),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 expectedCopiedMember.memberId(), expectedCopiedMember.assignedPartitions())),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedCopiedMember)),
+            // The member subscription is updated.
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 expectedMember1)),
+            // The regex is tombstoned.
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 "foo*")),
+            // The group epoch is bumped.
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 
computeGroupHash(Map.of(
+                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+            )))),
+            // The target assignment is updated.
+            List.of(
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
expectedMember1.memberId(), Map.of()),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, Map.of())
+            ),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 11)),
+            // The member assignment is updated.
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedMember1))
+        );
+
+        assertUnorderedRecordsEquals(
+            expectedRecords,
+            result.records
+        );
+    }
+
     @Test
     public void 
testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() {
         String groupId = "fooup";

Reply via email to