This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 366aa1014c0 KAFKA-17317; Validate and maybe trigger downgrade after
static member replacement (#17306)
366aa1014c0 is described below
commit 366aa1014c01b4c14195d9930e423746f9edd999
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Mon Oct 7 05:11:16 2024 -0400
KAFKA-17317; Validate and maybe trigger downgrade after static member
replacement (#17306)
This implementation doesn't change the existing downgrade path.
In `classicGroupJoinToConsumerGroup`, if the group should be downgraded, it
will be converted to a classic group at the end of the method. The returned
records will be the records from GroupJoin plus the records from conversion. No
rebalance will be triggered in the newly converted group.
Reviewers: David Jacot <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 97 +++++--
.../coordinator/group/classic/ClassicGroup.java | 35 ++-
.../group/modern/consumer/ConsumerGroup.java | 34 +++
.../group/GroupMetadataManagerTest.java | 244 ++++++++++++++++
.../group/classic/ClassicGroupTest.java | 321 +++++++++++++++++++++
.../group/modern/consumer/ConsumerGroupTest.java | 163 +++++++++++
6 files changed, 872 insertions(+), 22 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 39ac3c2e2f8..9d2f42d2beb 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1043,12 +1043,12 @@ public class GroupMetadataManager {
/**
* Validates the online downgrade if a consumer member is fenced from the
consumer group.
*
- * @param consumerGroup The ConsumerGroup.
- * @param memberId The fenced member id.
+ * @param consumerGroup The ConsumerGroup.
+ * @param fencedMemberId The fenced member id.
* @return A boolean indicating whether it's valid to online downgrade the
consumer group.
*/
- private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup,
String memberId) {
- if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+ private boolean validateOnlineDowngradeWithFencedMember(ConsumerGroup
consumerGroup, String fencedMemberId) {
+ if (!consumerGroup.allMembersUseClassicProtocolExcept(fencedMemberId))
{
return false;
} else if (consumerGroup.numMembers() <= 1) {
log.debug("Skip downgrading the consumer group {} to classic group
because it's empty.",
@@ -1066,27 +1066,59 @@ public class GroupMetadataManager {
return true;
}
+ /**
+ * Validates whether the group id is eligible for an online downgrade if
an existing
+ * static member is replaced by another new one uses the classic protocol.
+ *
+ * @param consumerGroup The group to downgrade.
+ * @param replacedMemberId The replaced member id.
+ *
+ * @return A boolean indicating whether it's valid to online downgrade the
consumer group.
+ */
+ private boolean validateOnlineDowngradeWithReplacedMemberId(
+ ConsumerGroup consumerGroup,
+ String replacedMemberId
+ ) {
+ if
(!consumerGroup.allMembersUseClassicProtocolExcept(replacedMemberId)) {
+ return false;
+ } else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+ log.info("Cannot downgrade consumer group {} to classic group
because the online downgrade is disabled.",
+ consumerGroup.groupId());
+ return false;
+ } else if (consumerGroup.numMembers() > classicGroupMaxSize) {
+ log.info("Cannot downgrade consumer group {} to classic group
because its group size is greater than classic group max size.",
+ consumerGroup.groupId());
+ return false;
+ }
+ return true;
+ }
+
/**
* Creates a ClassicGroup corresponding to the given ConsumerGroup.
*
* @param consumerGroup The converted ConsumerGroup.
* @param leavingMemberId The leaving member that triggers the downgrade
validation.
- * @param response The response of the returned CoordinatorResult.
- * @return A CoordinatorResult.
+ * @param joiningMember The newly joined member if the downgrade is
triggered by static member replacement.
+ * @param records The record list to which the conversion
records are added.
*/
- private <T> CoordinatorResult<T, CoordinatorRecord> convertToClassicGroup(
+ private void convertToClassicGroup(
ConsumerGroup consumerGroup,
String leavingMemberId,
- T response
+ ConsumerGroupMember joiningMember,
+ List<CoordinatorRecord> records
) {
- List<CoordinatorRecord> records = new ArrayList<>();
- consumerGroup.createGroupTombstoneRecords(records);
+ if (joiningMember == null) {
+ consumerGroup.createGroupTombstoneRecords(records);
+ } else {
+
consumerGroup.createGroupTombstoneRecordsWithReplacedMember(records,
leavingMemberId, joiningMember.memberId());
+ }
ClassicGroup classicGroup;
try {
classicGroup = ClassicGroup.fromConsumerGroup(
consumerGroup,
leavingMemberId,
+ joiningMember,
logContext,
time,
metadataImage
@@ -1102,14 +1134,15 @@ public class GroupMetadataManager {
// Directly update the states instead of replaying the records because
// the classicGroup reference is needed for triggering the rebalance.
- // Set the appendFuture to prevent the records from being replayed.
removeGroup(consumerGroup.groupId());
groups.put(consumerGroup.groupId(), classicGroup);
classicGroup.allMembers().forEach(member ->
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
- prepareRebalance(classicGroup, String.format("Downgrade group %s from
consumer to classic.", classicGroup.groupId()));
- return new CoordinatorResult<>(records, response, null, false);
+ // If the downgrade is triggered by a member leaving the group, a
rebalance should be triggered.
+ if (joiningMember == null) {
+ prepareRebalance(classicGroup, String.format("Downgrade group %s
from consumer to classic.", classicGroup.groupId()));
+ }
}
/**
@@ -2028,6 +2061,20 @@ public class GroupMetadataManager {
records
);
+ // 4. Maybe downgrade the consumer group if the last static member
using the
+ // consumer protocol is replaced by the joining static member.
+ String existingStaticMemberIdOrNull =
group.staticMemberId(request.groupInstanceId());
+ boolean downgrade = existingStaticMemberIdOrNull != null &&
+ validateOnlineDowngradeWithReplacedMemberId(group,
existingStaticMemberIdOrNull);
+ if (downgrade) {
+ convertToClassicGroup(
+ group,
+ existingStaticMemberIdOrNull,
+ updatedMember,
+ records
+ );
+ }
+
final JoinGroupResponseData response = new JoinGroupResponseData()
.setMemberId(updatedMember.memberId())
.setGenerationId(updatedMember.memberEpoch())
@@ -2038,15 +2085,22 @@ public class GroupMetadataManager {
appendFuture.whenComplete((__, t) -> {
if (t == null) {
cancelConsumerGroupJoinTimeout(groupId, response.memberId());
- scheduleConsumerGroupSessionTimeout(groupId,
response.memberId(), sessionTimeoutMs);
- // The sync timeout ensures that the member send sync request
within the rebalance timeout.
- scheduleConsumerGroupSyncTimeout(groupId, response.memberId(),
request.rebalanceTimeoutMs());
-
+ if (!downgrade) {
+ // If the group is still a consumer group, schedule the
session
+ // timeout for the joining member and the sync timeout to
ensure
+ // that the member send sync request within the rebalance
timeout.
+ scheduleConsumerGroupSessionTimeout(groupId,
response.memberId(), sessionTimeoutMs);
+ scheduleConsumerGroupSyncTimeout(groupId,
response.memberId(), request.rebalanceTimeoutMs());
+ }
responseFuture.complete(response);
}
});
- return new CoordinatorResult<>(records, null, appendFuture, true);
+ // If the joining member triggers a valid downgrade, the soft states
will be directly
+ // updated in the conversion method, so the records don't need to be
replayed.
+ // If the joining member doesn't trigger a valid downgrade, the group
is still a
+ // consumer group. We still rely on replaying records to update the
soft states.
+ return new CoordinatorResult<>(records, null, appendFuture,
!downgrade);
}
/**
@@ -2728,10 +2782,11 @@ public class GroupMetadataManager {
ConsumerGroupMember member,
T response
) {
- if (validateOnlineDowngrade(group, member.memberId())) {
- return convertToClassicGroup(group, member.memberId(), response);
+ List<CoordinatorRecord> records = new ArrayList<>();
+ if (validateOnlineDowngradeWithFencedMember(group, member.memberId()))
{
+ convertToClassicGroup(group, member.memberId(), null, records);
+ return new CoordinatorResult<>(records, response, null, false);
} else {
- List<CoordinatorRecord> records = new ArrayList<>();
removeMember(records, group.groupId(), member.memberId());
// We update the subscription metadata without the leaving member.
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
index 265e5ea4453..819eb53be38 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
@@ -39,6 +39,7 @@ import
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.MetadataVersion;
@@ -1357,6 +1358,7 @@ public class ClassicGroup implements Group {
*
* @param consumerGroup The converted ConsumerGroup.
* @param leavingMemberId The member that will not be
converted in the ClassicGroup.
+ * @param joiningMember The member that needs to be
converted and added to the ClassicGroup.
* @param logContext The logContext to create the
ClassicGroup.
* @param time The time to create the
ClassicGroup.
* @param metadataImage The MetadataImage.
@@ -1365,6 +1367,7 @@ public class ClassicGroup implements Group {
public static ClassicGroup fromConsumerGroup(
ConsumerGroup consumerGroup,
String leavingMemberId,
+ ConsumerGroupMember joiningMember,
LogContext logContext,
Time time,
MetadataImage metadataImage
@@ -1399,15 +1402,38 @@ public class ClassicGroup implements Group {
}
});
+ if (joiningMember != null) {
+ classicGroup.add(
+ new ClassicGroupMember(
+ joiningMember.memberId(),
+ Optional.ofNullable(joiningMember.instanceId()),
+ joiningMember.clientId(),
+ joiningMember.clientHost(),
+ joiningMember.rebalanceTimeoutMs(),
+ joiningMember.classicProtocolSessionTimeout().get(),
+ ConsumerProtocol.PROTOCOL_TYPE,
+ joiningMember.supportedJoinGroupRequestProtocols(),
+ null
+ )
+ );
+ }
+
classicGroup.setProtocolName(Optional.of(classicGroup.selectProtocol()));
classicGroup.setSubscribedTopics(classicGroup.computeSubscribedTopics());
classicGroup.allMembers().forEach(classicGroupMember -> {
// Set the assignment with serializing the ConsumerGroup's
targetAssignment.
// The serializing version should align with that of the member's
JoinGroupRequestProtocol.
+ String memberId = classicGroupMember.memberId();
+ if (joiningMember != null &&
memberId.equals(joiningMember.memberId())) {
+ // If the downgraded is triggered by the joining static member
replacing
+ // the leaving static member, the joining member should take
the assignment
+ // of the leaving one.
+ memberId = leavingMemberId;
+ }
byte[] assignment =
Utils.toArray(ConsumerProtocol.serializeAssignment(
toConsumerProtocolAssignment(
-
consumerGroup.targetAssignment().get(classicGroupMember.memberId()).partitions(),
+
consumerGroup.targetAssignment().get(memberId).partitions(),
metadataImage.topics()
),
ConsumerProtocol.deserializeVersion(
@@ -1452,6 +1478,13 @@ public class ClassicGroup implements Group {
}
}
+ /**
+ * For testing only.
+ */
+ public void setLeaderId(Optional<String> leaderId) {
+ this.leaderId = leaderId;
+ }
+
@Override
public String toString() {
return "ClassicGroupMetadata(" +
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 817cf7bbe24..10b17a5d641 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -528,6 +528,40 @@ public class ConsumerGroup extends
ModernGroup<ConsumerGroupMember> {
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId()));
}
+ /**
+ * Populates the list of records with tombstone(s) for deleting the group.
+ * If the removed member is the leaving member, create its tombstone with
+ * the joining member id.
+ *
+ * @param records The list of records.
+ * @param leavingMemberId The leaving member id.
+ * @param joiningMemberId The joining member id.
+ */
+ public void createGroupTombstoneRecordsWithReplacedMember(
+ List<CoordinatorRecord> records,
+ String leavingMemberId,
+ String joiningMemberId
+ ) {
+ members().forEach((memberId, __) -> {
+ String removedMemberId = memberId.equals(leavingMemberId) ?
joiningMemberId : memberId;
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId(),
removedMemberId));
+ });
+
+ members().forEach((memberId, __) -> {
+ String removedMemberId = memberId.equals(leavingMemberId) ?
joiningMemberId : memberId;
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId(),
removedMemberId));
+ });
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId()));
+
+ members().forEach((memberId, __) -> {
+ String removedMemberId = memberId.equals(leavingMemberId) ?
joiningMemberId : memberId;
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId(),
removedMemberId));
+ });
+
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId()));
+
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId()));
+ }
+
@Override
public boolean isEmpty() {
return state() == ConsumerGroupState.EMPTY;
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 0239d5f6304..86b9e198024 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -93,6 +93,7 @@ import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import org.opentest4j.AssertionFailedError;
import java.util.ArrayList;
import java.util.Arrays;
@@ -104,6 +105,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -10945,6 +10947,12 @@ public class GroupMetadataManagerTest {
// A new rebalance is triggered.
ClassicGroup classicGroup =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
+
+ // Simulate a failed write to the log.
+ context.rollback();
+
+ // The group is reverted back to the consumer group.
+ assertEquals(consumerGroup,
context.groupMetadataManager.consumerGroup(groupId));
}
@Test
@@ -11340,6 +11348,242 @@ public class GroupMetadataManagerTest {
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
}
+ @Test
+ public void
testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMember() throws
ExecutionException, InterruptedException {
+ String groupId = "group-id";
+ String memberId1 = Uuid.randomUuid().toString();
+ String oldMemberId2 = Uuid.randomUuid().toString();
+ String instanceId = "instance-id";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 =
Collections.singletonList(
+ new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ Arrays.asList(fooTopicName, barTopicName),
+ null,
+ Arrays.asList(
+ new TopicPartition(fooTopicName, 0),
+ new TopicPartition(fooTopicName, 1),
+ new TopicPartition(fooTopicName, 2),
+ new TopicPartition(barTopicName, 0),
+ new TopicPartition(barTopicName, 1)
+ )
+ ))))
+ );
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setClassicMemberMetadata(
+ new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(5000)
+ .setSupportedProtocols(protocols1)
+ )
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .build();
+ ConsumerGroupMember oldMember2 = new
ConsumerGroupMember.Builder(oldMemberId2)
+ .setInstanceId(instanceId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(Collections.singletonList("foo"))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .build();
+
+ // Consumer group with two members.
+ // Member 1 uses the classic protocol and static member 2 uses the
consumer protocol.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+ .withConsumerGroupAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 2)
+ .addRacks()
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(member1)
+ .withMember(oldMember2)
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .withAssignment(oldMemberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .withAssignmentEpoch(10))
+ .build();
+
+
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
10);
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
6));
+ put(barTopicName, new TopicMetadata(barTopicId, barTopicName,
2));
+ }
+ }));
+ context.commit();
+
+ // A new member using classic protocol with the same instance id
joins, scheduling the downgrade.
+ JoinGroupRequestData joinRequest = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withGroupInstanceId(instanceId)
+ .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+ GroupMetadataManagerTestContext.JoinResult result =
context.sendClassicGroupJoin(joinRequest);
+ result.appendFuture.complete(null);
+ String newMemberId2 = result.joinFuture.get().memberId();
+
+ ConsumerGroupMember expectedNewConsumerMember2 = new
ConsumerGroupMember.Builder(oldMember2, newMemberId2)
+ .setMemberEpoch(0)
+ .setPreviousMemberEpoch(0)
+ .build();
+ ConsumerGroupMember expectedNewClassicMember2 = new
ConsumerGroupMember.Builder(oldMember2, newMemberId2)
+ .setPreviousMemberEpoch(0)
+ .setRebalanceTimeoutMs(joinRequest.rebalanceTimeoutMs())
+ .setClassicMemberMetadata(
+ new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(joinRequest.sessionTimeoutMs())
+ .setSupportedProtocols(Collections.singletonList(new
ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ Collections.singletonList(fooTopicName)))))))
+ ).build();
+
+ byte[] assignment1 =
Utils.toArray(ConsumerProtocol.serializeAssignment(new
ConsumerPartitionAssignor.Assignment(Arrays.asList(
+ new TopicPartition(fooTopicName, 0),
+ new TopicPartition(fooTopicName, 1),
+ new TopicPartition(fooTopicName, 2),
+ new TopicPartition(barTopicName, 0),
+ new TopicPartition(barTopicName, 1)
+ ))));
+ byte[] assignment2 =
Utils.toArray(ConsumerProtocol.serializeAssignment(new
ConsumerPartitionAssignor.Assignment(Arrays.asList(
+ new TopicPartition(fooTopicName, 3),
+ new TopicPartition(fooTopicName, 4),
+ new TopicPartition(fooTopicName, 5)
+ ))));
+ Map<String, byte[]> assignments = new HashMap<>();
+ assignments.put(memberId1, assignment1);
+ assignments.put(newMemberId2, assignment2);
+
+ ClassicGroup expectedClassicGroup = new ClassicGroup(
+ new LogContext(),
+ groupId,
+ STABLE,
+ context.time,
+ 10,
+ Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
+ Optional.of("range"),
+ Optional.of(memberId1),
+ Optional.of(context.time.milliseconds())
+ );
+ expectedClassicGroup.add(
+ new ClassicGroupMember(
+ memberId1,
+ Optional.ofNullable(member1.instanceId()),
+ member1.clientId(),
+ member1.clientHost(),
+ member1.rebalanceTimeoutMs(),
+ member1.classicProtocolSessionTimeout().get(),
+ ConsumerProtocol.PROTOCOL_TYPE,
+ member1.supportedJoinGroupRequestProtocols(),
+ assignment1
+ )
+ );
+ expectedClassicGroup.add(
+ new ClassicGroupMember(
+ newMemberId2,
+ Optional.ofNullable(oldMember2.instanceId()),
+ DEFAULT_CLIENT_ID,
+ DEFAULT_CLIENT_ADDRESS.toString(),
+ joinRequest.rebalanceTimeoutMs(),
+ joinRequest.sessionTimeoutMs(),
+ joinRequest.protocolType(),
+ joinRequest.protocols(),
+ assignment2
+ )
+ );
+
+ List<CoordinatorRecord> expectedRecords = Arrays.asList(
+ // Remove the existing member 2 that uses the consumer protocol.
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
oldMemberId2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
oldMemberId2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
oldMemberId2),
+
+ // Create the new member 2 that uses the consumer protocol.
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedNewConsumerMember2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
newMemberId2, expectedNewConsumerMember2.assignedPartitions()),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedNewConsumerMember2),
+
+ // Update the new member 2 to the member that uses classic
protocol.
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedNewClassicMember2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedNewClassicMember2),
+
+ // Remove member 1, member 2 and the consumer group.
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
newMemberId2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
newMemberId2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
newMemberId2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId),
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId),
+
+ // Create the classic group.
+
GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments, MetadataVersion.latestTesting())
+ );
+
+ assertEquals(expectedRecords.size(), result.records.size());
+ assertRecordsEquals(expectedRecords.subList(0, 8),
result.records.subList(0, 8));
+ assertUnorderedListEquals(expectedRecords.subList(8, 10),
result.records.subList(8, 10));
+ assertUnorderedListEquals(expectedRecords.subList(10, 12),
result.records.subList(10, 12));
+ assertRecordEquals(expectedRecords.get(12), result.records.get(12));
+ assertUnorderedListEquals(expectedRecords.subList(13, 15),
result.records.subList(13, 15));
+ assertRecordsEquals(expectedRecords.subList(15, 17),
result.records.subList(15, 17));
+
+ // Leader can be either member 1 or member 2.
+ try {
+ assertRecordEquals(expectedRecords.get(17),
result.records.get(17));
+ } catch (AssertionFailedError e) {
+ expectedClassicGroup.setLeaderId(Optional.of(newMemberId2));
+ assertRecordEquals(
+
GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
assignments, MetadataVersion.latestTesting()),
+ result.records.get(9)
+ );
+ }
+
+ verify(context.metrics,
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE,
null);
+
+ // The new classic member 1 has a heartbeat timeout.
+ ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout =
context.timer.timeout(
+ classicGroupHeartbeatKey(groupId, memberId1)
+ );
+ assertNotNull(heartbeatTimeout);
+
+ // No rebalance is triggered.
+ ClassicGroup classicGroup =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+ assertTrue(classicGroup.isInState(STABLE));
+ }
+
@Test
public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
String groupId = "group-id";
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index 1d496c4b04b..ecf281f4650 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -18,6 +18,8 @@ package org.apache.kafka.coordinator.group.classic;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
@@ -25,6 +27,7 @@ import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
import org.apache.kafka.common.message.JoinGroupResponseData;
@@ -34,10 +37,20 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.OffsetAndMetadata;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.modern.Assignment;
+import org.apache.kafka.coordinator.group.modern.MemberState;
+import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -56,17 +69,21 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static org.apache.kafka.common.utils.Utils.mkSet;
+import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE;
import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD;
import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.EMPTY;
import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPARING_REBALANCE;
import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
public class ClassicGroupTest {
private final String protocolType = "consumer";
@@ -1349,6 +1366,272 @@ public class ClassicGroupTest {
}
}
+ @Test
+ public void testFromConsumerGroupWithJoiningMember() {
+ MockTime time = new MockTime();
+ String groupId = "group-id";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ String newMemberId2 = Uuid.randomUuid().toString();
+ String instanceId2 = "instance-id-2";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .addRacks()
+ .build();
+
+ ConsumerGroup consumerGroup = new ConsumerGroup(
+ new SnapshotRegistry(logContext),
+ groupId,
+ mock(GroupCoordinatorMetricsShard.class)
+ );
+ consumerGroup.setGroupEpoch(10);
+ consumerGroup.setTargetAssignmentEpoch(10);
+
+ consumerGroup.updateTargetAssignment(memberId1, new
Assignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0)
+ )));
+ consumerGroup.updateTargetAssignment(memberId2, new
Assignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 1)
+ )));
+
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 =
Collections.singletonList(createClassicProtocol(
+ "range",
+ Collections.singletonList(fooTopicName),
+ Collections.singletonList(new TopicPartition(fooTopicName, 0))
+ ));
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols2 =
Collections.singletonList(createClassicProtocol(
+ "range",
+ Collections.singletonList(fooTopicName),
+ Collections.singletonList(new TopicPartition(fooTopicName, 1))
+ ));
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId("client-id")
+ .setClientHost("client-host")
+ .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setClassicMemberMetadata(
+ new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(5000)
+ .setSupportedProtocols(protocols1))
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0)))
+ .build();
+ consumerGroup.updateMember(member1);
+
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setInstanceId(instanceId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId("client-id")
+ .setClientHost("client-host")
+ .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 1)))
+ .build();
+ consumerGroup.updateMember(member2);
+
+ ConsumerGroupMember newMember2 = new
ConsumerGroupMember.Builder(member2, newMemberId2)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(0)
+ .setClientId("client-id")
+ .setClientHost("client-host")
+ .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setClassicMemberMetadata(
+ new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(5000)
+ .setSupportedProtocols(protocols2))
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 1)))
+ .build();
+
+ ClassicGroup classicGroup = ClassicGroup.fromConsumerGroup(
+ consumerGroup,
+ memberId2,
+ newMember2,
+ logContext,
+ time,
+ metadataImage
+ );
+
+ ClassicGroup expectedClassicGroup = new ClassicGroup(
+ logContext,
+ groupId,
+ STABLE,
+ time,
+ 10,
+ Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
+ Optional.of("range"),
+ Optional.empty(),
+ Optional.of(time.milliseconds())
+ );
+ expectedClassicGroup.add(
+ new ClassicGroupMember(
+ memberId1,
+ Optional.empty(),
+ member1.clientId(),
+ member1.clientHost(),
+ member1.rebalanceTimeoutMs(),
+ member1.classicProtocolSessionTimeout().get(),
+ ConsumerProtocol.PROTOCOL_TYPE,
+ new
JoinGroupRequestData.JoinGroupRequestProtocolCollection(Collections.singletonList(
+ new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName(protocols1.get(0).name())
+ .setMetadata(protocols1.get(0).metadata())
+ ).iterator()),
+ Utils.toArray(ConsumerProtocol.serializeAssignment(new
ConsumerPartitionAssignor.Assignment(
+ Collections.singletonList(new TopicPartition(fooTopicName,
0))
+ )))
+ )
+ );
+ expectedClassicGroup.add(
+ new ClassicGroupMember(
+ newMemberId2,
+ Optional.of(instanceId2),
+ newMember2.clientId(),
+ newMember2.clientHost(),
+ newMember2.rebalanceTimeoutMs(),
+ newMember2.classicProtocolSessionTimeout().get(),
+ ConsumerProtocol.PROTOCOL_TYPE,
+ new
JoinGroupRequestData.JoinGroupRequestProtocolCollection(Collections.singletonList(
+ new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName(protocols2.get(0).name())
+ .setMetadata(protocols2.get(0).metadata())
+ ).iterator()),
+ Utils.toArray(ConsumerProtocol.serializeAssignment(new
ConsumerPartitionAssignor.Assignment(
+ Collections.singletonList(new TopicPartition(fooTopicName,
1))
+ )))
+ )
+ );
+
+ assertClassicGroupEquals(expectedClassicGroup, classicGroup);
+ }
+
+ @Test
+ public void testFromConsumerGroupWithoutJoiningMember() {
+ MockTime time = new MockTime();
+ String groupId = "group-id";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ String instanceId2 = "instance-id-2";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .addRacks()
+ .build();
+
+ ConsumerGroup consumerGroup = new ConsumerGroup(
+ new SnapshotRegistry(logContext),
+ groupId,
+ mock(GroupCoordinatorMetricsShard.class)
+ );
+ consumerGroup.setGroupEpoch(10);
+ consumerGroup.setTargetAssignmentEpoch(10);
+ consumerGroup.updateTargetAssignment(memberId1, new
Assignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0)
+ )));
+ consumerGroup.updateTargetAssignment(memberId2, new
Assignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 1)
+ )));
+
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 =
Collections.singletonList(createClassicProtocol(
+ "range",
+ Collections.singletonList(fooTopicName),
+ Collections.singletonList(new TopicPartition(fooTopicName, 0))
+ ));
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId("client-id")
+ .setClientHost("client-host")
+ .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setClassicMemberMetadata(
+ new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(5000)
+ .setSupportedProtocols(protocols1))
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0)))
+ .build();
+ consumerGroup.updateMember(member1);
+
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setInstanceId(instanceId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId("client-id")
+ .setClientHost("client-host")
+ .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+ .setServerAssignorName("range")
+ .setRebalanceTimeoutMs(45000)
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 1)))
+ .build();
+ consumerGroup.updateMember(member2);
+
+ ClassicGroup classicGroup = ClassicGroup.fromConsumerGroup(
+ consumerGroup,
+ memberId2,
+ null,
+ logContext,
+ time,
+ metadataImage
+ );
+
+ ClassicGroup expectedClassicGroup = new ClassicGroup(
+ logContext,
+ groupId,
+ STABLE,
+ time,
+ 10,
+ Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
+ Optional.of("range"),
+ Optional.empty(),
+ Optional.of(time.milliseconds())
+ );
+ expectedClassicGroup.add(
+ new ClassicGroupMember(
+ memberId1,
+ Optional.empty(),
+ member1.clientId(),
+ member1.clientHost(),
+ member1.rebalanceTimeoutMs(),
+ member1.classicProtocolSessionTimeout().get(),
+ ConsumerProtocol.PROTOCOL_TYPE,
+ new
JoinGroupRequestData.JoinGroupRequestProtocolCollection(Collections.singletonList(
+ new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName(protocols1.get(0).name())
+ .setMetadata(protocols1.get(0).metadata())
+ ).iterator()),
+ Utils.toArray(ConsumerProtocol.serializeAssignment(new
ConsumerPartitionAssignor.Assignment(
+ Collections.singletonList(new TopicPartition(fooTopicName,
0))
+ )))
+ )
+ );
+
+ assertClassicGroupEquals(expectedClassicGroup, classicGroup);
+ }
+
private void assertState(ClassicGroup group, ClassicGroupState
targetState) {
Set<ClassicGroupState> otherStates = new HashSet<>();
otherStates.add(STABLE);
@@ -1360,4 +1643,42 @@ public class ClassicGroupTest {
otherStates.forEach(otherState ->
assertFalse(group.isInState(otherState)));
assertTrue(group.isInState(targetState));
}
+
+ private void assertClassicGroupEquals(ClassicGroup expected, ClassicGroup
actual) {
+ assertEquals(expected.groupId(), actual.groupId());
+ assertEquals(expected.protocolName(), actual.protocolName());
+ assertEquals(expected.protocolType(), actual.protocolType());
+ assertEquals(expected.leaderOrNull(), actual.leaderOrNull());
+ assertEquals(expected.stateAsString(), actual.stateAsString());
+ assertEquals(expected.generationId(), actual.generationId());
+ assertEquals(expected.allMembers().size(), actual.allMembers().size());
+ expected.allMembers().forEach(expectedMember ->
+ assertClassicGroupMemberEquals(expectedMember,
actual.member(expectedMember.memberId())));
+ }
+
+ private void assertClassicGroupMemberEquals(ClassicGroupMember expected,
ClassicGroupMember actual) {
+ assertEquals(expected.memberId(), actual.memberId());
+ assertEquals(expected.groupInstanceId(), actual.groupInstanceId());
+ assertEquals(expected.clientId(), actual.clientId());
+ assertEquals(expected.clientHost(), actual.clientHost());
+ assertEquals(expected.rebalanceTimeoutMs(),
actual.rebalanceTimeoutMs());
+ assertEquals(expected.sessionTimeoutMs(), actual.sessionTimeoutMs());
+ assertEquals(expected.protocolType(), actual.protocolType());
+ assertEquals(expected.supportedProtocols(),
actual.supportedProtocols());
+ assertArrayEquals(expected.assignment(), actual.assignment());
+ }
+
+ private ConsumerGroupMemberMetadataValue.ClassicProtocol
createClassicProtocol(
+ String protocolName,
+ List<String> subscribedTopics,
+ List<TopicPartition> assignedTopicPartitions
+ ) {
+ return new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName(protocolName)
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ subscribedTopics,
+ null,
+ assignedTopicPartitions
+ ))));
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index d6eee7bbd9b..ad407a27cbc 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group.modern.consumer;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
@@ -25,15 +26,21 @@ import
org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.OffsetAndMetadata;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.classic.ClassicGroup;
+import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.Assignment;
@@ -56,10 +63,14 @@ import java.util.OptionalLong;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals;
+import static
org.apache.kafka.coordinator.group.Assertions.assertRecordsEquals;
+import static
org.apache.kafka.coordinator.group.Assertions.assertUnorderedListEquals;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
+import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -1435,4 +1446,156 @@ public class ConsumerGroupTest {
consumerGroup.updateMember(member2);
assertEquals(1, consumerGroup.numClassicProtocolMembers());
}
+
+ @Test
+ public void testCreateGroupTombstoneRecordsWithReplacedMember() {
+ String groupId = "group";
+ String memberId1 = "member-1";
+ String memberId2 = "member-2";
+ String newMemberId2 = "new-member-2";
+
+ ConsumerGroup consumerGroup = createConsumerGroup(groupId);
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = new
ArrayList<>();
+ protocols.add(new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName("range")
+ .setMetadata(new byte[0]));
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setClassicMemberMetadata(new
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSupportedProtocols(protocols))
+ .build();
+ consumerGroup.updateMember(member1);
+
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setInstanceId("instance-id-2")
+ .build();
+ consumerGroup.updateMember(member2);
+
+ List<CoordinatorRecord> records = new ArrayList<>();
+ consumerGroup.createGroupTombstoneRecordsWithReplacedMember(records,
memberId2, newMemberId2);
+
+ List<CoordinatorRecord> expectedRecords = Arrays.asList(
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
newMemberId2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
newMemberId2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
newMemberId2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId),
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)
+ );
+ assertEquals(expectedRecords.size(), records.size());
+ assertUnorderedListEquals(expectedRecords.subList(0, 2),
records.subList(0, 2));
+ assertUnorderedListEquals(expectedRecords.subList(2, 4),
records.subList(2, 4));
+ assertRecordEquals(expectedRecords.get(4), records.get(4));
+ assertUnorderedListEquals(expectedRecords.subList(5, 7),
records.subList(5, 7));
+ assertRecordsEquals(expectedRecords.subList(7, 9), records.subList(7,
9));
+ }
+
+ @Test
+ public void testFromClassicGroup() {
+ MockTime time = new MockTime();
+ LogContext logContext = new LogContext();
+ String groupId = "group-id";
+ String memberId = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 1)
+ .addTopic(barTopicId, barTopicName, 1)
+ .addRacks()
+ .build();
+
+ ClassicGroup classicGroup = new ClassicGroup(
+ logContext,
+ groupId,
+ STABLE,
+ time,
+ 10,
+ Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
+ Optional.of("range"),
+ Optional.empty(),
+ Optional.of(time.milliseconds())
+ );
+
+ ClassicGroupMember member = new ClassicGroupMember(
+ memberId,
+ Optional.empty(),
+ "client-id",
+ "client-host",
+ 5000,
+ 500,
+ ConsumerProtocol.PROTOCOL_TYPE,
+ new
JoinGroupRequestData.JoinGroupRequestProtocolCollection(Collections.singletonList(
+ new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ Arrays.asList(fooTopicName, barTopicName),
+ null,
+ Arrays.asList(
+ new TopicPartition(fooTopicName, 0),
+ new TopicPartition(barTopicName, 0))))))
+ ).iterator()),
+ Utils.toArray(ConsumerProtocol.serializeAssignment(new
ConsumerPartitionAssignor.Assignment(Arrays.asList(
+ new TopicPartition(fooTopicName, 0),
+ new TopicPartition(barTopicName, 0)
+ ))))
+ );
+ classicGroup.add(member);
+
+ ConsumerGroup consumerGroup = ConsumerGroup.fromClassicGroup(
+ new SnapshotRegistry(logContext),
+ mock(GroupCoordinatorMetricsShard.class),
+ classicGroup,
+ metadataImage.topics()
+ );
+
+ ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
+ new SnapshotRegistry(logContext),
+ groupId,
+ mock(GroupCoordinatorMetricsShard.class)
+ );
+ expectedConsumerGroup.setGroupEpoch(10);
+ expectedConsumerGroup.setTargetAssignmentEpoch(10);
+ expectedConsumerGroup.updateTargetAssignment(memberId, new
Assignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0)
+ )));
+ expectedConsumerGroup.updateMember(new
ConsumerGroupMember.Builder(memberId)
+ .setMemberEpoch(classicGroup.generationId())
+ .setState(MemberState.STABLE)
+ .setPreviousMemberEpoch(classicGroup.generationId())
+ .setInstanceId(null)
+ .setRackId(null)
+ .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+ .setClientId(member.clientId())
+ .setClientHost(member.clientHost())
+ .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName))
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0),
+ mkTopicAssignment(barTopicId, 0)))
+ .setClassicMemberMetadata(
+ new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(member.sessionTimeoutMs())
+ .setSupportedProtocols(Collections.singletonList(
+ new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ Arrays.asList(fooTopicName, barTopicName),
+ null,
+ Arrays.asList(
+ new TopicPartition(fooTopicName, 0),
+ new TopicPartition(barTopicName, 0)))))))))
+ .build());
+
+ assertEquals(expectedConsumerGroup.groupId(), consumerGroup.groupId());
+ assertEquals(expectedConsumerGroup.groupEpoch(),
consumerGroup.groupEpoch());
+ assertEquals(expectedConsumerGroup.state(), consumerGroup.state());
+ assertEquals(expectedConsumerGroup.preferredServerAssignor(),
consumerGroup.preferredServerAssignor());
+ assertEquals(expectedConsumerGroup.members(), consumerGroup.members());
+ }
}