This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4ea29da3ca1 KAFKA-20273: Rebalance on classic group downgrade with stale assignment (#21663)
4ea29da3ca1 is described below

commit 4ea29da3ca1776f6870905eed015ef5751e4866c
Author: Sean Quah <[email protected]>
AuthorDate: Mon Mar 9 14:43:22 2026 +0000

    KAFKA-20273: Rebalance on classic group downgrade with stale assignment (#21663)
    
    When assignment batching or offload is enabled, the target assignment
    can lag behind member subscriptions. When downgrading a consumer group
    to a classic group, we must check if the target assignment is stale and
    enter the PREPARING_REBALANCE state if needed. Downgrades triggered by a
    member leaving the group already trigger a rebalance, so there is no
    change on that path.
    
    Reviewers: David Jacot <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |  32 +++---
 .../group/GroupMetadataManagerTest.java            | 113 +++++++++++++++++++++
 2 files changed, 133 insertions(+), 12 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 925befaeef9..a03e8900555 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1263,15 +1263,16 @@ public class GroupMetadataManager {
      * @param leavingMembers            The leaving member(s) that triggered 
the downgrade validation.
      * @param joiningMember             The newly joined member if the 
downgrade is triggered by static member replacement.
      *                                  When not null, must have an instanceId 
that matches the replaced member.
-     * @param hasSubscriptionChanged    The boolean indicating whether the 
joining member has a different subscription
-     *                                  from the replaced member. Only used 
when joiningMember is set.
+     * @param rebalance                 Whether to trigger a rebalance after 
the downgrade.
+     * @param rebalanceReason           The reason for the rebalance, if 
{@code rebalance} is {@code true}.
      * @param records                   The record list to which the 
conversion records are added.
      */
     private void convertToClassicGroup(
         ConsumerGroup consumerGroup,
         Set<ConsumerGroupMember> leavingMembers,
         ConsumerGroupMember joiningMember,
-        boolean hasSubscriptionChanged,
+        boolean rebalance,
+        String rebalanceReason,
         List<CoordinatorRecord> records
     ) {
         if (joiningMember == null) {
@@ -1312,12 +1313,8 @@ public class GroupMetadataManager {
 
         classicGroup.allMembers().forEach(member -> 
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
 
-        // If the downgrade is triggered by a member leaving the group or a 
static
-        // member replacement with a different subscription, a rebalance 
should be triggered.
-        if (joiningMember == null) {
-            prepareRebalance(classicGroup, String.format("Downgrade group %s 
from consumer to classic for member leaving.", classicGroup.groupId()));
-        } else if (hasSubscriptionChanged) {
-            prepareRebalance(classicGroup, String.format("Downgrade group %s 
from consumer to classic for static member replacement with different 
subscription.", classicGroup.groupId()));
+        if (rebalance) {
+            prepareRebalance(classicGroup, rebalanceReason);
         }
 
         log.info("[GroupId {}] Converted the consumer group to a classic 
group.", consumerGroup.groupId());
@@ -2585,7 +2582,8 @@ public class GroupMetadataManager {
             // 2. If the static member subscription hasn't changed, reconcile 
the member's assignment with the existing
             // assignment if the member is not fully reconciled yet. If the 
static member subscription has changed, a
             // rebalance will be triggered during downgrade anyway so we can 
skip the reconciliation.
-            if (!bumpGroupEpoch) {
+            boolean rebalance = group.assignmentEpoch() < groupEpoch;
+            if (!rebalance) {
                 updatedMember = maybeReconcile(
                     groupId,
                     updatedMember,
@@ -2604,7 +2602,10 @@ public class GroupMetadataManager {
                 group,
                 Set.of(),
                 updatedMember,
-                bumpGroupEpoch,
+                rebalance,
+                rebalance ?
+                    String.format("Downgrade group %s from consumer to classic 
with stale assignment.", group.groupId()) :
+                    null,
                 records
             );
         } else {
@@ -4209,7 +4210,14 @@ public class GroupMetadataManager {
 
         List<CoordinatorRecord> records = new ArrayList<>();
         if (validateOnlineDowngradeWithFencedMembers(group, members)) {
-            convertToClassicGroup(group, members, null, false, records);
+            convertToClassicGroup(
+                group,
+                members,
+                null,
+                true,
+                String.format("Downgrade group %s from consumer to classic for 
member leaving.", group.groupId()),
+                records
+            );
             return new CoordinatorResult<>(records, response, null, false);
         } else {
             for (ConsumerGroupMember member : members) {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index d0a961d2f14..e4bf1c40a10 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -13567,6 +13567,119 @@ public class GroupMetadataManagerTest {
         }
     }
 
+    @Test
+    public void 
testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMemberWhenTargetAssignmentIsStale()
 throws ExecutionException, InterruptedException {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String oldMemberId2 = Uuid.randomUuid().toString();
+        String instanceId = "instance-id";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 = 
List.of(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName(NoOpPartitionAssignor.NAME)
+                
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                    List.of(fooTopicName, barTopicName),
+                    null,
+                    List.of(
+                        new TopicPartition(fooTopicName, 0),
+                        new TopicPartition(fooTopicName, 1),
+                        new TopicPartition(fooTopicName, 2),
+                        new TopicPartition(barTopicName, 0),
+                        new TopicPartition(barTopicName, 1)
+                    )
+                ))))
+        );
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+            .setServerAssignorName(NoOpPartitionAssignor.NAME)
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(5000)
+                    .setSupportedProtocols(protocols1)
+            )
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember oldMember2 = new 
ConsumerGroupMember.Builder(oldMemberId2)
+            .setInstanceId(instanceId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+            .setServerAssignorName(NoOpPartitionAssignor.NAME)
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5)))
+            .build();
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addTopic(barTopicId, barTopicName, 2)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        // Consumer group with two members.
+        // Member 1 uses the classic protocol and static member 2 uses the 
consumer protocol.
+        // Member 2 has just changed subscription from foo to foo and bar, and
+        // the new assignment has not been computed yet.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, 
ConsumerGroupMigrationPolicy.DOWNGRADE.toString())
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new 
NoOpPartitionAssignor()))
+            .withMetadataImage(metadataImage)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 11)
+                .withMember(member1)
+                .withMember(oldMember2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(oldMemberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                .withAssignmentEpoch(10)
+                .withMetadataHash(computeGroupHash(Map.of(
+                    fooTopicName, computeTopicHash(fooTopicName, 
metadataImage),
+                    barTopicName, computeTopicHash(barTopicName, metadataImage)
+                ))))
+            .build();
+
+        // A new member using classic protocol with the same instance id 
joins, scheduling the downgrade.
+        byte[] protocolsMetadata2 = 
Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+            List.of(fooTopicName, barTopicName))));
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols2 =
+            new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+        protocols2.add(new JoinGroupRequestProtocol()
+            .setName(NoOpPartitionAssignor.NAME)
+            .setMetadata(protocolsMetadata2));
+        JoinGroupRequestData joinRequest = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withGroupInstanceId(instanceId)
+            .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .withProtocols(protocols2)
+            .build();
+        GroupMetadataManagerTestContext.JoinResult result = 
context.sendClassicGroupJoin(joinRequest);
+        result.appendFuture.complete(null);
+        result.joinFuture.get();
+
+        // A rebalance is triggered.
+        ClassicGroup classicGroup = 
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+        assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
+    }
+
     @Test
     public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
         String groupId = "group-id";

Reply via email to