This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 366aa1014c0  KAFKA-17317; Validate and maybe trigger downgrade after 
static member replacement (#17306)
366aa1014c0 is described below

commit 366aa1014c01b4c14195d9930e423746f9edd999
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Mon Oct 7 05:11:16 2024 -0400

     KAFKA-17317; Validate and maybe trigger downgrade after static member 
replacement (#17306)
    
    This implementation doesn't change the existing downgrade path.
    
    In `classicGroupJoinToConsumerGroup`, if the group should be downgraded, it 
will be converted to a classic group at the end of the method. The returned 
records will be the records from GroupJoin plus the records from conversion. No 
rebalance will be triggered in the newly converted group.
    
    Reviewers: David Jacot <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |  97 +++++--
 .../coordinator/group/classic/ClassicGroup.java    |  35 ++-
 .../group/modern/consumer/ConsumerGroup.java       |  34 +++
 .../group/GroupMetadataManagerTest.java            | 244 ++++++++++++++++
 .../group/classic/ClassicGroupTest.java            | 321 +++++++++++++++++++++
 .../group/modern/consumer/ConsumerGroupTest.java   | 163 +++++++++++
 6 files changed, 872 insertions(+), 22 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 39ac3c2e2f8..9d2f42d2beb 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1043,12 +1043,12 @@ public class GroupMetadataManager {
     /**
      * Validates the online downgrade if a consumer member is fenced from the 
consumer group.
      *
-     * @param consumerGroup The ConsumerGroup.
-     * @param memberId      The fenced member id.
+     * @param consumerGroup     The ConsumerGroup.
+     * @param fencedMemberId    The fenced member id.
      * @return A boolean indicating whether it's valid to online downgrade the 
consumer group.
      */
-    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, 
String memberId) {
-        if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+    private boolean validateOnlineDowngradeWithFencedMember(ConsumerGroup 
consumerGroup, String fencedMemberId) {
+        if (!consumerGroup.allMembersUseClassicProtocolExcept(fencedMemberId)) 
{
             return false;
         } else if (consumerGroup.numMembers() <= 1) {
             log.debug("Skip downgrading the consumer group {} to classic group 
because it's empty.",
@@ -1066,27 +1066,59 @@ public class GroupMetadataManager {
         return true;
     }
 
+    /**
+     * Validates whether the group id is eligible for an online downgrade if 
an existing
+     * static member is replaced by another new one uses the classic protocol.
+     *
+     * @param consumerGroup     The group to downgrade.
+     * @param replacedMemberId  The replaced member id.
+     *
+     * @return A boolean indicating whether it's valid to online downgrade the 
consumer group.
+     */
+    private boolean validateOnlineDowngradeWithReplacedMemberId(
+        ConsumerGroup consumerGroup,
+        String replacedMemberId
+    ) {
+        if 
(!consumerGroup.allMembersUseClassicProtocolExcept(replacedMemberId)) {
+            return false;
+        } else if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+            log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() > classicGroupMaxSize) {
+            log.info("Cannot downgrade consumer group {} to classic group 
because its group size is greater than classic group max size.",
+                consumerGroup.groupId());
+            return false;
+        }
+        return true;
+    }
+
     /**
      * Creates a ClassicGroup corresponding to the given ConsumerGroup.
      *
      * @param consumerGroup     The converted ConsumerGroup.
      * @param leavingMemberId   The leaving member that triggers the downgrade 
validation.
-     * @param response          The response of the returned CoordinatorResult.
-     * @return A CoordinatorResult.
+     * @param joiningMember     The newly joined member if the downgrade is 
triggered by static member replacement.
+     * @param records           The record list to which the conversion 
records are added.
      */
-    private <T> CoordinatorResult<T, CoordinatorRecord> convertToClassicGroup(
+    private void convertToClassicGroup(
         ConsumerGroup consumerGroup,
         String leavingMemberId,
-        T response
+        ConsumerGroupMember joiningMember,
+        List<CoordinatorRecord> records
     ) {
-        List<CoordinatorRecord> records = new ArrayList<>();
-        consumerGroup.createGroupTombstoneRecords(records);
+        if (joiningMember == null) {
+            consumerGroup.createGroupTombstoneRecords(records);
+        } else {
+            
consumerGroup.createGroupTombstoneRecordsWithReplacedMember(records, 
leavingMemberId, joiningMember.memberId());
+        }
 
         ClassicGroup classicGroup;
         try {
             classicGroup = ClassicGroup.fromConsumerGroup(
                 consumerGroup,
                 leavingMemberId,
+                joiningMember,
                 logContext,
                 time,
                 metadataImage
@@ -1102,14 +1134,15 @@ public class GroupMetadataManager {
 
         // Directly update the states instead of replaying the records because
         // the classicGroup reference is needed for triggering the rebalance.
-        // Set the appendFuture to prevent the records from being replayed.
         removeGroup(consumerGroup.groupId());
         groups.put(consumerGroup.groupId(), classicGroup);
 
         classicGroup.allMembers().forEach(member -> 
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
-        prepareRebalance(classicGroup, String.format("Downgrade group %s from 
consumer to classic.", classicGroup.groupId()));
 
-        return new CoordinatorResult<>(records, response, null, false);
+        // If the downgrade is triggered by a member leaving the group, a 
rebalance should be triggered.
+        if (joiningMember == null) {
+            prepareRebalance(classicGroup, String.format("Downgrade group %s 
from consumer to classic.", classicGroup.groupId()));
+        }
     }
 
     /**
@@ -2028,6 +2061,20 @@ public class GroupMetadataManager {
             records
         );
 
+        // 4. Maybe downgrade the consumer group if the last static member 
using the
+        // consumer protocol is replaced by the joining static member.
+        String existingStaticMemberIdOrNull = 
group.staticMemberId(request.groupInstanceId());
+        boolean downgrade = existingStaticMemberIdOrNull != null &&
+            validateOnlineDowngradeWithReplacedMemberId(group, 
existingStaticMemberIdOrNull);
+        if (downgrade) {
+            convertToClassicGroup(
+                group,
+                existingStaticMemberIdOrNull,
+                updatedMember,
+                records
+            );
+        }
+
         final JoinGroupResponseData response = new JoinGroupResponseData()
             .setMemberId(updatedMember.memberId())
             .setGenerationId(updatedMember.memberEpoch())
@@ -2038,15 +2085,22 @@ public class GroupMetadataManager {
         appendFuture.whenComplete((__, t) -> {
             if (t == null) {
                 cancelConsumerGroupJoinTimeout(groupId, response.memberId());
-                scheduleConsumerGroupSessionTimeout(groupId, 
response.memberId(), sessionTimeoutMs);
-                // The sync timeout ensures that the member send sync request 
within the rebalance timeout.
-                scheduleConsumerGroupSyncTimeout(groupId, response.memberId(), 
request.rebalanceTimeoutMs());
-
+                if (!downgrade) {
+                    // If the group is still a consumer group, schedule the 
session
+                    // timeout for the joining member and the sync timeout to 
ensure
+                    // that the member send sync request within the rebalance 
timeout.
+                    scheduleConsumerGroupSessionTimeout(groupId, 
response.memberId(), sessionTimeoutMs);
+                    scheduleConsumerGroupSyncTimeout(groupId, 
response.memberId(), request.rebalanceTimeoutMs());
+                }
                 responseFuture.complete(response);
             }
         });
 
-        return new CoordinatorResult<>(records, null, appendFuture, true);
+        // If the joining member triggers a valid downgrade, the soft states 
will be directly
+        // updated in the conversion method, so the records don't need to be 
replayed.
+        // If the joining member doesn't trigger a valid downgrade, the group 
is still a
+        // consumer group. We still rely on replaying records to update the 
soft states.
+        return new CoordinatorResult<>(records, null, appendFuture, 
!downgrade);
     }
 
     /**
@@ -2728,10 +2782,11 @@ public class GroupMetadataManager {
         ConsumerGroupMember member,
         T response
     ) {
-        if (validateOnlineDowngrade(group, member.memberId())) {
-            return convertToClassicGroup(group, member.memberId(), response);
+        List<CoordinatorRecord> records = new ArrayList<>();
+        if (validateOnlineDowngradeWithFencedMember(group, member.memberId())) 
{
+            convertToClassicGroup(group, member.memberId(), null, records);
+            return new CoordinatorResult<>(records, response, null, false);
         } else {
-            List<CoordinatorRecord> records = new ArrayList<>();
             removeMember(records, group.groupId(), member.memberId());
 
             // We update the subscription metadata without the leaving member.
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
index 265e5ea4453..819eb53be38 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
@@ -39,6 +39,7 @@ import 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.MetadataVersion;
 
@@ -1357,6 +1358,7 @@ public class ClassicGroup implements Group {
      *
      * @param consumerGroup                 The converted ConsumerGroup.
      * @param leavingMemberId               The member that will not be 
converted in the ClassicGroup.
+     * @param joiningMember                 The member that needs to be 
converted and added to the ClassicGroup.
      * @param logContext                    The logContext to create the 
ClassicGroup.
      * @param time                          The time to create the 
ClassicGroup.
      * @param metadataImage                 The MetadataImage.
@@ -1365,6 +1367,7 @@ public class ClassicGroup implements Group {
     public static ClassicGroup fromConsumerGroup(
         ConsumerGroup consumerGroup,
         String leavingMemberId,
+        ConsumerGroupMember joiningMember,
         LogContext logContext,
         Time time,
         MetadataImage metadataImage
@@ -1399,15 +1402,38 @@ public class ClassicGroup implements Group {
             }
         });
 
+        if (joiningMember != null) {
+            classicGroup.add(
+                new ClassicGroupMember(
+                    joiningMember.memberId(),
+                    Optional.ofNullable(joiningMember.instanceId()),
+                    joiningMember.clientId(),
+                    joiningMember.clientHost(),
+                    joiningMember.rebalanceTimeoutMs(),
+                    joiningMember.classicProtocolSessionTimeout().get(),
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    joiningMember.supportedJoinGroupRequestProtocols(),
+                    null
+                )
+            );
+        }
+
         
classicGroup.setProtocolName(Optional.of(classicGroup.selectProtocol()));
         
classicGroup.setSubscribedTopics(classicGroup.computeSubscribedTopics());
 
         classicGroup.allMembers().forEach(classicGroupMember -> {
             // Set the assignment with serializing the ConsumerGroup's 
targetAssignment.
             // The serializing version should align with that of the member's 
JoinGroupRequestProtocol.
+            String memberId = classicGroupMember.memberId();
+            if (joiningMember != null && 
memberId.equals(joiningMember.memberId())) {
+                // If the downgraded is triggered by the joining static member 
replacing
+                // the leaving static member, the joining member should take 
the assignment
+                // of the leaving one.
+                memberId = leavingMemberId;
+            }
             byte[] assignment = 
Utils.toArray(ConsumerProtocol.serializeAssignment(
                 toConsumerProtocolAssignment(
-                    
consumerGroup.targetAssignment().get(classicGroupMember.memberId()).partitions(),
+                    
consumerGroup.targetAssignment().get(memberId).partitions(),
                     metadataImage.topics()
                 ),
                 ConsumerProtocol.deserializeVersion(
@@ -1452,6 +1478,13 @@ public class ClassicGroup implements Group {
         }
     }
 
+    /**
+     * For testing only.
+     */
+    public void setLeaderId(Optional<String> leaderId) {
+        this.leaderId = leaderId;
+    }
+
     @Override
     public String toString() {
         return "ClassicGroupMetadata(" +
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
index 817cf7bbe24..10b17a5d641 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java
@@ -528,6 +528,40 @@ public class ConsumerGroup extends 
ModernGroup<ConsumerGroupMember> {
         
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId()));
     }
 
+    /**
+     * Populates the list of records with tombstone(s) for deleting the group.
+     * If the removed member is the leaving member, create its tombstone with
+     * the joining member id.
+     *
+     * @param records           The list of records.
+     * @param leavingMemberId   The leaving member id.
+     * @param joiningMemberId   The joining member id.
+     */
+    public void createGroupTombstoneRecordsWithReplacedMember(
+        List<CoordinatorRecord> records,
+        String leavingMemberId,
+        String joiningMemberId
+    ) {
+        members().forEach((memberId, __) -> {
+            String removedMemberId = memberId.equals(leavingMemberId) ? 
joiningMemberId : memberId;
+            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId(),
 removedMemberId));
+        });
+
+        members().forEach((memberId, __) -> {
+            String removedMemberId = memberId.equals(leavingMemberId) ? 
joiningMemberId : memberId;
+            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId(),
 removedMemberId));
+        });
+        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId()));
+
+        members().forEach((memberId,  __) -> {
+            String removedMemberId = memberId.equals(leavingMemberId) ? 
joiningMemberId : memberId;
+            
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId(),
 removedMemberId));
+        });
+
+        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId()));
+        
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId()));
+    }
+
     @Override
     public boolean isEmpty() {
         return state() == ConsumerGroupState.EMPTY;
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 0239d5f6304..86b9e198024 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -93,6 +93,7 @@ import org.apache.kafka.server.common.MetadataVersion;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.opentest4j.AssertionFailedError;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -104,6 +105,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -10945,6 +10947,12 @@ public class GroupMetadataManagerTest {
         // A new rebalance is triggered.
         ClassicGroup classicGroup = 
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
         assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
+
+        // Simulate a failed write to the log.
+        context.rollback();
+
+        // The group is reverted back to the consumer group.
+        assertEquals(consumerGroup, 
context.groupMetadataManager.consumerGroup(groupId));
     }
 
     @Test
@@ -11340,6 +11348,242 @@ public class GroupMetadataManagerTest {
         assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
     }
 
+    @Test
+    public void 
testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMember() throws 
ExecutionException, InterruptedException {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String oldMemberId2 = Uuid.randomUuid().toString();
+        String instanceId = "instance-id";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 = 
Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                    Arrays.asList(fooTopicName, barTopicName),
+                    null,
+                    Arrays.asList(
+                        new TopicPartition(fooTopicName, 0),
+                        new TopicPartition(fooTopicName, 1),
+                        new TopicPartition(fooTopicName, 2),
+                        new TopicPartition(barTopicName, 0),
+                        new TopicPartition(barTopicName, 1)
+                    )
+                ))))
+        );
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(5000)
+                    .setSupportedProtocols(protocols1)
+            )
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember oldMember2 = new 
ConsumerGroupMember.Builder(oldMemberId2)
+            .setInstanceId(instanceId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(Collections.singletonList("foo"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5)))
+            .build();
+
+        // Consumer group with two members.
+        // Member 1 uses the classic protocol and static member 2 uses the 
consumer protocol.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withConsumerGroupAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 2)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(oldMember2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(oldMemberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
 10);
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
+            {
+                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
6));
+                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 
2));
+            }
+        }));
+        context.commit();
+
+        // A new member using classic protocol with the same instance id 
joins, scheduling the downgrade.
+        JoinGroupRequestData joinRequest = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withGroupInstanceId(instanceId)
+            .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+        GroupMetadataManagerTestContext.JoinResult result = 
context.sendClassicGroupJoin(joinRequest);
+        result.appendFuture.complete(null);
+        String newMemberId2 = result.joinFuture.get().memberId();
+
+        ConsumerGroupMember expectedNewConsumerMember2 = new 
ConsumerGroupMember.Builder(oldMember2, newMemberId2)
+            .setMemberEpoch(0)
+            .setPreviousMemberEpoch(0)
+            .build();
+        ConsumerGroupMember expectedNewClassicMember2 = new 
ConsumerGroupMember.Builder(oldMember2, newMemberId2)
+            .setPreviousMemberEpoch(0)
+            .setRebalanceTimeoutMs(joinRequest.rebalanceTimeoutMs())
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(joinRequest.sessionTimeoutMs())
+                    .setSupportedProtocols(Collections.singletonList(new 
ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                        .setName("range")
+                        
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                            Collections.singletonList(fooTopicName)))))))
+            ).build();
+
+        byte[] assignment1 = 
Utils.toArray(ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(Arrays.asList(
+            new TopicPartition(fooTopicName, 0),
+            new TopicPartition(fooTopicName, 1),
+            new TopicPartition(fooTopicName, 2),
+            new TopicPartition(barTopicName, 0),
+            new TopicPartition(barTopicName, 1)
+        ))));
+        byte[] assignment2 = 
Utils.toArray(ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(Arrays.asList(
+            new TopicPartition(fooTopicName, 3),
+            new TopicPartition(fooTopicName, 4),
+            new TopicPartition(fooTopicName, 5)
+        ))));
+        Map<String, byte[]> assignments = new HashMap<>();
+        assignments.put(memberId1, assignment1);
+        assignments.put(newMemberId2, assignment2);
+
+        ClassicGroup expectedClassicGroup = new ClassicGroup(
+            new LogContext(),
+            groupId,
+            STABLE,
+            context.time,
+            10,
+            Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.of("range"),
+            Optional.of(memberId1),
+            Optional.of(context.time.milliseconds())
+        );
+        expectedClassicGroup.add(
+            new ClassicGroupMember(
+                memberId1,
+                Optional.ofNullable(member1.instanceId()),
+                member1.clientId(),
+                member1.clientHost(),
+                member1.rebalanceTimeoutMs(),
+                member1.classicProtocolSessionTimeout().get(),
+                ConsumerProtocol.PROTOCOL_TYPE,
+                member1.supportedJoinGroupRequestProtocols(),
+                assignment1
+            )
+        );
+        expectedClassicGroup.add(
+            new ClassicGroupMember(
+                newMemberId2,
+                Optional.ofNullable(oldMember2.instanceId()),
+                DEFAULT_CLIENT_ID,
+                DEFAULT_CLIENT_ADDRESS.toString(),
+                joinRequest.rebalanceTimeoutMs(),
+                joinRequest.sessionTimeoutMs(),
+                joinRequest.protocolType(),
+                joinRequest.protocols(),
+                assignment2
+            )
+        );
+
+        List<CoordinatorRecord> expectedRecords = Arrays.asList(
+            // Remove the existing member 2 that uses the consumer protocol.
+            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 oldMemberId2),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 oldMemberId2),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 oldMemberId2),
+
+            // Create the new member 2 that uses the consumer protocol.
+            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedNewConsumerMember2),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
newMemberId2, expectedNewConsumerMember2.assignedPartitions()),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedNewConsumerMember2),
+
+            // Update the new member 2 to the member that uses classic 
protocol.
+            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedNewClassicMember2),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedNewClassicMember2),
+
+            // Remove member 1, member 2 and the consumer group.
+            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 newMemberId2),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 newMemberId2),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 newMemberId2),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId),
+
+            // Create the classic group.
+            
GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup, 
assignments, MetadataVersion.latestTesting())
+        );
+
+        assertEquals(expectedRecords.size(), result.records.size());
+        assertRecordsEquals(expectedRecords.subList(0, 8), 
result.records.subList(0, 8));
+        assertUnorderedListEquals(expectedRecords.subList(8, 10), 
result.records.subList(8, 10));
+        assertUnorderedListEquals(expectedRecords.subList(10, 12), 
result.records.subList(10, 12));
+        assertRecordEquals(expectedRecords.get(12), result.records.get(12));
+        assertUnorderedListEquals(expectedRecords.subList(13, 15), 
result.records.subList(13, 15));
+        assertRecordsEquals(expectedRecords.subList(15, 17), 
result.records.subList(15, 17));
+
+        // Leader can be either member 1 or member 2.
+        try {
+            assertRecordEquals(expectedRecords.get(17), 
result.records.get(17));
+        } catch (AssertionFailedError e) {
+            expectedClassicGroup.setLeaderId(Optional.of(newMemberId2));
+            assertRecordEquals(
+                
GroupCoordinatorRecordHelpers.newGroupMetadataRecord(expectedClassicGroup, 
assignments, MetadataVersion.latestTesting()),
+                result.records.get(9)
+            );
+        }
+
+        verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE,
 null);
+
+        // The new classic member 1 has a heartbeat timeout.
+        ScheduledTimeout<Void, CoordinatorRecord> heartbeatTimeout = 
context.timer.timeout(
+            classicGroupHeartbeatKey(groupId, memberId1)
+        );
+        assertNotNull(heartbeatTimeout);
+
+        // No rebalance is triggered.
+        ClassicGroup classicGroup = 
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+        assertTrue(classicGroup.isInState(STABLE));
+    }
+
     @Test
     public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
         String groupId = "group-id";
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
index 1d496c4b04b..ecf281f4650 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java
@@ -18,6 +18,8 @@ package org.apache.kafka.coordinator.group.classic;
 
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
@@ -25,6 +27,7 @@ import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.JoinGroupRequestData;
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
 import org.apache.kafka.common.message.JoinGroupResponseData;
@@ -34,10 +37,20 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.group.MetadataImageBuilder;
 import org.apache.kafka.coordinator.group.OffsetAndMetadata;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.coordinator.group.modern.Assignment;
+import org.apache.kafka.coordinator.group.modern.MemberState;
+import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -56,17 +69,21 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
 import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE;
 import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD;
 import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.EMPTY;
 import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPARING_REBALANCE;
 import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public class ClassicGroupTest {
     private final String protocolType = "consumer";
@@ -1349,6 +1366,272 @@ public class ClassicGroupTest {
         }
     }
 
+    @Test
+    public void testFromConsumerGroupWithJoiningMember() {
+        MockTime time = new MockTime();
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String newMemberId2 = Uuid.randomUuid().toString();
+        String instanceId2 = "instance-id-2";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 2)
+            .addRacks()
+            .build();
+
+        ConsumerGroup consumerGroup = new ConsumerGroup(
+            new SnapshotRegistry(logContext),
+            groupId,
+            mock(GroupCoordinatorMetricsShard.class)
+        );
+        consumerGroup.setGroupEpoch(10);
+        consumerGroup.setTargetAssignmentEpoch(10);
+
+        consumerGroup.updateTargetAssignment(memberId1, new 
Assignment(mkAssignment(
+            mkTopicAssignment(fooTopicId, 0)
+        )));
+        consumerGroup.updateTargetAssignment(memberId2, new 
Assignment(mkAssignment(
+            mkTopicAssignment(fooTopicId, 1)
+        )));
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 = 
Collections.singletonList(createClassicProtocol(
+            "range",
+            Collections.singletonList(fooTopicName),
+            Collections.singletonList(new TopicPartition(fooTopicName, 0))
+        ));
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols2 = 
Collections.singletonList(createClassicProtocol(
+            "range",
+            Collections.singletonList(fooTopicName),
+            Collections.singletonList(new TopicPartition(fooTopicName, 1))
+        ));
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client-id")
+            .setClientHost("client-host")
+            .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(5000)
+                    .setSupportedProtocols(protocols1))
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0)))
+            .build();
+        consumerGroup.updateMember(member1);
+
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setInstanceId(instanceId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client-id")
+            .setClientHost("client-host")
+            .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 1)))
+            .build();
+        consumerGroup.updateMember(member2);
+
+        ConsumerGroupMember newMember2 = new 
ConsumerGroupMember.Builder(member2, newMemberId2)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(0)
+            .setClientId("client-id")
+            .setClientHost("client-host")
+            .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(5000)
+                    .setSupportedProtocols(protocols2))
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 1)))
+            .build();
+
+        ClassicGroup classicGroup = ClassicGroup.fromConsumerGroup(
+            consumerGroup,
+            memberId2,
+            newMember2,
+            logContext,
+            time,
+            metadataImage
+        );
+
+        ClassicGroup expectedClassicGroup = new ClassicGroup(
+            logContext,
+            groupId,
+            STABLE,
+            time,
+            10,
+            Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.of("range"),
+            Optional.empty(),
+            Optional.of(time.milliseconds())
+        );
+        expectedClassicGroup.add(
+            new ClassicGroupMember(
+                memberId1,
+                Optional.empty(),
+                member1.clientId(),
+                member1.clientHost(),
+                member1.rebalanceTimeoutMs(),
+                member1.classicProtocolSessionTimeout().get(),
+                ConsumerProtocol.PROTOCOL_TYPE,
+                new 
JoinGroupRequestData.JoinGroupRequestProtocolCollection(Collections.singletonList(
+                    new JoinGroupRequestData.JoinGroupRequestProtocol()
+                        .setName(protocols1.get(0).name())
+                        .setMetadata(protocols1.get(0).metadata())
+                ).iterator()),
+                Utils.toArray(ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(
+                    Collections.singletonList(new TopicPartition(fooTopicName, 
0))
+                )))
+            )
+        );
+        expectedClassicGroup.add(
+            new ClassicGroupMember(
+                newMemberId2,
+                Optional.of(instanceId2),
+                newMember2.clientId(),
+                newMember2.clientHost(),
+                newMember2.rebalanceTimeoutMs(),
+                newMember2.classicProtocolSessionTimeout().get(),
+                ConsumerProtocol.PROTOCOL_TYPE,
+                new 
JoinGroupRequestData.JoinGroupRequestProtocolCollection(Collections.singletonList(
+                    new JoinGroupRequestData.JoinGroupRequestProtocol()
+                        .setName(protocols2.get(0).name())
+                        .setMetadata(protocols2.get(0).metadata())
+                ).iterator()),
+                Utils.toArray(ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(
+                    Collections.singletonList(new TopicPartition(fooTopicName, 
1))
+                )))
+            )
+        );
+
+        assertClassicGroupEquals(expectedClassicGroup, classicGroup);
+    }
+
+    @Test
+    public void testFromConsumerGroupWithoutJoiningMember() {
+        MockTime time = new MockTime();
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String instanceId2 = "instance-id-2";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 2)
+            .addRacks()
+            .build();
+
+        ConsumerGroup consumerGroup = new ConsumerGroup(
+            new SnapshotRegistry(logContext),
+            groupId,
+            mock(GroupCoordinatorMetricsShard.class)
+        );
+        consumerGroup.setGroupEpoch(10);
+        consumerGroup.setTargetAssignmentEpoch(10);
+        consumerGroup.updateTargetAssignment(memberId1, new 
Assignment(mkAssignment(
+            mkTopicAssignment(fooTopicId, 0)
+        )));
+        consumerGroup.updateTargetAssignment(memberId2, new 
Assignment(mkAssignment(
+            mkTopicAssignment(fooTopicId, 1)
+        )));
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 = 
Collections.singletonList(createClassicProtocol(
+            "range",
+            Collections.singletonList(fooTopicName),
+            Collections.singletonList(new TopicPartition(fooTopicName, 0))
+        ));
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client-id")
+            .setClientHost("client-host")
+            .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(5000)
+                    .setSupportedProtocols(protocols1))
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0)))
+            .build();
+        consumerGroup.updateMember(member1);
+
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setInstanceId(instanceId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client-id")
+            .setClientHost("client-host")
+            .setSubscribedTopicNames(Collections.singletonList(fooTopicName))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 1)))
+            .build();
+        consumerGroup.updateMember(member2);
+
+        ClassicGroup classicGroup = ClassicGroup.fromConsumerGroup(
+            consumerGroup,
+            memberId2,
+            null,
+            logContext,
+            time,
+            metadataImage
+        );
+
+        ClassicGroup expectedClassicGroup = new ClassicGroup(
+            logContext,
+            groupId,
+            STABLE,
+            time,
+            10,
+            Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.of("range"),
+            Optional.empty(),
+            Optional.of(time.milliseconds())
+        );
+        expectedClassicGroup.add(
+            new ClassicGroupMember(
+                memberId1,
+                Optional.empty(),
+                member1.clientId(),
+                member1.clientHost(),
+                member1.rebalanceTimeoutMs(),
+                member1.classicProtocolSessionTimeout().get(),
+                ConsumerProtocol.PROTOCOL_TYPE,
+                new 
JoinGroupRequestData.JoinGroupRequestProtocolCollection(Collections.singletonList(
+                    new JoinGroupRequestData.JoinGroupRequestProtocol()
+                        .setName(protocols1.get(0).name())
+                        .setMetadata(protocols1.get(0).metadata())
+                ).iterator()),
+                Utils.toArray(ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(
+                    Collections.singletonList(new TopicPartition(fooTopicName, 
0))
+                )))
+            )
+        );
+
+        assertClassicGroupEquals(expectedClassicGroup, classicGroup);
+    }
+
     private void assertState(ClassicGroup group, ClassicGroupState 
targetState) {
         Set<ClassicGroupState> otherStates = new HashSet<>();
         otherStates.add(STABLE);
@@ -1360,4 +1643,42 @@ public class ClassicGroupTest {
         otherStates.forEach(otherState -> 
assertFalse(group.isInState(otherState)));
         assertTrue(group.isInState(targetState));
     }
+
+    private void assertClassicGroupEquals(ClassicGroup expected, ClassicGroup 
actual) {
+        assertEquals(expected.groupId(), actual.groupId());
+        assertEquals(expected.protocolName(), actual.protocolName());
+        assertEquals(expected.protocolType(), actual.protocolType());
+        assertEquals(expected.leaderOrNull(), actual.leaderOrNull());
+        assertEquals(expected.stateAsString(), actual.stateAsString());
+        assertEquals(expected.generationId(), actual.generationId());
+        assertEquals(expected.allMembers().size(), actual.allMembers().size());
+        expected.allMembers().forEach(expectedMember ->
+            assertClassicGroupMemberEquals(expectedMember, 
actual.member(expectedMember.memberId())));
+    }
+
+    private void assertClassicGroupMemberEquals(ClassicGroupMember expected, 
ClassicGroupMember actual) {
+        assertEquals(expected.memberId(), actual.memberId());
+        assertEquals(expected.groupInstanceId(), actual.groupInstanceId());
+        assertEquals(expected.clientId(), actual.clientId());
+        assertEquals(expected.clientHost(), actual.clientHost());
+        assertEquals(expected.rebalanceTimeoutMs(), 
actual.rebalanceTimeoutMs());
+        assertEquals(expected.sessionTimeoutMs(), actual.sessionTimeoutMs());
+        assertEquals(expected.protocolType(), actual.protocolType());
+        assertEquals(expected.supportedProtocols(), 
actual.supportedProtocols());
+        assertArrayEquals(expected.assignment(), actual.assignment());
+    }
+
+    private ConsumerGroupMemberMetadataValue.ClassicProtocol 
createClassicProtocol(
+        String protocolName,
+        List<String> subscribedTopics,
+        List<TopicPartition> assignedTopicPartitions
+    ) {
+        return new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+            .setName(protocolName)
+            
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                subscribedTopics,
+                null,
+                assignedTopicPartitions
+            ))));
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index d6eee7bbd9b..ad407a27cbc 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.coordinator.group.modern.consumer;
 
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
@@ -25,15 +26,21 @@ import 
org.apache.kafka.common.errors.StaleMemberEpochException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
+import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
 import org.apache.kafka.coordinator.group.MetadataImageBuilder;
 import org.apache.kafka.coordinator.group.OffsetAndMetadata;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.classic.ClassicGroup;
+import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.Assignment;
@@ -56,10 +63,14 @@ import java.util.OptionalLong;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals;
+import static 
org.apache.kafka.coordinator.group.Assertions.assertRecordsEquals;
+import static 
org.apache.kafka.coordinator.group.Assertions.assertUnorderedListEquals;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
 import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
 import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
+import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -1435,4 +1446,156 @@ public class ConsumerGroupTest {
         consumerGroup.updateMember(member2);
         assertEquals(1, consumerGroup.numClassicProtocolMembers());
     }
+
+    @Test
+    public void testCreateGroupTombstoneRecordsWithReplacedMember() {
+        String groupId = "group";
+        String memberId1 = "member-1";
+        String memberId2 = "member-2";
+        String newMemberId2 = "new-member-2";
+
+        ConsumerGroup consumerGroup = createConsumerGroup(groupId);
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = new 
ArrayList<>();
+        protocols.add(new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+            .setName("range")
+            .setMetadata(new byte[0]));
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setClassicMemberMetadata(new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                .setSupportedProtocols(protocols))
+            .build();
+        consumerGroup.updateMember(member1);
+
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setInstanceId("instance-id-2")
+            .build();
+        consumerGroup.updateMember(member2);
+
+        List<CoordinatorRecord> records = new ArrayList<>();
+        consumerGroup.createGroupTombstoneRecordsWithReplacedMember(records, 
memberId2, newMemberId2);
+
+        List<CoordinatorRecord> expectedRecords = Arrays.asList(
+            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 newMemberId2),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 newMemberId2),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord(groupId),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId1),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 newMemberId2),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId),
+            
GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord(groupId)
+        );
+        assertEquals(expectedRecords.size(), records.size());
+        assertUnorderedListEquals(expectedRecords.subList(0, 2), 
records.subList(0, 2));
+        assertUnorderedListEquals(expectedRecords.subList(2, 4), 
records.subList(2, 4));
+        assertRecordEquals(expectedRecords.get(4), records.get(4));
+        assertUnorderedListEquals(expectedRecords.subList(5, 7), 
records.subList(5, 7));
+        assertRecordsEquals(expectedRecords.subList(7, 9), records.subList(7, 
9));
+    }
+
+    @Test
+    public void testFromClassicGroup() {
+        MockTime time = new MockTime();
+        LogContext logContext = new LogContext();
+        String groupId = "group-id";
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 1)
+            .addTopic(barTopicId, barTopicName, 1)
+            .addRacks()
+            .build();
+
+        ClassicGroup classicGroup = new ClassicGroup(
+            logContext,
+            groupId,
+            STABLE,
+            time,
+            10,
+            Optional.of(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.of("range"),
+            Optional.empty(),
+            Optional.of(time.milliseconds())
+        );
+
+        ClassicGroupMember member = new ClassicGroupMember(
+            memberId,
+            Optional.empty(),
+            "client-id",
+            "client-host",
+            5000,
+            500,
+            ConsumerProtocol.PROTOCOL_TYPE,
+            new 
JoinGroupRequestData.JoinGroupRequestProtocolCollection(Collections.singletonList(
+                new JoinGroupRequestData.JoinGroupRequestProtocol()
+                    .setName("range")
+                    
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                            Arrays.asList(fooTopicName, barTopicName),
+                            null,
+                            Arrays.asList(
+                                new TopicPartition(fooTopicName, 0),
+                                new TopicPartition(barTopicName, 0))))))
+            ).iterator()),
+            Utils.toArray(ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(Arrays.asList(
+                new TopicPartition(fooTopicName, 0),
+                new TopicPartition(barTopicName, 0)
+            ))))
+        );
+        classicGroup.add(member);
+
+        ConsumerGroup consumerGroup = ConsumerGroup.fromClassicGroup(
+            new SnapshotRegistry(logContext),
+            mock(GroupCoordinatorMetricsShard.class),
+            classicGroup,
+            metadataImage.topics()
+        );
+
+        ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
+            new SnapshotRegistry(logContext),
+            groupId,
+            mock(GroupCoordinatorMetricsShard.class)
+        );
+        expectedConsumerGroup.setGroupEpoch(10);
+        expectedConsumerGroup.setTargetAssignmentEpoch(10);
+        expectedConsumerGroup.updateTargetAssignment(memberId, new 
Assignment(mkAssignment(
+            mkTopicAssignment(fooTopicId, 0)
+        )));
+        expectedConsumerGroup.updateMember(new 
ConsumerGroupMember.Builder(memberId)
+            .setMemberEpoch(classicGroup.generationId())
+            .setState(MemberState.STABLE)
+            .setPreviousMemberEpoch(classicGroup.generationId())
+            .setInstanceId(null)
+            .setRackId(null)
+            .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+            .setClientId(member.clientId())
+            .setClientHost(member.clientHost())
+            .setSubscribedTopicNames(Arrays.asList(fooTopicName, barTopicName))
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0),
+                mkTopicAssignment(barTopicId, 0)))
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(member.sessionTimeoutMs())
+                    .setSupportedProtocols(Collections.singletonList(
+                        new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                            .setName("range")
+                            
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                                Arrays.asList(fooTopicName, barTopicName),
+                                null,
+                                Arrays.asList(
+                                    new TopicPartition(fooTopicName, 0),
+                                    new TopicPartition(barTopicName, 0)))))))))
+            .build());
+
+        assertEquals(expectedConsumerGroup.groupId(), consumerGroup.groupId());
+        assertEquals(expectedConsumerGroup.groupEpoch(), 
consumerGroup.groupEpoch());
+        assertEquals(expectedConsumerGroup.state(), consumerGroup.state());
+        assertEquals(expectedConsumerGroup.preferredServerAssignor(), 
consumerGroup.preferredServerAssignor());
+        assertEquals(expectedConsumerGroup.members(), consumerGroup.members());
+    }
 }

Reply via email to