This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bb1c4465c92 KAFKA-14516: [1/N] Static Member leave, join, re-join 
request using ConsumerGroupHeartbeats (#14432)
bb1c4465c92 is described below

commit bb1c4465c92d890fdb6fa2ebb2c2d2f1b8648feb
Author: vamossagar12 <[email protected]>
AuthorDate: Tue Nov 28 23:38:16 2023 +0530

    KAFKA-14516: [1/N] Static Member leave, join, re-join request using 
ConsumerGroupHeartbeats (#14432)
    
    This patch adds support for static membership to the new consumer group 
protocol. With it, a static member can join, re-join, temporarily leave, and 
leave. When a member leaves with the expectation to rejoin, it must rejoin 
within the session timeout; otherwise, it is kicked out of the consumer group.
    
    Reviewers: David Jacot <[email protected]>
---
 .../requests/ConsumerGroupHeartbeatRequest.java    |    1 +
 .../coordinator/group/GroupMetadataManager.java    |  255 +++-
 .../coordinator/group/consumer/ConsumerGroup.java  |   61 +
 .../group/consumer/TargetAssignmentBuilder.java    |   28 +-
 .../group/GroupMetadataManagerTest.java            | 1252 +++++++++++++++++---
 .../group/consumer/ConsumerGroupTest.java          |   50 +
 .../consumer/TargetAssignmentBuilderTest.java      |  120 +-
 7 files changed, 1568 insertions(+), 199 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
index ac3a079641d..ced4880adf6 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
@@ -30,6 +30,7 @@ public class ConsumerGroupHeartbeatRequest extends 
AbstractRequest {
      * A member epoch of <code>-1</code> means that the member wants to leave 
the group.
      */
     public static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
+    public static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;
 
     /**
      * A member epoch of <code>0</code> means that the member wants to join 
the group.
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 9116d9b6017..eafb6bb2170 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -28,6 +28,8 @@ import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.UnreleasedInstanceIdException;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
@@ -104,6 +106,7 @@ import static 
org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
 import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
 import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
 import static 
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
 import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
 import static org.apache.kafka.coordinator.group.Group.GroupType.GENERIC;
@@ -663,6 +666,22 @@ public class GroupMetadataManager {
         }
     }
 
+    /**
+     * Throws an InvalidRequestException if the value is null.
+     *
+     * @param value The value.
+     * @param error The error message.
+     * @throws InvalidRequestException
+     */
+    private void throwIfNull(
+        Object value,
+        String error
+    ) throws InvalidRequestException {
+        if (value == null) {
+            throw new InvalidRequestException(error);
+        }
+    }
+
     /**
      * Validates the request.
      *
@@ -691,6 +710,9 @@ public class GroupMetadataManager {
             if (request.subscribedTopicNames() == null || 
request.subscribedTopicNames().isEmpty()) {
                 throw new InvalidRequestException("SubscribedTopicNames must 
be set in first request.");
             }
+        } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+            throwIfNull(request.instanceId(), "InstanceId can't be null.");
         } else {
             throw new InvalidRequestException("MemberEpoch is invalid.");
         }
@@ -781,6 +803,59 @@ public class GroupMetadataManager {
         }
     }
 
+    /**
+     * Validates if the received instanceId has been released from the group
+     *
+     * @param member                The consumer group member.
+     * @param groupId               The consumer group id.
+     * @param receivedMemberId      The member id received in the request.
+     * @param receivedInstanceId    The instance id received in the request.
+     *
+     * @throws UnreleasedInstanceIdException if the instance id received in 
the request is still in use by an existing static member.
+     */
+    private void throwIfInstanceIdIsUnreleased(ConsumerGroupMember member, 
String groupId, String receivedMemberId, String receivedInstanceId) {
+        if (member.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            // The new member can't join.
+            log.info("[GroupId {}] Static member {} with instance id {} cannot 
join the group because the instance id is" +
+                    " is owned by member {}.", groupId, receivedMemberId, 
receivedInstanceId, member.memberId());
+            throw Errors.UNRELEASED_INSTANCE_ID.exception("Static member " + 
receivedMemberId + " with instance id "
+                + receivedInstanceId + " cannot join the group because the 
instance id is owned by " + member.memberId() + " member.");
+        }
+    }
+
+    /**
+     * Validates that the received member id owns the given instance id
+     *
+     * @param member                The consumer group member.
+     * @param groupId               The consumer group id.
+     * @param receivedMemberId      The member id received in the request.
+     * @param receivedInstanceId    The instance id received in the request.
+     *
+     * @throws FencedInstanceIdException if the instance id provided is fenced 
because of another static member.
+     */
+    private void throwIfInstanceIdIsFenced(ConsumerGroupMember member, String 
groupId, String receivedMemberId, String receivedInstanceId) {
+        if (!member.memberId().equals(receivedMemberId)) {
+            log.info("[GroupId {}] Static member {} with instance id {} is 
fenced by existing member {}.",
+                groupId, receivedMemberId, receivedInstanceId, 
member.memberId());
+            throw Errors.FENCED_INSTANCE_ID.exception("Static member " + 
receivedMemberId + " with instance id "
+                + receivedInstanceId + " was fenced by member " + 
member.memberId() + ".");
+        }
+    }
+
+    /**
+     * Validates that a static member exists for the received instance id
+     *
+     * @param staticMember          The static member in the group.
+     * @param receivedInstanceId    The instance id received in the request.
+     *
+     * @throws UnknownMemberIdException if no static member exists in the 
group against the provided instance id.
+     */
+    private void throwIfStaticMemberIsUnknown(ConsumerGroupMember 
staticMember, String receivedInstanceId) {
+        if (staticMember == null) {
+            throw Errors.UNKNOWN_MEMBER_ID.exception("Instance id " + 
receivedInstanceId + " is unknown.");
+        }
+    }
+
     private ConsumerGroupHeartbeatResponseData.Assignment 
createResponseAssignment(
         ConsumerGroupMember member
     ) {
@@ -849,21 +924,54 @@ public class GroupMetadataManager {
 
         // Get or create the member.
         if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-        final ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, createIfNotExists);
-        throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-        if (memberEpoch == 0) {
-            log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+        ConsumerGroupMember member;
+        ConsumerGroupMember.Builder updatedMemberBuilder;
+        boolean staticMemberReplaced = false;
+        if (instanceId == null) {
+            member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+            throwIfMemberEpochIsInvalid(member, memberEpoch, 
ownedTopicPartitions);
+            if (createIfNotExists) {
+                log.info("[GroupId {}] Member {} joins the consumer group.", 
groupId, memberId);
+            }
+            updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+        } else {
+            member = group.staticMember(instanceId);
+            if (memberEpoch == 0) {
+                // A new static member joins or the existing static member 
rejoins.
+                if (member == null) {
+                    // New static member.
+                    member = group.getOrMaybeCreateMember(memberId, 
createIfNotExists);
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(member);
+                    log.info("[GroupId {}] Static member {} with instance id 
{} joins the consumer group.", groupId, memberId, instanceId);
+                } else {
+                    // Static member rejoins with a different member id so it 
should replace
+                    // the previous instance iff the previous member had sent 
a leave group.
+                    throwIfInstanceIdIsUnreleased(member, groupId, memberId, 
instanceId);
+                    // Replace the current member.
+                    staticMemberReplaced = true;
+                    updatedMemberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                        .setAssignedPartitions(member.assignedPartitions());
+                    removeMemberAndCancelTimers(records, group.groupId(), 
member.memberId());
+                    log.info("[GroupId {}] Static member {} with instance id 
{} re-joins the consumer group.", groupId, memberId, instanceId);
+                }
+            } else {
+                throwIfStaticMemberIsUnknown(member, instanceId);
+                throwIfInstanceIdIsFenced(member, groupId, memberId, 
instanceId);
+                throwIfMemberEpochIsInvalid(member, memberEpoch, 
ownedTopicPartitions);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+            }
         }
 
+
+        int groupEpoch = group.groupEpoch();
+        Map<String, TopicMetadata> subscriptionMetadata = 
group.subscriptionMetadata();
+
         // 1. Create or update the member. If the member is new or has 
changed, a ConsumerGroupMemberMetadataValue
         // record is written to the __consumer_offsets partition to persist 
the change. If the subscriptions have
         // changed, the subscription metadata is updated and persisted by 
writing a ConsumerGroupPartitionMetadataValue
         // record to the __consumer_offsets partition. Finally, the group 
epoch is bumped if the subscriptions have
         // changed, and persisted by writing a ConsumerGroupMetadataValue 
record to the partition.
-        int groupEpoch = group.groupEpoch();
-        Map<String, TopicMetadata> subscriptionMetadata = 
group.subscriptionMetadata();
-        ConsumerGroupMember updatedMember = new 
ConsumerGroupMember.Builder(member)
+        ConsumerGroupMember updatedMember = updatedMemberBuilder
             .maybeUpdateInstanceId(Optional.ofNullable(instanceId))
             .maybeUpdateRackId(Optional.ofNullable(rackId))
             .maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
@@ -919,25 +1027,35 @@ public class GroupMetadataManager {
             group.setMetadataRefreshDeadline(currentTimeMs + 
consumerGroupMetadataRefreshIntervalMs, groupEpoch);
         }
 
-        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch. The
-        // delta between the existing and the new target assignment is 
persisted to the partition.
+        // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
+        // replaces an existing static member. The delta between the existing 
and the new target assignment is persisted to the partition.
         int targetAssignmentEpoch = group.assignmentEpoch();
         Assignment targetAssignment = group.targetAssignment(memberId);
-        if (groupEpoch > targetAssignmentEpoch) {
+        if (groupEpoch > targetAssignmentEpoch || staticMemberReplaced) {
             String preferredServerAssignor = 
group.computePreferredServerAssignor(
                 member,
                 updatedMember
             ).orElse(defaultAssignor.name());
-
             try {
-                TargetAssignmentBuilder.TargetAssignmentResult 
assignmentResult =
+                TargetAssignmentBuilder assignmentResultBuilder =
                     new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor))
                         .withMembers(group.members())
+                        .withStaticMembers(group.staticMembers())
                         .withSubscriptionMetadata(subscriptionMetadata)
                         .withTargetAssignment(group.targetAssignment())
-                        .addOrUpdateMember(memberId, updatedMember)
+                        .addOrUpdateMember(memberId, updatedMember);
+                TargetAssignmentBuilder.TargetAssignmentResult 
assignmentResult;
+                // A new static member is replacing an older one with the same 
subscriptions.
+                // We just need to remove the older member and add the newer 
one. The new member should
+                // reuse the target assignment of the older member.
+                if (staticMemberReplaced) {
+                    assignmentResult = assignmentResultBuilder
+                        .removeMember(member.memberId())
                         .build();
-
+                } else {
+                    assignmentResult = assignmentResultBuilder
+                        .build();
+                }
                 log.info("[GroupId {}] Computed a new target assignment for 
epoch {}: {}.",
                     groupId, groupEpoch, assignmentResult.targetAssignment());
 
@@ -1004,27 +1122,79 @@ public class GroupMetadataManager {
         return new CoordinatorResult<>(records, response);
     }
 
+    private void removeMemberAndCancelTimers(
+        List<Record> records,
+        String groupId,
+        String memberId
+    ) {
+        // Write tombstones for the departed static member.
+        removeMember(records, groupId, memberId);
+        // Cancel all the timers of the departed static member.
+        cancelTimers(groupId, memberId);
+    }
+
     /**
      * Handles leave request from a consumer group member.
      * @param groupId       The group id from the request.
      * @param memberId      The member id from the request.
+     * @param memberEpoch   The member epoch from the request.
      *
      * @return A Result containing the ConsumerGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
consumerGroupLeave(
         String groupId,
-        String memberId
+        String instanceId,
+        String memberId,
+        int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, 
false);
-
-        log.info("[GroupId " + groupId + "] Member " + memberId + " left the 
consumer group.");
-
-        List<Record> records = consumerGroupFenceMember(group, member);
+        List<Record> records;
+        if (instanceId == null) {
+            ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
+            log.info("[GroupId {}] Member {} left the consumer group.", 
groupId, memberId);
+            records = consumerGroupFenceMember(group, member);
+        } else {
+            ConsumerGroupMember member = group.staticMember(instanceId);
+            throwIfStaticMemberIsUnknown(member, instanceId);
+            throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
+            if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+                log.info("[GroupId {}] Static Member {} with instance id {} 
temporarily left the consumer group.",
+                    group.groupId(), memberId, instanceId);
+                records = consumerGroupStaticMemberGroupLeave(group, member);
+            } else {
+                log.info("[GroupId {}] Static Member {} with instance id {} 
left the consumer group.",
+                    group.groupId(), memberId, instanceId);
+                records = consumerGroupFenceMember(group, member);
+            }
+        }
         return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
             .setMemberId(memberId)
-            .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
+            .setMemberEpoch(memberEpoch));
+    }
+
+    /**
+     * Handles the case when a static member decides to leave the group.
+     * The member is not actually fenced from the group, and instead its
+     * member epoch is updated to -2 to reflect that a member using the given
+     * instance id decided to leave the group and would be back within session
+     * timeout.
+     *
+     * @param group      The group.
+     * @param member     The static member in the group for the instance id.
+     *
+     * @return A list with a single record signifying that the static member 
is leaving.
+     */
+    private List<Record> consumerGroupStaticMemberGroupLeave(
+        ConsumerGroup group,
+        ConsumerGroupMember member
+    ) {
+        // We will write a member epoch of -2 for this departing static member.
+        ConsumerGroupMember leavingStaticMember = new 
ConsumerGroupMember.Builder(member)
+            .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+            .setPartitionsPendingRevocation(Collections.emptyMap())
+            .build();
+        return 
Collections.singletonList(newCurrentAssignmentRecord(group.groupId(), 
leavingStaticMember));
     }
 
     /**
@@ -1041,10 +1211,7 @@ public class GroupMetadataManager {
     ) {
         List<Record> records = new ArrayList<>();
 
-        // Write tombstones for the member. The order matters here.
-        records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), 
member.memberId()));
-        records.add(newTargetAssignmentTombstoneRecord(group.groupId(), 
member.memberId()));
-        records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), 
member.memberId()));
+        removeMember(records, group.groupId(), member.memberId());
 
         // We update the subscription metadata without the leaving member.
         Map<String, TopicMetadata> subscriptionMetadata = 
group.computeSubscriptionMetadata(
@@ -1064,13 +1231,35 @@ public class GroupMetadataManager {
         int groupEpoch = group.groupEpoch() + 1;
         records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
 
-        // Cancel all the timers of the member.
-        cancelConsumerGroupSessionTimeout(group.groupId(), member.memberId());
-        cancelConsumerGroupRevocationTimeout(group.groupId(), 
member.memberId());
+        cancelTimers(group.groupId(), member.memberId());
 
         return records;
     }
 
+    /**
+     * Write tombstones for the member. The order matters here.
+     *
+     * @param records       The list of records to append the member 
assignment tombstone records.
+     * @param groupId       The group id.
+     * @param memberId      The member id.
+     */
+    private void removeMember(List<Record> records, String groupId, String 
memberId) {
+        records.add(newCurrentAssignmentTombstoneRecord(groupId, memberId));
+        records.add(newTargetAssignmentTombstoneRecord(groupId, memberId));
+        records.add(newMemberSubscriptionTombstoneRecord(groupId, memberId));
+    }
+
+    /**
+     * Cancel all the timers of the member.
+     *
+     * @param groupId       The group id.
+     * @param memberId      The member id.
+     */
+    private void cancelTimers(String groupId, String memberId) {
+        cancelConsumerGroupSessionTimeout(groupId, memberId);
+        cancelConsumerGroupRevocationTimeout(groupId, memberId);
+    }
+
     /**
      * Schedules (or reschedules) the session timeout for the member.
      *
@@ -1184,10 +1373,14 @@ public class GroupMetadataManager {
     ) throws ApiException {
         throwIfConsumerGroupHeartbeatRequestIsInvalid(request);
 
-        if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) {
+        if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH || 
request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+            // -1 means that the member wants to leave the group.
+            // -2 means that a static member wants to leave the group.
             return consumerGroupLeave(
                 request.groupId(),
-                request.memberId()
+                request.instanceId(),
+                request.memberId(),
+                request.memberEpoch()
             );
         } else {
             // Otherwise, it is a regular heartbeat.
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index f3c22837472..bad716d82ff 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -115,6 +115,11 @@ public class ConsumerGroup implements Group {
      */
     private final TimelineHashMap<String, ConsumerGroupMember> members;
 
+    /**
+     * The static group members.
+     */
+    private final TimelineHashMap<String, String> staticMembers;
+
     /**
      * The number of members supporting each server assignor name.
      */
@@ -176,6 +181,7 @@ public class ConsumerGroup implements Group {
         this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
         this.groupEpoch = new TimelineInteger(snapshotRegistry);
         this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
         this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
         this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0);
         this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 
0);
@@ -276,6 +282,18 @@ public class ConsumerGroup implements Group {
         maybeUpdateGroupState();
     }
 
+    /**
+     * Get member id of a static member that matches the given group
+     * instance id.
+     *
+     * @param groupInstanceId The group instance id.
+     *
+     * @return The member id corresponding to the given instance id or null if 
it does not exist
+     */
+    public String staticMemberId(String groupInstanceId) {
+        return staticMembers.get(groupInstanceId);
+    }
+
     /**
      * Gets or creates a member.
      *
@@ -302,6 +320,18 @@ public class ConsumerGroup implements Group {
         return member;
     }
 
+    /**
+     * Gets a static member.
+     *
+     * @param instanceId The group instance id.
+     *
+     * @return The member corresponding to the given instance id or null if it 
does not exist
+     */
+    public ConsumerGroupMember staticMember(String instanceId) {
+        String existingMemberId = staticMemberId(instanceId);
+        return existingMemberId == null ? null : 
getOrMaybeCreateMember(existingMemberId, false);
+    }
+
     /**
      * Updates the member.
      *
@@ -315,9 +345,21 @@ public class ConsumerGroup implements Group {
         maybeUpdateSubscribedTopicNames(oldMember, newMember);
         maybeUpdateServerAssignors(oldMember, newMember);
         maybeUpdatePartitionEpoch(oldMember, newMember);
+        updateStaticMember(newMember);
         maybeUpdateGroupState();
     }
 
+    /**
+     * Updates the member id stored against the instance id if the member is a 
static member.
+     *
+     * @param newMember The new member state.
+     */
+    private void updateStaticMember(ConsumerGroupMember newMember) {
+        if (newMember.instanceId() != null) {
+            staticMembers.put(newMember.instanceId(), newMember.memberId());
+        }
+    }
+
     /**
      * Remove the member from the group.
      *
@@ -328,9 +370,21 @@ public class ConsumerGroup implements Group {
         maybeUpdateSubscribedTopicNames(oldMember, null);
         maybeUpdateServerAssignors(oldMember, null);
         maybeRemovePartitionEpoch(oldMember);
+        removeStaticMember(oldMember);
         maybeUpdateGroupState();
     }
 
+    /**
+     * Remove the static member mapping if the removed member is static.
+     *
+     * @param oldMember The member to remove.
+     */
+    private void removeStaticMember(ConsumerGroupMember oldMember) {
+        if (oldMember.instanceId() != null) {
+            staticMembers.remove(oldMember.instanceId());
+        }
+    }
+
     /**
      * Returns true if the member exists.
      *
@@ -356,6 +410,13 @@ public class ConsumerGroup implements Group {
         return Collections.unmodifiableMap(members);
     }
 
+    /**
+     * @return An immutable Map containing all the static members keyed by 
instance id.
+     */
+    public Map<String, String> staticMembers() {
+        return Collections.unmodifiableMap(staticMembers);
+    }
+
     /**
      * @return An immutable Set containing all the subscribed topic names.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
index 2026efaa82c..b7d95fe2102 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
@@ -127,6 +127,11 @@ public class TargetAssignmentBuilder {
      */
     private final Map<String, ConsumerGroupMember> updatedMembers = new 
HashMap<>();
 
+    /**
+     * The static members in the group.
+     */
+    private Map<String, String> staticMembers = new HashMap<>();
+
     /**
      * Constructs the object.
      *
@@ -157,6 +162,19 @@ public class TargetAssignmentBuilder {
         return this;
     }
 
+    /**
+     * Adds all the existing static members.
+     *
+     * @param staticMembers   The existing static members in the consumer 
group.
+     * @return This object.
+     */
+    public TargetAssignmentBuilder withStaticMembers(
+        Map<String, String> staticMembers
+    ) {
+        this.staticMembers = staticMembers;
+        return this;
+    }
+
     /**
      * Adds the subscription metadata to use.
      *
@@ -234,9 +252,17 @@ public class TargetAssignmentBuilder {
             if (updatedMemberOrNull == null) {
                 memberSpecs.remove(memberId);
             } else {
+                ConsumerGroupMember member = members.get(memberId);
+                Assignment assignment;
+                // A new static member joins and needs to replace an existing 
departed one.
+                if (member == null && 
staticMembers.containsKey(updatedMemberOrNull.instanceId())) {
+                    assignment = 
targetAssignment.getOrDefault(staticMembers.get(updatedMemberOrNull.instanceId()),
 Assignment.EMPTY);
+                } else {
+                    assignment = targetAssignment.getOrDefault(memberId, 
Assignment.EMPTY);
+                }
                 memberSpecs.put(memberId, createAssignmentMemberSpec(
                     updatedMemberOrNull,
-                    targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
+                    assignment,
                     subscriptionMetadata
                 ));
             }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 2dca0c2e352..6711619eb09 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -31,6 +31,7 @@ import 
org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.errors.UnreleasedInstanceIdException;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
@@ -123,6 +124,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
 import static 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
@@ -1313,6 +1315,17 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(1)
                 .setServerAssignor("bar")));
         assertEquals("ServerAssignor bar is not supported. Supported 
assignors: range.", ex.getMessage());
+
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberId(Uuid.randomUuid().toString())
+                .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList())));
+
+        assertEquals("InstanceId can't be null.", ex.getMessage());
     }
 
     @Test
@@ -1686,172 +1699,1045 @@ public class GroupMetadataManagerTest {
     }
 
     @Test
-    public void testNewJoiningMemberTriggersNewTargetAssignment() {
+    public void testNewJoiningMemberTriggersNewTargetAssignment() {
+        // Scenario: a two-member group at epoch 10 receives a heartbeat with
+        // member epoch 0 from a third member. The expected outcome is a group
+        // epoch of 11 and target assignment records for all three members.
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        // NOTE(review): the ids below are random UUIDs and no instance id is
+        // set in this test, so "static" presumably does not refer to static
+        // membership here - confirm.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        // Consumer group with two members, both at epoch 10.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Assignment handed to the mock assignor for the three-member group.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1),
+                        mkTopicAssignment(barTopicId, 0)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 2, 3),
+                        mkTopicAssignment(barTopicId, 1)
+                    )));
+                    put(memberId3, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                }
+            }
+        ));
+
+        // Member 3 joins the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId3)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setServerAssignor("range")
+                .setTopicPartitions(Collections.emptyList()));
+
+        // The response carries epoch 11 and a default-constructed assignment.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
+            result.response()
+        );
+
+        // Member 3 is expected to hold its target partitions as pending.
+        ConsumerGroupMember expectedMember3 = new 
ConsumerGroupMember.Builder(memberId3)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(11)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newMemberSubscriptionRecord(groupId, 
expectedMember3),
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1),
+                mkTopicAssignment(barTopicId, 0)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 2, 3),
+                mkTopicAssignment(barTopicId, 1)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3)
+        );
+
+        // Records 3 to 5 are the per-member target assignments; they are
+        // checked with assertUnorderedListEquals, the rest in list order.
+        assertRecordsEquals(expectedRecords.subList(0, 3), 
result.records().subList(0, 3));
+        assertUnorderedListEquals(expectedRecords.subList(3, 6), 
result.records().subList(3, 6));
+        assertRecordsEquals(expectedRecords.subList(6, 8), 
result.records().subList(6, 8));
+    }
+
+    @Test
+    public void testLeavingMemberBumpsGroupEpoch() {
+        // Scenario: member 2 of a two-member group at epoch 10 sends a
+        // heartbeat with LEAVE_GROUP_MEMBER_EPOCH. The expected records are
+        // the member's tombstones, a subscription metadata record without
+        // zar and a group epoch record for epoch 11.
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        // NOTE(review): the ids below are random UUIDs and no instance id is
+        // set in this test, so "static" presumably does not refer to static
+        // membership here - confirm.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+        Uuid zarTopicId = Uuid.randomUuid();
+        String zarTopicName = "zar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group with two members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addTopic(zarTopicId, zarTopicName, 1)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    // Use zar only here to ensure that metadata needs to be 
recomputed.
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar", 
"zar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Member 2 leaves the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        // The response echoes the leave epoch sent in the request.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
+            result.response()
+        );
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, 
memberId2),
+            // Subscription metadata is recomputed because zar is no longer 
there.
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testGroupEpochBumpWhenNewStaticMemberJoins() {
+        // Scenario: a group at epoch 10 has two static members; a third
+        // member with its own instance id joins with member epoch 0. The
+        // expected outcome is a group epoch of 11 and target assignment
+        // records for all three members.
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        // Each member id below is also used as that member's instance id.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setInstanceId(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    // Use zar only here to ensure that metadata needs to be 
recomputed.
+                    // NOTE(review): zar is not added to the metadata image
+                    // above and the expected subscription metadata below
+                    // lists only foo and bar - confirm that this comment
+                    // and the zar subscription are intended in this test.
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar", 
"zar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Assignment handed to the mock assignor for the three-member group.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1),
+                        mkTopicAssignment(barTopicId, 0)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 2, 3),
+                        mkTopicAssignment(barTopicId, 1)
+                    )));
+                    put(memberId3, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                }
+            }
+        ));
+
+        // Member 3 joins the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId3)
+                .setInstanceId(memberId3)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        // The response carries epoch 11 and a default-constructed assignment.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
+            result.response()
+        );
+
+        // Member 3 is expected to hold its target partitions as pending.
+        ConsumerGroupMember expectedMember3 = new 
ConsumerGroupMember.Builder(memberId3)
+            .setMemberEpoch(11)
+            .setInstanceId(memberId3)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(11)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newMemberSubscriptionRecord(groupId, 
expectedMember3),
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1),
+                mkTopicAssignment(barTopicId, 0)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 2, 3),
+                mkTopicAssignment(barTopicId, 1)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3)
+        );
+
+        // Records 3 to 5 are the per-member target assignments; they are
+        // checked with assertUnorderedListEquals, the rest in list order.
+        assertRecordsEquals(expectedRecords.subList(0, 3), 
result.records().subList(0, 3));
+        assertUnorderedListEquals(expectedRecords.subList(3, 6), 
result.records().subList(3, 6));
+        assertRecordsEquals(expectedRecords.subList(6, 8), 
result.records().subList(6, 8));
+    }
+
+    @Test
+    public void testStaticMemberGetsBackAssignmentUponRejoin() {
+        // Scenario: static member 2 leaves with
+        // LEAVE_GROUP_STATIC_MEMBER_EPOCH and then rejoins under a new member
+        // id but with the same instance id. The rejoining member is expected
+        // to receive the previous assignment while the epoch stays at 10.
+        String groupId = "fooup";
+        // Each member id below is also used as that member's instance id.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        // Member id used by member 2 when it comes back.
+        String member2RejoinId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setInstanceId(memberId1)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setInstanceId(memberId2)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
+                    {
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                    }
+                }))
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)
+                    )));
+                    // When the member rejoins, it gets the same assignments.
+                    put(member2RejoinId, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                }
+            }
+        ));
+
+        // Member 2 leaves the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        // Member epoch of the response would be set to -2.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH),
+            result.response()
+        );
+
+        // The departing static member will have its epoch set to -2.
+        ConsumerGroupMember member2UpdatedEpoch = new 
ConsumerGroupMember.Builder(member2)
+            .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+            .build();
+
+        List<Record> expectedRecords = Collections.singletonList(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, 
member2UpdatedEpoch)
+        );
+
+        // Expected value first so that a failure message reads correctly.
+        assertEquals(expectedRecords, result.records());
+
+        // Member 2 rejoins the group with the same instance id.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
rejoinResult = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setMemberId(member2RejoinId)
+                .setGroupId(groupId)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        // The rejoin response hands back member 2's previous partitions.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(member2RejoinId)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(Arrays.asList(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(3, 4, 5)),
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(barTopicId)
+                            .setPartitions(Collections.singletonList(2))
+                    ))),
+            rejoinResult.response()
+        );
+
+        ConsumerGroupMember expectedRejoinedMember = new 
ConsumerGroupMember.Builder(member2RejoinId)
+            .setMemberEpoch(10)
+            .setInstanceId(memberId2)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Tombstones for the old member id come first, followed by the
+        // records of the member under its new id.
+        List<Record> expectedRecordsAfterRejoin = Arrays.asList(
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newMemberSubscriptionRecord(groupId, 
expectedRejoinedMember),
+            RecordHelpers.newTargetAssignmentRecord(groupId, member2RejoinId, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2))),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 10),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, 
expectedRejoinedMember)
+        );
+
+        assertRecordsEquals(expectedRecordsAfterRejoin, 
rejoinResult.records());
+        // Verify that there are no timers.
+        context.assertNoSessionTimeout(groupId, memberId2);
+        context.assertNoRevocationTimeout(groupId, memberId2);
+    }
+
+    @Test
+    public void testNoGroupEpochBumpWhenStaticMemberTemporarilyLeaves() {
+        // Scenario: static member 2 sends a heartbeat with
+        // LEAVE_GROUP_STATIC_MEMBER_EPOCH. The only expected record is the
+        // member's current assignment with its epoch set to that value, i.e.
+        // no group epoch record is written.
+        String groupId = "fooup";
+        // Each member id below is also used as that member's instance id.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setInstanceId(memberId1)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setInstanceId(memberId2)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            // Member 2 subscribes to the same topics as member 1.
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Member 2 leaves the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        // Member epoch of the response would be set to -2.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH),
+            result.response()
+        );
+
+        // The departing static member keeps its state; only its epoch changes.
+        ConsumerGroupMember member2UpdatedEpoch = new ConsumerGroupMember
+            .Builder(member2)
+            .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+            .build();
+        List<Record> expectedRecords = Collections.singletonList(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, 
member2UpdatedEpoch)
+        );
+
+        // Expected value first so that a failure message reads correctly.
+        assertEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testLeavingStaticMemberBumpsGroupEpoch() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+        Uuid zarTopicId = Uuid.randomUuid();
+        String zarTopicName = "zar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addTopic(zarTopicId, zarTopicName, 1)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setInstanceId(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    // Use zar only here to ensure that metadata needs to be 
recomputed.
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar", 
"zar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Member 2 leaves the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setInstanceId(memberId2)
+                .setMemberId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
+            result.response()
+        );
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, 
memberId2),
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, 
memberId2),
+            // Subscription metadata is recomputed because zar is no longer 
there.
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void 
testShouldThrownUnreleasedInstanceIdExceptionWhenNewMemberJoinsWithInUseInstanceId()
 {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group with one static member.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Member 2 joins the consumer group with an in-use instance id.
+        assertThrows(UnreleasedInstanceIdException.class, () -> 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setInstanceId(memberId1)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList())));
+    }
+
+    @Test
+    public void 
testShouldThrownUnknownMemberIdExceptionWhenUnknownStaticMemberJoins() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group with one static member.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Member 2 joins the consumer group with a non-zero epoch
+        assertThrows(UnknownMemberIdException.class, () -> 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(10)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList())));
+    }
+
+    @Test
+    public void 
testShouldThrowFencedInstanceIdExceptionWhenStaticMemberWithDifferentMemberIdJoins()
 {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group with one static member.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        assertThrows(FencedInstanceIdException.class, () -> 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId("unknown-" + memberId1)
+                .setInstanceId(memberId1)
+                .setMemberEpoch(11)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList())));
+    }
+
+    @Test
+    public void testConsumerGroupMemberEpochValidationForStaticMember() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+            .setInstanceId(memberId)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setTargetMemberEpoch(100)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
1, 2, 3)))
+            .build();
+
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId, 
member));
+
+        context.replay(RecordHelpers.newGroupEpochRecord(groupId, 100));
+
+        context.replay(RecordHelpers.newTargetAssignmentRecord(groupId, 
memberId, mkAssignment(
+            mkTopicAssignment(fooTopicId, 1, 2, 3)
+        )));
+
+        context.replay(RecordHelpers.newTargetAssignmentEpochRecord(groupId, 
100));
+
+        context.replay(RecordHelpers.newCurrentAssignmentRecord(groupId, 
member));
+
+        // Member epoch is greater than the expected epoch.
+        assertThrows(FencedMemberEpochException.class, () ->
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setInstanceId(memberId)
+                    .setMemberEpoch(200)
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))));
+
+        // Member epoch is smaller than the expected epoch.
+        assertThrows(FencedMemberEpochException.class, () ->
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setInstanceId(memberId)
+                    .setMemberEpoch(50)
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))));
+
+        // Member joins with previous epoch but without providing partitions.
+        assertThrows(FencedMemberEpochException.class, () ->
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setInstanceId(memberId)
+                    .setMemberEpoch(99)
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))));
+
+        // Member joins with previous epoch and has a subset of the owned 
partitions. This
+        // is accepted as the response with the bumped epoch may have been 
lost. In this
+        // case, we provide back the correct epoch to the member.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setInstanceId(memberId)
+                .setMemberEpoch(99)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.singletonList(new 
ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(fooTopicId)
+                    .setPartitions(Arrays.asList(1, 2)))));
+        assertEquals(100, result.response().memberEpoch());
+    }
+
+    @Test
+    public void 
testShouldThrowUnknownMemberIdExceptionWhenUnknownStaticMemberLeaves() {
         String groupId = "fooup";
         // Use a static member id as it makes the test easier.
         String memberId1 = Uuid.randomUuid().toString();
-        String memberId2 = Uuid.randomUuid().toString();
-        String memberId3 = Uuid.randomUuid().toString();
 
         Uuid fooTopicId = Uuid.randomUuid();
         String fooTopicName = "foo";
-        Uuid barTopicId = Uuid.randomUuid();
-        String barTopicName = "bar";
 
         MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group with one static member.
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withAssignors(Collections.singletonList(assignor))
             .withMetadataImage(new MetadataImageBuilder()
                 .addTopic(fooTopicId, fooTopicName, 6)
-                .addTopic(barTopicId, barTopicName, 3)
-                .addRacks()
                 .build())
             .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
                 .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
                     .setMemberEpoch(10)
                     .setPreviousMemberEpoch(9)
                     .setTargetMemberEpoch(10)
                     .setClientId("client")
                     .setClientHost("localhost/127.0.0.1")
-                    .setRebalanceTimeoutMs(5000)
-                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
-                    .setServerAssignorName("range")
-                    .setAssignedPartitions(mkAssignment(
-                        mkTopicAssignment(fooTopicId, 0, 1, 2),
-                        mkTopicAssignment(barTopicId, 0, 1)))
-                    .build())
-                .withMember(new ConsumerGroupMember.Builder(memberId2)
-                    .setMemberEpoch(10)
-                    .setPreviousMemberEpoch(9)
-                    .setTargetMemberEpoch(10)
-                    .setClientId("client")
-                    .setClientHost("localhost/127.0.0.1")
-                    .setRebalanceTimeoutMs(5000)
                     .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
                     .setServerAssignorName("range")
                     .setAssignedPartitions(mkAssignment(
-                        mkTopicAssignment(fooTopicId, 3, 4, 5),
-                        mkTopicAssignment(barTopicId, 2)))
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
                     .build())
                 .withAssignment(memberId1, mkAssignment(
-                    mkTopicAssignment(fooTopicId, 0, 1, 2),
-                    mkTopicAssignment(barTopicId, 0, 1)))
-                .withAssignment(memberId2, mkAssignment(
-                    mkTopicAssignment(fooTopicId, 3, 4, 5),
-                    mkTopicAssignment(barTopicId, 2)))
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
                 .withAssignmentEpoch(10))
             .build();
 
-        assignor.prepareGroupAssignment(new GroupAssignment(
-            new HashMap<String, MemberAssignment>() {
-                {
-                    put(memberId1, new MemberAssignment(mkAssignment(
-                        mkTopicAssignment(fooTopicId, 0, 1),
-                        mkTopicAssignment(barTopicId, 0)
-                    )));
-                    put(memberId2, new MemberAssignment(mkAssignment(
-                        mkTopicAssignment(fooTopicId, 2, 3),
-                        mkTopicAssignment(barTopicId, 1)
-                    )));
-                    put(memberId3, new MemberAssignment(mkAssignment(
-                        mkTopicAssignment(fooTopicId, 4, 5),
-                        mkTopicAssignment(barTopicId, 2)
-                    )));
-                }
-            }
-        ));
-
-        // Member 3 joins the consumer group.
-        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+        assertThrows(UnknownMemberIdException.class, () -> 
context.consumerGroupHeartbeat(
             new ConsumerGroupHeartbeatRequestData()
                 .setGroupId(groupId)
-                .setMemberId(memberId3)
-                .setMemberEpoch(0)
+                .setMemberId(memberId1)
+                .setInstanceId("unknown-" + memberId1)
+                .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
                 .setRebalanceTimeoutMs(5000)
                 .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
-                .setServerAssignor("range")
-                .setTopicPartitions(Collections.emptyList()));
-
-        assertResponseEquals(
-            new ConsumerGroupHeartbeatResponseData()
-                .setMemberId(memberId3)
-                .setMemberEpoch(11)
-                .setHeartbeatIntervalMs(5000)
-                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
-            result.response()
-        );
-
-        ConsumerGroupMember expectedMember3 = new 
ConsumerGroupMember.Builder(memberId3)
-            .setMemberEpoch(11)
-            .setPreviousMemberEpoch(0)
-            .setTargetMemberEpoch(11)
-            .setClientId("client")
-            .setClientHost("localhost/127.0.0.1")
-            .setRebalanceTimeoutMs(5000)
-            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
-            .setServerAssignorName("range")
-            .setPartitionsPendingAssignment(mkAssignment(
-                mkTopicAssignment(fooTopicId, 4, 5),
-                mkTopicAssignment(barTopicId, 2)))
-            .build();
-
-        List<Record> expectedRecords = Arrays.asList(
-            RecordHelpers.newMemberSubscriptionRecord(groupId, 
expectedMember3),
-            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
-                {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
-                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
-                }
-            }),
-            RecordHelpers.newGroupEpochRecord(groupId, 11),
-            RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, 
mkAssignment(
-                mkTopicAssignment(fooTopicId, 0, 1),
-                mkTopicAssignment(barTopicId, 0)
-            )),
-            RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, 
mkAssignment(
-                mkTopicAssignment(fooTopicId, 2, 3),
-                mkTopicAssignment(barTopicId, 1)
-            )),
-            RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, 
mkAssignment(
-                mkTopicAssignment(fooTopicId, 4, 5),
-                mkTopicAssignment(barTopicId, 2)
-            )),
-            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
-            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3)
-        );
-
-        assertRecordsEquals(expectedRecords.subList(0, 3), 
result.records().subList(0, 3));
-        assertUnorderedListEquals(expectedRecords.subList(3, 6), 
result.records().subList(3, 6));
-        assertRecordsEquals(expectedRecords.subList(6, 8), 
result.records().subList(6, 8));
+                .setTopicPartitions(Collections.emptyList())));
     }
 
     @Test
-    public void testLeavingMemberBumpsGroupEpoch() {
+    public void 
testShouldThrowFencedInstanceIdExceptionWhenStaticMemberWithDifferentMemberIdLeaves()
 {
         String groupId = "fooup";
         // Use a static member id as it makes the test easier.
         String memberId1 = Uuid.randomUuid().toString();
-        String memberId2 = Uuid.randomUuid().toString();
 
         Uuid fooTopicId = Uuid.randomUuid();
         String fooTopicName = "foo";
-        Uuid barTopicId = Uuid.randomUuid();
-        String barTopicName = "bar";
-        Uuid zarTopicId = Uuid.randomUuid();
-        String zarTopicName = "zar";
 
         MockPartitionAssignor assignor = new MockPartitionAssignor("range");
 
-        // Consumer group with two members.
+        // Consumer group with one static member.
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withAssignors(Collections.singletonList(assignor))
             .withMetadataImage(new MetadataImageBuilder()
                 .addTopic(fooTopicId, fooTopicName, 6)
-                .addTopic(barTopicId, barTopicName, 3)
-                .addTopic(zarTopicId, zarTopicName, 1)
-                .addRacks()
                 .build())
             .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
                 .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
                     .setMemberEpoch(10)
                     .setPreviousMemberEpoch(9)
                     .setTargetMemberEpoch(10)
@@ -1860,63 +2746,22 @@ public class GroupMetadataManagerTest {
                     .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
                     .setServerAssignorName("range")
                     .setAssignedPartitions(mkAssignment(
-                        mkTopicAssignment(fooTopicId, 0, 1, 2),
-                        mkTopicAssignment(barTopicId, 0, 1)))
-                    .build())
-                .withMember(new ConsumerGroupMember.Builder(memberId2)
-                    .setMemberEpoch(10)
-                    .setPreviousMemberEpoch(9)
-                    .setTargetMemberEpoch(10)
-                    .setClientId("client")
-                    .setClientHost("localhost/127.0.0.1")
-                    // Use zar only here to ensure that metadata needs to be 
recomputed.
-                    .setSubscribedTopicNames(Arrays.asList("foo", "bar", 
"zar"))
-                    .setServerAssignorName("range")
-                    .setAssignedPartitions(mkAssignment(
-                        mkTopicAssignment(fooTopicId, 3, 4, 5),
-                        mkTopicAssignment(barTopicId, 2)))
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
                     .build())
                 .withAssignment(memberId1, mkAssignment(
-                    mkTopicAssignment(fooTopicId, 0, 1, 2),
-                    mkTopicAssignment(barTopicId, 0, 1)))
-                .withAssignment(memberId2, mkAssignment(
-                    mkTopicAssignment(fooTopicId, 3, 4, 5),
-                    mkTopicAssignment(barTopicId, 2)))
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
                 .withAssignmentEpoch(10))
             .build();
 
-        // Member 2 leaves the consumer group.
-        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+        assertThrows(FencedInstanceIdException.class, () -> 
context.consumerGroupHeartbeat(
             new ConsumerGroupHeartbeatRequestData()
                 .setGroupId(groupId)
-                .setMemberId(memberId2)
-                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setMemberId("unknown-" + memberId1)
+                .setInstanceId(memberId1)
+                .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
                 .setRebalanceTimeoutMs(5000)
                 .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
-                .setTopicPartitions(Collections.emptyList()));
-
-        assertResponseEquals(
-            new ConsumerGroupHeartbeatResponseData()
-                .setMemberId(memberId2)
-                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
-            result.response()
-        );
-
-        List<Record> expectedRecords = Arrays.asList(
-            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, 
memberId2),
-            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, 
memberId2),
-            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, 
memberId2),
-            // Subscription metadata is recomputed because zar is no longer 
there.
-            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
-                {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
-                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
-                }
-            }),
-            RecordHelpers.newGroupEpochRecord(groupId, 11)
-        );
-
-        assertRecordsEquals(expectedRecords, result.records());
+                .setTopicPartitions(Collections.emptyList())));
     }
 
     @Test
@@ -3328,6 +4173,87 @@ public class GroupMetadataManagerTest {
         context.assertNoRevocationTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testSessionTimeoutExpirationStaticMember() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setInstanceId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(90000)
+                    .setSubscribedTopicNames(Collections.singletonList("foo"))
+                    .setTopicPartitions(Collections.emptyList()));
+        assertEquals(1, result.response().memberEpoch());
+
+        // Verify that there is a session timeout.
+        context.assertSessionTimeout(groupId, memberId, 45000);
+
+        // Static member sends a temporary leave group request
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setInstanceId(memberId)
+                .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+                .setRebalanceTimeoutMs(90000)
+                .setSubscribedTopicNames(Collections.singletonList("foo"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertEquals(-2, result.response().memberEpoch());
+
+        // Verify that there is still a session timeout.
+        context.assertSessionTimeout(groupId, memberId, 45000);
+
+        // Advance time past the session timeout. No static member has rejoined 
as a replacement.
+        List<ExpiredTimeout<Void, Record>> timeouts = context.sleep(45000 + 1);
+
+        // Verify the expired timeout.
+        assertEquals(
+            Collections.singletonList(new ExpiredTimeout<Void, Record>(
+                consumerGroupSessionTimeoutKey(groupId, memberId),
+                new CoordinatorResult<>(
+                    Arrays.asList(
+                        
RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId),
+                        
RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId),
+                        
RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId),
+                        
RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, 
Collections.emptyMap()),
+                        RecordHelpers.newGroupEpochRecord(groupId, 2)
+                    )
+                )
+            )),
+            timeouts
+        );
+
+        // Verify that there are no timers.
+        context.assertNoSessionTimeout(groupId, memberId);
+        context.assertNoRevocationTimeout(groupId, memberId);
+    }
+
     @Test
     public void testRevocationTimeoutLifecycle() {
         String groupId = "fooup";
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index c9379fc2e7b..75016cb6650 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -48,6 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.Mockito.mock;
 
 public class ConsumerGroupTest {
@@ -94,6 +95,34 @@ public class ConsumerGroupTest {
         assertEquals(member, consumerGroup.getOrMaybeCreateMember("member", 
false));
     }
 
+    @Test
+    public void testNoStaticMember() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+        // Create a new member which is not static
+        consumerGroup.getOrMaybeCreateMember("member", true);
+        assertNull(consumerGroup.staticMember("instance-id"));
+    }
+
+    @Test
+    public void testGetStaticMemberByInstanceId() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+        ConsumerGroupMember member;
+
+        member = consumerGroup.getOrMaybeCreateMember("member", true);
+
+        member = new ConsumerGroupMember.Builder(member)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setInstanceId("instance")
+            .build();
+
+        consumerGroup.updateMember(member);
+
+        assertEquals(member, consumerGroup.staticMember("instance"));
+        assertEquals(member, consumerGroup.getOrMaybeCreateMember("member", 
false));
+        assertEquals(member.memberId(), 
consumerGroup.staticMemberId("instance"));
+    }
+
     @Test
     public void testRemoveMember() {
         ConsumerGroup consumerGroup = createConsumerGroup("foo");
@@ -106,6 +135,27 @@ public class ConsumerGroupTest {
 
     }
 
+    @Test
+    public void testRemoveStaticMember() {
+        ConsumerGroup consumerGroup = createConsumerGroup("foo");
+
+        ConsumerGroupMember member;
+        member = consumerGroup.getOrMaybeCreateMember("member", true);
+        assertTrue(consumerGroup.hasMember("member"));
+
+        member = new ConsumerGroupMember.Builder(member)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setInstanceId("instance")
+            .build();
+
+        consumerGroup.updateMember(member);
+
+        consumerGroup.removeMember("member");
+        assertFalse(consumerGroup.hasMember("member"));
+        assertNull(consumerGroup.staticMember("instance"));
+        assertNull(consumerGroup.staticMemberId("instance"));
+    }
+
     @Test
     public void testUpdatingMemberUpdatesPartitionEpoch() {
         Uuid fooTopicId = Uuid.randomUuid();
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
index 810b7617a23..bdd7a354ca4 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
@@ -57,6 +57,7 @@ public class TargetAssignmentBuilderTest {
         private final Map<String, ConsumerGroupMember> updatedMembers = new 
HashMap<>();
         private final Map<String, Assignment> targetAssignment = new 
HashMap<>();
         private final Map<String, MemberAssignment> memberAssignments = new 
HashMap<>();
+        private final Map<String, String> staticMembers = new HashMap<>();
 
         public TargetAssignmentBuilderTestContext(
             String groupId,
@@ -71,10 +72,23 @@ public class TargetAssignmentBuilderTest {
             List<String> subscriptions,
             Map<Uuid, Set<Integer>> targetPartitions
         ) {
-            members.put(memberId, new ConsumerGroupMember.Builder(memberId)
-                .setSubscribedTopicNames(subscriptions)
-                .build());
+            addGroupMember(memberId, null, subscriptions, targetPartitions);
+        }
+
+        private void addGroupMember(
+            String memberId,
+            String instanceId,
+            List<String> subscriptions,
+            Map<Uuid, Set<Integer>> targetPartitions
+        ) {
+            ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+                .setSubscribedTopicNames(subscriptions);
 
+            if (instanceId != null) {
+                memberBuilder.setInstanceId(instanceId);
+                staticMembers.put(instanceId, memberId);
+            }
+            members.put(memberId, memberBuilder.build());
             targetAssignment.put(memberId, new Assignment(
                 (byte) 0,
                 targetPartitions,
@@ -131,8 +145,18 @@ public class TargetAssignmentBuilderTest {
 
         public void removeMemberSubscription(
             String memberId
+        ) {
+            removeMemberSubscription(memberId, null);
+        }
+
+        public void removeMemberSubscription(
+            String memberId,
+            String instanceId
         ) {
             this.updatedMembers.put(memberId, null);
+            if (instanceId != null) {
+                this.staticMembers.remove(instanceId);
+            }
         }
 
         public void prepareMemberAssignment(
@@ -161,9 +185,17 @@ public class TargetAssignmentBuilderTest {
                 if (updatedMemberOrNull == null) {
                     memberSpecs.remove(memberId);
                 } else {
+                    ConsumerGroupMember member = members.get(memberId);
+                    Assignment assignment;
+                    // A new static member joins and needs to replace an existing departed one.
+                    if (member == null && 
staticMembers.containsKey(updatedMemberOrNull.instanceId())) {
+                        assignment = 
targetAssignment.getOrDefault(staticMembers.get(updatedMemberOrNull.instanceId()),
 Assignment.EMPTY);
+                    } else {
+                        assignment = targetAssignment.getOrDefault(memberId, 
Assignment.EMPTY);
+                    }
                     memberSpecs.put(memberId, createAssignmentMemberSpec(
                         updatedMemberOrNull,
-                        targetAssignment.getOrDefault(memberId, 
Assignment.EMPTY),
+                        assignment,
                         subscriptionMetadata
                     ));
                 }
@@ -189,6 +221,7 @@ public class TargetAssignmentBuilderTest {
             // Create and populate the assignment builder.
             TargetAssignmentBuilder builder = new 
TargetAssignmentBuilder(groupId, groupEpoch, assignor)
                 .withMembers(members)
+                .withStaticMembers(staticMembers)
                 .withSubscriptionMetadata(subscriptionMetadata)
                 .withTargetAssignment(targetAssignment);
 
@@ -691,6 +724,85 @@ public class TargetAssignmentBuilderTest {
         assertEquals(expectedAssignment, result.targetAssignment());
     }
 
+    @Test
+    public void testStaticMemberReplace() {
+        TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
+            "my-group",
+            20
+        );
+
+        Uuid fooTopicId = context.addTopicMetadata("foo", 6, 
Collections.emptyMap());
+        Uuid barTopicId = context.addTopicMetadata("bar", 6, 
Collections.emptyMap());
+
+        context.addGroupMember("member-1", "member-1", Arrays.asList("foo", 
"bar", "zar"), mkAssignment(
+            mkTopicAssignment(fooTopicId, 1, 2),
+            mkTopicAssignment(barTopicId, 1, 2)
+        ));
+
+        context.addGroupMember("member-2", "member-2", Arrays.asList("foo", 
"bar", "zar"), mkAssignment(
+            mkTopicAssignment(fooTopicId, 3, 4),
+            mkTopicAssignment(barTopicId, 3, 4)
+        ));
+
+        context.addGroupMember("member-3", "member-3", Arrays.asList("foo", 
"bar", "zar"), mkAssignment(
+            mkTopicAssignment(fooTopicId, 5, 6),
+            mkTopicAssignment(barTopicId, 5, 6)
+        ));
+
+        // Static member 3 leaves
+        context.removeMemberSubscription("member-3", "member-3");
+
+        // Another static member joins with the same instance id as the departed one
+        context.updateMemberSubscription("member-3-a", Arrays.asList("foo", 
"bar", "zar"), Optional.of("member-3"), Optional.empty());
+
+        context.prepareMemberAssignment("member-1", mkAssignment(
+            mkTopicAssignment(fooTopicId, 1, 2),
+            mkTopicAssignment(barTopicId, 1, 2)
+        ));
+
+        context.prepareMemberAssignment("member-2", mkAssignment(
+            mkTopicAssignment(fooTopicId, 3, 4),
+            mkTopicAssignment(barTopicId, 3, 4)
+        ));
+
+        context.prepareMemberAssignment("member-3-a", mkAssignment(
+            mkTopicAssignment(fooTopicId, 5, 6),
+            mkTopicAssignment(barTopicId, 5, 6)
+        ));
+
+        TargetAssignmentBuilder.TargetAssignmentResult result = 
context.build();
+
+        assertEquals(2, result.records().size());
+
+        assertUnorderedList(Collections.singletonList(
+            newTargetAssignmentRecord("my-group", "member-3-a", mkAssignment(
+                mkTopicAssignment(fooTopicId, 5, 6),
+                mkTopicAssignment(barTopicId, 5, 6)
+            ))
+        ), result.records().subList(0, 1));
+
+        assertEquals(newTargetAssignmentEpochRecord(
+            "my-group",
+            20
+        ), result.records().get(1));
+
+        Map<String, Assignment> expectedAssignment = new HashMap<>();
+        expectedAssignment.put("member-1", new Assignment(mkAssignment(
+            mkTopicAssignment(fooTopicId, 1, 2),
+            mkTopicAssignment(barTopicId, 1, 2)
+        )));
+        expectedAssignment.put("member-2", new Assignment(mkAssignment(
+            mkTopicAssignment(fooTopicId, 3, 4),
+            mkTopicAssignment(barTopicId, 3, 4)
+        )));
+
+        expectedAssignment.put("member-3-a", new Assignment(mkAssignment(
+            mkTopicAssignment(fooTopicId, 5, 6),
+            mkTopicAssignment(barTopicId, 5, 6)
+        )));
+
+        assertEquals(expectedAssignment, result.targetAssignment());
+    }
     private static <T> void assertUnorderedList(
         List<T> expected,
         List<T> actual

Reply via email to