This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8ccb26de2ea KAFKA-17733: Protocol upgrade should allow empty member
assignment in group conversion (#17853)
8ccb26de2ea is described below
commit 8ccb26de2eab4f3695bcdd029c82e4a7c872c314
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Tue Nov 19 13:46:07 2024 -0500
KAFKA-17733: Protocol upgrade should allow empty member assignment in group
conversion (#17853)
During conversion from a classic group to a consumer group, if a member has an
empty assignment (e.g., the member just joined and has never synced), the
conversion fails with a buffer underflow error when deserializing the member
assignment. This patch allows an empty assignment when deserializing the member
assignment.
Reviewers: Jeff Kim <[email protected]>, David Jacot
<[email protected]>
---
.../group/modern/consumer/ConsumerGroup.java | 24 +++-
.../group/GroupMetadataManagerTest.java | 121 +++++++++++++++++++++
2 files changed, 139 insertions(+), 6 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 654c49e3958..544ca7fa8d2 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -45,6 +45,7 @@ import org.apache.kafka.timeline.TimelineInteger;
import org.apache.kafka.timeline.TimelineObject;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -56,6 +57,7 @@ import java.util.Set;
import static org.apache.kafka.coordinator.group.Utils.toOptional;
import static org.apache.kafka.coordinator.group.Utils.toTopicPartitionMap;
+import static
org.apache.kafka.coordinator.group.classic.ClassicGroupMember.EMPTY_ASSIGNMENT;
import static
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState.ASSIGNING;
import static
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState.EMPTY;
import static
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState.RECONCILING;
@@ -939,12 +941,22 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
classicGroup.allMembers().forEach(classicGroupMember -> {
- Map<Uuid, Set<Integer>> assignedPartitions = toTopicPartitionMap(
- ConsumerProtocol.deserializeConsumerProtocolAssignment(
- ByteBuffer.wrap(classicGroupMember.assignment())
- ),
- topicsImage
- );
+ // The assigned partition can be empty if the member just joined
and has never synced.
+ // We should accept the empty assignment.
+ Map<Uuid, Set<Integer>> assignedPartitions;
+ if (Arrays.equals(classicGroupMember.assignment(),
EMPTY_ASSIGNMENT)) {
+ assignedPartitions = Collections.emptyMap();
+ } else {
+ assignedPartitions = toTopicPartitionMap(
+ ConsumerProtocol.deserializeConsumerProtocolAssignment(
+ ByteBuffer.wrap(classicGroupMember.assignment())
+ ),
+ topicsImage
+ );
+ }
+
+ // Every member is guaranteed to have metadata set when it joins,
+ // so we don't check for empty subscription here.
ConsumerProtocolSubscription subscription =
ConsumerProtocol.deserializeConsumerProtocolSubscription(
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get()))
);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index b21ff1796fc..e4f6606bf4f 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -10240,6 +10240,127 @@ public class GroupMetadataManagerTest {
context.assertSessionTimeout(groupId, newMemberId, 45000);
}
+ @Test
+ public void
testConsumerGroupHeartbeatToClassicGroupWithEmptyAssignmentMember() throws
ExecutionException, InterruptedException {
+ String groupId = "group-id";
+ String memberId2 = "member-id-2";
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 1)
+ .addTopic(barTopicId, barTopicName, 1)
+ .addRacks()
+ .build();
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.UPGRADE)
+ .withConsumerGroupAssignors(Collections.singletonList(new
NoOpPartitionAssignor()))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+ protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ List.of(fooTopicName, barTopicName)
+ ))))
+ );
+
+ // Member 1 joins, creating a new classic group.
+ GroupMetadataManagerTestContext.JoinResult joinResult =
context.sendClassicGroupJoin(
+ new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withProtocols(protocols)
+ .withSessionTimeoutMs(5000)
+ .withRebalanceTimeoutMs(10000)
+ .build()
+ );
+
+ // Triggering completion of the rebalance.
+ // Member 1 has never synced so its assignment is empty.
+ context.sleep(3000 + 1);
+ String memberId1 = joinResult.joinFuture.get().memberId();
+ ClassicGroup group =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+ assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+ // A new member 2 with new protocol joins the classic group,
triggering the upgrade.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setRebalanceTimeoutMs(5000)
+ .setServerAssignor(NoOpPartitionAssignor.NAME)
+ .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+ .setTopicPartitions(Collections.emptyList()));
+
+ ConsumerGroupMember expectedMember1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(1)
+ .setState(MemberState.STABLE)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+ .setRebalanceTimeoutMs(10000)
+ .setClassicMemberMetadata(
+ new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(5000)
+
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(protocols))
+ )
+ .setAssignedPartitions(Collections.emptyMap())
+ .build();
+
+ ConsumerGroupMember expectedMember2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setMemberEpoch(2)
+ .setPreviousMemberEpoch(0)
+ .setState(MemberState.STABLE)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setServerAssignorName(NoOpPartitionAssignor.NAME)
+ .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+ .setRebalanceTimeoutMs(5000)
+ .setAssignedPartitions(Collections.emptyMap())
+ .build();
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+ // The existing classic group tombstone.
+
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId),
+
+ // Create the new consumer group with member 1.
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember1),
+ GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, expectedMember1.assignedPartitions()),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1),
+
+ // Member 2 joins the new consumer group.
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember2),
+
+ // The subscription metadata hasn't been updated during the
conversion, so a new one is computed.
+
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
Map.of(
+ fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
+ barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
+ )),
+
+ // Newly joining member 2 bumps the group epoch. A new target
assignment is computed.
+ GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, Collections.emptyMap()),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
2),
+
+ // Member 2 has no pending revoking partition or pending release
partition.
+ // Bump its member epoch and transition to STABLE.
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember2)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+
+ context.assertSessionTimeout(groupId, memberId1,
expectedMember1.classicProtocolSessionTimeout().get());
+ context.assertSessionTimeout(groupId, memberId2, 45000);
+ }
+
@Test
public void testConsumerGroupHeartbeatFromExistingClassicStaticMember() {
String groupId = "group-id";