This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cbea4f69bd8 KAFKA-19546: Rebalance should be triggered by subscription 
change during group protocol downgrade (#20417)
cbea4f69bd8 is described below

commit cbea4f69bd82cbe6dcdcf643198e0ec265ae7d74
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Wed Sep 24 09:40:45 2025 -0400

    KAFKA-19546: Rebalance should be triggered by subscription change during 
group protocol downgrade (#20417)
    
    When the last member using the consumer protocol is a static member and
    it is replaced by another static member that uses the classic protocol
    with the same instance id, the new member takes over the assignment of
    the old one and an online downgrade is triggered.
    
    In the current implementation, if the replacing static member has a
    different subscription, no rebalance is triggered when the downgrade
    happens. This patch checks whether the replacing static member's
    subscription differs from that of the replaced member and, if so,
    triggers a rebalance as part of the downgrade.
    
    Reviewers: Sean Quah <[email protected]>, David Jacot
     <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    | 104 ++++++++++-------
 .../group/GroupMetadataManagerTest.java            | 129 +++++++++++++--------
 2 files changed, 144 insertions(+), 89 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 4f156d75daa..cea68e09ffb 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1257,16 +1257,19 @@ public class GroupMetadataManager {
     /**
      * Creates a ClassicGroup corresponding to the given ConsumerGroup.
      *
-     * @param consumerGroup     The converted ConsumerGroup.
-     * @param leavingMembers    The leaving member(s) that triggered the 
downgrade validation.
-     * @param joiningMember     The newly joined member if the downgrade is 
triggered by static member replacement.
-     *                          When not null, must have an instanceId that 
matches an existing member.
-     * @param records           The record list to which the conversion 
records are added.
+     * @param consumerGroup             The converted ConsumerGroup.
+     * @param leavingMembers            The leaving member(s) that triggered 
the downgrade validation.
+     * @param joiningMember             The newly joined member if the 
downgrade is triggered by static member replacement.
+     *                                  When not null, must have an instanceId 
that matches the replaced member.
+     * @param hasSubscriptionChanged    The boolean indicating whether the 
joining member has a different subscription
+     *                                  from the replaced member. Only used 
when joiningMember is set.
+     * @param records                   The record list to which the 
conversion records are added.
      */
     private void convertToClassicGroup(
         ConsumerGroup consumerGroup,
         Set<ConsumerGroupMember> leavingMembers,
         ConsumerGroupMember joiningMember,
+        boolean hasSubscriptionChanged,
         List<CoordinatorRecord> records
     ) {
         if (joiningMember == null) {
@@ -1307,9 +1310,12 @@ public class GroupMetadataManager {
 
         classicGroup.allMembers().forEach(member -> 
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
 
-        // If the downgrade is triggered by a member leaving the group, a 
rebalance should be triggered.
+        // If the downgrade is triggered by a member leaving the group or a 
static
+        // member replacement with a different subscription, a rebalance 
should be triggered.
         if (joiningMember == null) {
-            prepareRebalance(classicGroup, String.format("Downgrade group %s 
from consumer to classic.", classicGroup.groupId()));
+            prepareRebalance(classicGroup, String.format("Downgrade group %s 
from consumer to classic for member leaving.", classicGroup.groupId()));
+        } else if (hasSubscriptionChanged) {
+            prepareRebalance(classicGroup, String.format("Downgrade group %s 
from consumer to classic for static member replacement with different 
subscription.", classicGroup.groupId()));
         }
 
         log.info("[GroupId {}] Converted the consumer group to a classic 
group.", consumerGroup.groupId());
@@ -2401,6 +2407,10 @@ public class GroupMetadataManager {
             );
         }
 
+        ConsumerGroupMember existingStaticMemberOrNull = 
group.staticMember(request.groupInstanceId());
+        boolean downgrade = existingStaticMemberOrNull != null &&
+            validateOnlineDowngradeWithReplacedMember(group, 
existingStaticMemberOrNull);
+
         int groupEpoch = group.groupEpoch();
         SubscriptionType subscriptionType = group.subscriptionType();
         final ConsumerProtocolSubscription subscription = 
deserializeSubscription(protocols);
@@ -2447,49 +2457,61 @@ public class GroupMetadataManager {
             subscriptionType = result.subscriptionType;
         }
 
-        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch. The delta between
-        // the existing and the new target assignment is persisted to the 
partition.
-        final int targetAssignmentEpoch;
-        final Assignment targetAssignment;
+        if (downgrade) {
+            // 2. If the static member subscription hasn't changed, reconcile 
the member's assignment with the existing
+            // assignment if the member is not fully reconciled yet. If the 
static member subscription has changed, a
+            // rebalance will be triggered during downgrade anyway so we can 
skip the reconciliation.
+            if (!bumpGroupEpoch) {
+                updatedMember = maybeReconcile(
+                    groupId,
+                    updatedMember,
+                    group::currentPartitionEpoch,
+                    group.assignmentEpoch(),
+                    group.targetAssignment(updatedMember.memberId(), 
updatedMember.instanceId()),
+                    toTopicPartitions(subscription.ownedPartitions(), 
metadataImage),
+                    records
+                );
+            }
 
-        if (groupEpoch > group.assignmentEpoch()) {
-            targetAssignment = updateTargetAssignment(
+            // 3. Downgrade the consumer group.
+            convertToClassicGroup(
                 group,
-                groupEpoch,
-                member,
+                Set.of(),
                 updatedMember,
-                subscriptionType,
+                bumpGroupEpoch,
                 records
             );
-            targetAssignmentEpoch = groupEpoch;
         } else {
-            targetAssignmentEpoch = group.assignmentEpoch();
-            targetAssignment = 
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
+            // If no downgrade is triggered.
 
-        }
+            // 2. Update the target assignment if the group epoch is larger 
than the target assignment epoch.
+            // The delta between the existing and the new target assignment is 
persisted to the partition.
+            final int targetAssignmentEpoch;
+            final Assignment targetAssignment;
 
-        // 3. Reconcile the member's assignment with the target assignment if 
the member is not
-        // fully reconciled yet.
-        updatedMember = maybeReconcile(
-            groupId,
-            updatedMember,
-            group::currentPartitionEpoch,
-            targetAssignmentEpoch,
-            targetAssignment,
-            toTopicPartitions(subscription.ownedPartitions(), metadataImage),
-            records
-        );
+            if (groupEpoch > group.assignmentEpoch()) {
+                targetAssignment = updateTargetAssignment(
+                    group,
+                    groupEpoch,
+                    member,
+                    updatedMember,
+                    subscriptionType,
+                    records
+                );
+                targetAssignmentEpoch = groupEpoch;
+            } else {
+                targetAssignmentEpoch = group.assignmentEpoch();
+                targetAssignment = 
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
+            }
 
-        // 4. Maybe downgrade the consumer group if the last static member 
using the
-        // consumer protocol is replaced by the joining static member.
-        ConsumerGroupMember existingStaticMemberOrNull = 
group.staticMember(request.groupInstanceId());
-        boolean downgrade = existingStaticMemberOrNull != null &&
-            validateOnlineDowngradeWithReplacedMember(group, 
existingStaticMemberOrNull);
-        if (downgrade) {
-            convertToClassicGroup(
-                group,
-                Set.of(),
+            // 3. Reconcile the member's assignment with the target assignment 
if the member is not fully reconciled yet.
+            updatedMember = maybeReconcile(
+                groupId,
                 updatedMember,
+                group::currentPartitionEpoch,
+                targetAssignmentEpoch,
+                targetAssignment,
+                toTopicPartitions(subscription.ownedPartitions(), 
metadataImage),
                 records
             );
         }
@@ -4084,7 +4106,7 @@ public class GroupMetadataManager {
 
         List<CoordinatorRecord> records = new ArrayList<>();
         if (validateOnlineDowngradeWithFencedMembers(group, members)) {
-            convertToClassicGroup(group, members, null, records);
+            convertToClassicGroup(group, members, null, false, records);
             return new CoordinatorResult<>(records, response, null, false);
         } else {
             for (ConsumerGroupMember member : members) {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index efe2ad96435..8d4ae4fbe07 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -12457,8 +12457,11 @@ public class GroupMetadataManagerTest {
         assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
     }
 
-    @Test
-    public void 
testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMember() throws 
ExecutionException, InterruptedException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void 
testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMember(
+        boolean isSubscriptionChanged
+    ) throws ExecutionException, InterruptedException {
         String groupId = "group-id";
         String memberId1 = Uuid.randomUuid().toString();
         String oldMemberId2 = Uuid.randomUuid().toString();
@@ -12469,11 +12472,9 @@ public class GroupMetadataManagerTest {
         Uuid barTopicId = Uuid.randomUuid();
         String barTopicName = "bar";
 
-        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
-
         List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 = 
List.of(
             new ConsumerGroupMemberMetadataValue.ClassicProtocol()
-                .setName("range")
+                .setName(NoOpPartitionAssignor.NAME)
                 
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
                     List.of(fooTopicName, barTopicName),
                     null,
@@ -12493,8 +12494,8 @@ public class GroupMetadataManagerTest {
             .setPreviousMemberEpoch(9)
             .setClientId(DEFAULT_CLIENT_ID)
             .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
-            .setSubscribedTopicNames(List.of("foo", "bar"))
-            .setServerAssignorName("range")
+            .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+            .setServerAssignorName(NoOpPartitionAssignor.NAME)
             .setRebalanceTimeoutMs(45000)
             .setClassicMemberMetadata(
                 new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
@@ -12512,8 +12513,8 @@ public class GroupMetadataManagerTest {
             .setPreviousMemberEpoch(9)
             .setClientId(DEFAULT_CLIENT_ID)
             .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
-            .setSubscribedTopicNames(List.of("foo"))
-            .setServerAssignorName("range")
+            .setSubscribedTopicNames(List.of(fooTopicName))
+            .setServerAssignorName(NoOpPartitionAssignor.NAME)
             .setRebalanceTimeoutMs(45000)
             .setAssignedPartitions(mkAssignment(
                 mkTopicAssignment(fooTopicId, 3, 4, 5)))
@@ -12524,12 +12525,14 @@ public class GroupMetadataManagerTest {
             .addTopic(barTopicId, barTopicName, 2)
             .addRacks()
             .buildCoordinatorMetadataImage();
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+        long barTopicHash = computeTopicHash(barTopicName, metadataImage);
 
         // Consumer group with two members.
         // Member 1 uses the classic protocol and static member 2 uses the 
consumer protocol.
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, 
ConsumerGroupMigrationPolicy.DOWNGRADE.toString())
-            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new 
NoOpPartitionAssignor()))
             .withMetadataImage(metadataImage)
             .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
                 .withMember(member1)
@@ -12549,12 +12552,19 @@ public class GroupMetadataManagerTest {
         
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
 10);
 
         // A new member using classic protocol with the same instance id 
joins, scheduling the downgrade.
+        byte[] protocolsMetadata2 = 
Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+            isSubscriptionChanged ? List.of(fooTopicName, barTopicName) : 
List.of(fooTopicName))));
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols2 =
+            new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+        protocols2.add(new JoinGroupRequestProtocol()
+            .setName(NoOpPartitionAssignor.NAME)
+            .setMetadata(protocolsMetadata2));
         JoinGroupRequestData joinRequest = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
             .withGroupId(groupId)
             .withMemberId(UNKNOWN_MEMBER_ID)
             .withGroupInstanceId(instanceId)
             .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
-            .withDefaultProtocolTypeAndProtocols()
+            .withProtocols(protocols2)
             .build();
         GroupMetadataManagerTestContext.JoinResult result = 
context.sendClassicGroupJoin(joinRequest);
         result.appendFuture.complete(null);
@@ -12566,14 +12576,15 @@ public class GroupMetadataManagerTest {
             .build();
         ConsumerGroupMember expectedNewClassicMember2 = new 
ConsumerGroupMember.Builder(oldMember2, newMemberId2)
             .setPreviousMemberEpoch(0)
+            .setMemberEpoch(isSubscriptionChanged ? 11 : 10)
+            .setSubscribedTopicNames(isSubscriptionChanged ? 
List.of(fooTopicName, barTopicName) : List.of(fooTopicName))
             .setRebalanceTimeoutMs(joinRequest.rebalanceTimeoutMs())
             .setClassicMemberMetadata(
                 new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
                     .setSessionTimeoutMs(joinRequest.sessionTimeoutMs())
                     .setSupportedProtocols(List.of(new 
ConsumerGroupMemberMetadataValue.ClassicProtocol()
-                        .setName("range")
-                        
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
-                            List.of(fooTopicName)))))))
+                        .setName(NoOpPartitionAssignor.NAME)
+                        .setMetadata(protocolsMetadata2)))
             ).build();
 
         byte[] assignment1 = 
Utils.toArray(ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(List.of(
@@ -12600,7 +12611,7 @@ public class GroupMetadataManagerTest {
             context.time,
             10,
             Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
-            Optional.of("range"),
+            Optional.of(NoOpPartitionAssignor.NAME),
             Optional.of(memberId1),
             Optional.of(context.time.milliseconds())
         );
@@ -12636,42 +12647,60 @@ public class GroupMetadataManagerTest {
         assertTrue(Set.of(memberId1, newMemberId2).contains(leader));
         expectedClassicGroup.setLeaderId(Optional.of(leader));
 
-        assertUnorderedRecordsEquals(
-            List.of(
-                // Remove the existing member 2 that uses the consumer 
protocol.
-                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 oldMemberId2)),
-                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 oldMemberId2)),
-                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 oldMemberId2)),
+        List<List<CoordinatorRecord>> replacingRecords = List.of(
+            // Remove the existing member 2 that uses the consumer protocol.
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 oldMemberId2)),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 oldMemberId2)),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 oldMemberId2)),
 
-                // Create the new member 2 that uses the consumer protocol.
-                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 expectedNewConsumerMember2)),
-                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 newMemberId2, expectedNewConsumerMember2.assignedPartitions())),
-                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedNewConsumerMember2)),
+            // Create the new member 2 that uses the consumer protocol.
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 expectedNewConsumerMember2)),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 newMemberId2, expectedNewConsumerMember2.assignedPartitions())),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedNewConsumerMember2))
+        );
 
-                // Update the new member 2 to the member that uses classic 
protocol.
+        List<List<CoordinatorRecord>> memberUpdateRecords;
+        if (isSubscriptionChanged) {
+            memberUpdateRecords = List.of(
                 
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 expectedNewClassicMember2)),
-                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedNewClassicMember2)),
-
-                // Remove member 1, member 2 and the consumer group.
-                List.of(
-                    
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId1),
-                    
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 newMemberId2)
-                ),
-                List.of(
-                    
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId1),
-                    
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 newMemberId2)
-                ),
-                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId)),
-                List.of(
-                    
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId1),
-                    
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 newMemberId2)
-                ),
-                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId)),
-                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)),
+                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 
computeGroupHash(Map.of(
+                    fooTopicName, fooTopicHash,
+                    barTopicName, barTopicHash
+                ))))
+            );
+        } else {
+            memberUpdateRecords = List.of(
+                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 expectedNewClassicMember2)),
+                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedNewClassicMember2))
+            );
+        }
 
-                // Create the classic group.
-                
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
 assignments))
+        List<List<CoordinatorRecord>> downgradeRecords = List.of(
+            // Remove member 1, member 2 and the consumer group.
+            List.of(
+                
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId1),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 newMemberId2)
+            ),
+            List.of(
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId1),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 newMemberId2)
             ),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId)),
+            List.of(
+                
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId1),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 newMemberId2)
+            ),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId)),
+            
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)),
+
+            // Create the classic group.
+            
List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
 assignments))
+        );
+
+        assertUnorderedRecordsEquals(
+            Stream.of(replacingRecords, memberUpdateRecords, downgradeRecords)
+                .flatMap(List::stream)
+                .collect(Collectors.toList()),
             result.records
         );
 
@@ -12681,9 +12710,13 @@ public class GroupMetadataManagerTest {
         );
         assertNotNull(heartbeatTimeout);
 
-        // No rebalance is triggered.
+        // If the subscription is changed, a rebalance is triggered.
         ClassicGroup classicGroup = 
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
-        assertTrue(classicGroup.isInState(STABLE));
+        if (isSubscriptionChanged) {
+            assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
+        } else {
+            assertTrue(classicGroup.isInState(STABLE));
+        }
     }
 
     @Test

Reply via email to