This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8ccb26de2ea KAFKA-17733: Protocol upgrade should allow empty member 
assignment in group conversion (#17853)
8ccb26de2ea is described below

commit 8ccb26de2eab4f3695bcdd029c82e4a7c872c314
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Tue Nov 19 13:46:07 2024 -0500

    KAFKA-17733: Protocol upgrade should allow empty member assignment in group 
conversion (#17853)
    
    During conversion from classic to consumer group, if a member has empty 
assignment (e.g., the member just joined and has never synced), the conversion 
will fail because of the buffer underflow error when deserializing the member 
assignment. This patch allows empty assignment while deserializing the member 
assignment.
    
    Reviewers: Jeff Kim <[email protected]>, David Jacot 
<[email protected]>
---
 .../group/modern/consumer/ConsumerGroup.java       |  24 +++-
 .../group/GroupMetadataManagerTest.java            | 121 +++++++++++++++++++++
 2 files changed, 139 insertions(+), 6 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 654c49e3958..544ca7fa8d2 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -45,6 +45,7 @@ import org.apache.kafka.timeline.TimelineInteger;
 import org.apache.kafka.timeline.TimelineObject;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -56,6 +57,7 @@ import java.util.Set;
 
 import static org.apache.kafka.coordinator.group.Utils.toOptional;
 import static org.apache.kafka.coordinator.group.Utils.toTopicPartitionMap;
+import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupMember.EMPTY_ASSIGNMENT;
 import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState.ASSIGNING;
 import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState.EMPTY;
 import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState.RECONCILING;
@@ -939,12 +941,22 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
 
         classicGroup.allMembers().forEach(classicGroupMember -> {
-            Map<Uuid, Set<Integer>> assignedPartitions = toTopicPartitionMap(
-                ConsumerProtocol.deserializeConsumerProtocolAssignment(
-                    ByteBuffer.wrap(classicGroupMember.assignment())
-                ),
-                topicsImage
-            );
+            // The assigned partition can be empty if the member just joined 
and has never synced.
+            // We should accept the empty assignment.
+            Map<Uuid, Set<Integer>> assignedPartitions;
+            if (Arrays.equals(classicGroupMember.assignment(), 
EMPTY_ASSIGNMENT)) {
+                assignedPartitions = Collections.emptyMap();
+            } else {
+                assignedPartitions = toTopicPartitionMap(
+                    ConsumerProtocol.deserializeConsumerProtocolAssignment(
+                        ByteBuffer.wrap(classicGroupMember.assignment())
+                    ),
+                    topicsImage
+                );
+            }
+
+            // Every member is guaranteed to have metadata set when it joins,
+            // so we don't check for empty subscription here.
             ConsumerProtocolSubscription subscription = 
ConsumerProtocol.deserializeConsumerProtocolSubscription(
                 
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get()))
             );
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index b21ff1796fc..e4f6606bf4f 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -10240,6 +10240,127 @@ public class GroupMetadataManagerTest {
         context.assertSessionTimeout(groupId, newMemberId, 45000);
     }
 
+    @Test
+    public void 
testConsumerGroupHeartbeatToClassicGroupWithEmptyAssignmentMember() throws 
ExecutionException, InterruptedException {
+        String groupId = "group-id";
+        String memberId2 = "member-id-2";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 1)
+            .addTopic(barTopicId, barTopicName, 1)
+            .addRacks()
+            .build();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.UPGRADE)
+            .withConsumerGroupAssignors(Collections.singletonList(new 
NoOpPartitionAssignor()))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = 
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+        protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+            .setName("range")
+            
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                List.of(fooTopicName, barTopicName)
+            ))))
+        );
+
+        // Member 1 joins, creating a new classic group.
+        GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(
+            new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+                .withGroupId(groupId)
+                .withMemberId(UNKNOWN_MEMBER_ID)
+                .withProtocols(protocols)
+                .withSessionTimeoutMs(5000)
+                .withRebalanceTimeoutMs(10000)
+                .build()
+        );
+
+        // Triggering completion of the rebalance.
+        // Member 1 has never synced so its assignment is empty.
+        context.sleep(3000 + 1);
+        String memberId1 = joinResult.joinFuture.get().memberId();
+        ClassicGroup group = 
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        // A new member 2 with new protocol joins the classic group, 
triggering the upgrade.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor(NoOpPartitionAssignor.NAME)
+                .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+                .setTopicPartitions(Collections.emptyList()));
+
+        ConsumerGroupMember expectedMember1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(1)
+            .setState(MemberState.STABLE)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+            .setRebalanceTimeoutMs(10000)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(5000)
+                    
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(protocols))
+            )
+            .setAssignedPartitions(Collections.emptyMap())
+            .build();
+
+        ConsumerGroupMember expectedMember2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setMemberEpoch(2)
+            .setPreviousMemberEpoch(0)
+            .setState(MemberState.STABLE)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setServerAssignorName(NoOpPartitionAssignor.NAME)
+            .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+            .setRebalanceTimeoutMs(5000)
+            .setAssignedPartitions(Collections.emptyMap())
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            // The existing classic group tombstone.
+            
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId),
+
+            // Create the new consumer group with member 1.
+            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember1),
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, expectedMember1.assignedPartitions()),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember1),
+
+            // Member 2 joins the new consumer group.
+            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember2),
+
+            // The subscription metadata hasn't been updated during the 
conversion, so a new one is computed.
+            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 Map.of(
+                fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
+                barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
+            )),
+
+            // Newly joining member 2 bumps the group epoch. A new target 
assignment is computed.
+            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
2),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, Collections.emptyMap()),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 2),
+
+            // Member 2 has no pending revoking partition or pending release 
partition.
+            // Bump its member epoch and transition to STABLE.
+            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember2)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+
+        context.assertSessionTimeout(groupId, memberId1, 
expectedMember1.classicProtocolSessionTimeout().get());
+        context.assertSessionTimeout(groupId, memberId2, 45000);
+    }
+
     @Test
     public void testConsumerGroupHeartbeatFromExistingClassicStaticMember() {
         String groupId = "group-id";

Reply via email to