This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 9b4480292741a47fc4a2e5c9f639a634918303c7
Author: Bruno Cadonna <[email protected]>
AuthorDate: Fri Jun 7 15:34:07 2024 +0200

    Add streams group member
    
    See https://github.com/lucasbru/kafka/pull/19
---
 .../org/apache/kafka/coordinator/group/Group.java  |    1 +
 .../group/GroupCoordinatorRecordHelpers.java       |  162 +++
 .../coordinator/group/streams/StreamsGroup.java    | 1104 ++++++++++++++++++++
 .../group/streams/StreamsGroupMember.java          |  704 +++++++++++++
 .../group/streams/SubscribedTopicMetadata.java     |   98 ++
 .../StreamsGroupCurrentMemberAssignmentValue.json  |    2 +-
 .../group/streams/StreamsGroupMemberTest.java      |  298 ++++++
 .../group/streams/StreamsGroupTest.java            |  730 +++++++++++++
 8 files changed, 3098 insertions(+), 1 deletion(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index b5d63499751..79c1b72237b 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -37,6 +37,7 @@ public interface Group {
         CONSUMER("consumer"),
         CLASSIC("classic"),
         SHARE("share"),
+        STREAMS("streams"),
         UNKNOWN("unknown");
 
         private final String name;
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
index 98cf7f937e8..ea657361480 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
@@ -52,12 +52,23 @@ import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMe
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.modern.TopicMetadata;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
 import 
org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 
@@ -112,6 +123,35 @@ public class GroupCoordinatorRecordHelpers {
         );
     }
 
+    /**
+     * Creates a StreamsGroupMemberMetadata record.
+     *
+     * @param groupId   The streams group id.
+     * @param member    The streams group member whose metadata is written.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupMemberRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        StreamsGroupMemberMetadataKey key = new StreamsGroupMemberMetadataKey()
+            .setGroupId(groupId)
+            .setMemberId(member.memberId());
+
+        StreamsGroupMemberMetadataValue value = new StreamsGroupMemberMetadataValue()
+            .setRackId(member.rackId())
+            .setInstanceId(member.instanceId())
+            .setClientId(member.clientId())
+            .setClientHost(member.clientHost())
+            .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+            .setTopologyHash(member.topologyHash())
+            .setProcessId(member.processId())
+            .setHostInfo(member.hostInfo())
+            .setClientTags(member.clientTags())
+            .setUserData(member.userData())
+            .setAssignmentConfigs(member.assignmentConfigs());
+
+        return new CoordinatorRecord(
+            new ApiMessageAndVersion(key, (short) 11),
+            new ApiMessageAndVersion(value, (short) 0)
+        );
+    }
+
     /**
      * Creates a ConsumerGroupMemberMetadata tombstone.
      *
@@ -211,6 +251,24 @@ public class GroupCoordinatorRecordHelpers {
         );
     }
 
+    /**
+     * Creates a StreamsGroupMetadata record carrying the group epoch.
+     *
+     * @param groupId       The streams group id.
+     * @param newGroupEpoch The group epoch to write.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsGroupEpochRecord(
+        String groupId,
+        int newGroupEpoch
+    ) {
+        StreamsGroupMetadataKey key = new StreamsGroupMetadataKey()
+            .setGroupId(groupId);
+        StreamsGroupMetadataValue value = new StreamsGroupMetadataValue()
+            .setEpoch(newGroupEpoch);
+
+        return new CoordinatorRecord(
+            new ApiMessageAndVersion(key, (short) 9),
+            new ApiMessageAndVersion(value, (short) 0)
+        );
+    }
+
     /**
      * Creates a ConsumerGroupMetadata tombstone.
      *
@@ -269,6 +327,55 @@ public class GroupCoordinatorRecordHelpers {
         );
     }
 
+    /**
+     * Creates a StreamsGroupTargetAssignmentMember record.
+     *
+     * @param groupId       The streams group id.
+     * @param memberId      The streams group member id.
+     * @param activeTasks   The target active tasks, as partitions keyed by subtopology.
+     * @param standbyTasks  The target standby tasks, as partitions keyed by subtopology.
+     * @param warmupTasks   The target warm-up tasks, as partitions keyed by subtopology.
+     * @return The record.
+     */
+    public static CoordinatorRecord newTargetAssignmentRecord(
+        String groupId,
+        String memberId,
+        Map<String, Set<Integer>> activeTasks,
+        Map<String, Set<Integer>> standbyTasks,
+        Map<String, Set<Integer>> warmupTasks
+    ) {
+        // Convert all three task maps up front, before the key and value are built.
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTaskIds =
+            toTargetAssignmentTaskIds(activeTasks);
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTaskIds =
+            toTargetAssignmentTaskIds(standbyTasks);
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTaskIds =
+            toTargetAssignmentTaskIds(warmupTasks);
+
+        return new CoordinatorRecord(
+            new ApiMessageAndVersion(
+                new StreamsGroupTargetAssignmentMemberKey()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId),
+                (short) 13
+            ),
+            new ApiMessageAndVersion(
+                new StreamsGroupTargetAssignmentMemberValue()
+                    .setActiveTasks(activeTaskIds)
+                    .setStandbyTasks(standbyTaskIds)
+                    .setWarmupTasks(warmupTaskIds),
+                (short) 0
+            )
+        );
+    }
+
+    /**
+     * Converts a subtopology-to-partitions map into the TaskIds list used by the
+     * StreamsGroupTargetAssignmentMember record value.
+     *
+     * @param tasks The partitions keyed by subtopology.
+     * @return One TaskIds entry per map entry, in the map's iteration order.
+     */
+    private static List<StreamsGroupTargetAssignmentMemberValue.TaskIds> toTargetAssignmentTaskIds(
+        Map<String, Set<Integer>> tasks
+    ) {
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> taskIds = new ArrayList<>(tasks.size());
+        for (Map.Entry<String, Set<Integer>> entry : tasks.entrySet()) {
+            taskIds.add(
+                new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+                    .setSubtopology(entry.getKey())
+                    .setPartitions(new ArrayList<>(entry.getValue()))
+            );
+        }
+        return taskIds;
+    }
+
     /**
      * Creates a ConsumerGroupTargetAssignmentMember tombstone.
      *
@@ -316,6 +423,24 @@ public class GroupCoordinatorRecordHelpers {
         );
     }
 
+    /**
+     * Creates a StreamsGroupTargetAssignmentMetadata record carrying the assignment epoch.
+     *
+     * @param groupId           The streams group id.
+     * @param assignmentEpoch   The assignment epoch to write.
+     * @return The record.
+     */
+    public static CoordinatorRecord newStreamsTargetAssignmentEpochRecord(
+        String groupId,
+        int assignmentEpoch
+    ) {
+        StreamsGroupTargetAssignmentMetadataKey key = new StreamsGroupTargetAssignmentMetadataKey()
+            .setGroupId(groupId);
+        StreamsGroupTargetAssignmentMetadataValue value = new StreamsGroupTargetAssignmentMetadataValue()
+            .setAssignmentEpoch(assignmentEpoch);
+
+        return new CoordinatorRecord(
+            new ApiMessageAndVersion(key, (short) 12),
+            new ApiMessageAndVersion(value, (short) 0)
+        );
+    }
+
     /**
      * Creates a ConsumerGroupTargetAssignmentMetadata tombstone.
      *
@@ -365,6 +490,31 @@ public class GroupCoordinatorRecordHelpers {
         );
     }
 
+    /**
+     * Creates a StreamsGroupCurrentMemberAssignment record.
+     *
+     * @param groupId   The streams group id.
+     * @param member    The streams group member whose current assignment is written.
+     * @return The record.
+     */
+    public static CoordinatorRecord newCurrentAssignmentRecord(
+        String groupId,
+        StreamsGroupMember member
+    ) {
+        StreamsGroupCurrentMemberAssignmentKey key = new StreamsGroupCurrentMemberAssignmentKey()
+            .setGroupId(groupId)
+            .setMemberId(member.memberId());
+
+        StreamsGroupCurrentMemberAssignmentValue value = new StreamsGroupCurrentMemberAssignmentValue()
+            .setMemberEpoch(member.memberEpoch())
+            .setPreviousMemberEpoch(member.previousMemberEpoch())
+            .setState(member.state().value())
+            .setActiveTasks(toTaskIds(member.assignedActiveTasks()))
+            .setStandbyTasks(toTaskIds(member.assignedStandbyTasks()))
+            .setWarmupTasks(toTaskIds(member.assignedWarmupTasks()))
+            .setActiveTasksPendingRevocation(toTaskIds(member.activeTasksPendingRevocation()));
+
+        return new CoordinatorRecord(
+            new ApiMessageAndVersion(key, (short) 14),
+            new ApiMessageAndVersion(value, (short) 0)
+        );
+    }
+
     /**
      * Creates a ConsumerGroupCurrentMemberAssignment tombstone.
      *
@@ -943,6 +1093,18 @@ public class GroupCoordinatorRecordHelpers {
         return topics;
     }
 
+    /**
+     * Converts a subtopology-to-partitions map into the TaskIds list used by the
+     * StreamsGroupCurrentMemberAssignment record value.
+     *
+     * @param tasks The partitions keyed by subtopology.
+     * @return One TaskIds entry per map entry, in the map's iteration order.
+     */
+    private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> toTaskIds(
+        Map<String, Set<Integer>> tasks
+    ) {
+        List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> result = new ArrayList<>(tasks.size());
+        for (Map.Entry<String, Set<Integer>> entry : tasks.entrySet()) {
+            StreamsGroupCurrentMemberAssignmentValue.TaskIds ids =
+                new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                    .setSubtopology(entry.getKey())
+                    .setPartitions(new ArrayList<>(entry.getValue()));
+            result.add(ids);
+        }
+        return result;
+    }
+
     /**
      * Creates a StreamsTopology record.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
new file mode 100644
index 00000000000..d28e9218994
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -0,0 +1,1104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.coordinator.group.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.CoordinatorRecordHelpers;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
+import org.apache.kafka.timeline.TimelineObject;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.emptyMap;
+import static 
org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS;
+import static 
org.apache.kafka.coordinator.group.assignor.SubscriptionType.HOMOGENEOUS;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.EMPTY;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.RECONCILING;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.STABLE;
+
+/**
+ * A Streams Group. All the metadata in this class are backed by
+ * records in the __consumer_offsets partitions.
+ */
+public class StreamsGroup implements Group {
+
+    /**
+     * The states of a streams group. Each state carries a display name and a
+     * pre-computed lower-case variant of that name.
+     */
+    public enum StreamsGroupState {
+        EMPTY("Empty"),
+        ASSIGNING("Assigning"),
+        RECONCILING("Reconciling"),
+        STABLE("Stable"),
+        DEAD("Dead");
+
+        /** The display name, as returned by {@link #toString()}. */
+        private final String name;
+
+        /** The display name lower-cased with {@link Locale#ROOT}. */
+        private final String lowerCaseName;
+
+        StreamsGroupState(String name) {
+            this.name = name;
+            this.lowerCaseName = name.toLowerCase(Locale.ROOT);
+        }
+
+        /**
+         * @return The display name of the state.
+         */
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        /**
+         * @return The display name of the state in lower case.
+         */
+        public String toLowerCaseString() {
+            return lowerCaseName;
+        }
+    }
+
+    /**
+     * An immutable pair of a deadline in milliseconds and an epoch.
+     */
+    public static class DeadlineAndEpoch {
+        /** The value used when no deadline has been set: deadline 0 and epoch 0. */
+        static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
+
+        /** The deadline in milliseconds. */
+        public final long deadlineMs;
+
+        /** The epoch associated with the deadline. */
+        public final int epoch;
+
+        DeadlineAndEpoch(long deadlineMs, int epoch) {
+            this.deadlineMs = deadlineMs;
+            this.epoch = epoch;
+        }
+    }
+
+    /**
+     * The snapshot registry.
+     */
+    private final SnapshotRegistry snapshotRegistry;
+
+    /**
+     * The group id.
+     */
+    private final String groupId;
+
+    /**
+     * The group state.
+     */
+    private final TimelineObject<StreamsGroupState> state;
+
+    /**
+     * The group epoch. The epoch is incremented whenever the subscriptions
+     * are updated and it will trigger the computation of a new assignment
+     * for the group.
+     */
+    private final TimelineInteger groupEpoch;
+
+    /**
+     * The group members.
+     */
+    private final TimelineHashMap<String, StreamsGroupMember> members;
+
+    /**
+     * The static group members.
+     */
+    private final TimelineHashMap<String, String> staticMembers;
+
+    /**
+     * The number of members supporting each assignor name.
+     */
+    private final TimelineHashMap<String, Integer> assignors;
+
+    /**
+     * The target assignment epoch. An assignment epoch smaller than the group 
epoch
+     * means that a new assignment is required. The assignment epoch is 
updated when
+     * a new assignment is installed.
+     */
+    private final TimelineInteger targetAssignmentEpoch;
+
+    /**
+     * The target assignment per member id.
+     */
+    private final TimelineHashMap<String, Assignment> targetAssignment;
+
+    /**
+     * Reverse lookup map representing tasks with
+     * their current member assignments.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetActiveTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetStandbyTasksAssignment;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> 
invertedTargetWarmupTasksAssignment;
+
+    /**
+     * The current task epochs map each task to its current epoch, where
+     * the epoch is the epoch of the task's owner. When a member revokes a
+     * task, it removes its epoch from these maps. When a member gets a task,
+     * it adds its epoch to these maps.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<Integer, Integer>> 
currentActiveTasksEpoch;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, Integer>> 
currentStandbyTasksEpoch;
+    private final TimelineHashMap<String, TimelineHashMap<Integer, Integer>> 
currentWarmupTasksEpoch;
+
+    /**
+     * The coordinator metrics.
+     */
+    private final GroupCoordinatorMetricsShard metrics;
+
+    /**
+     * The metadata refresh deadline. It consists of a timestamp in 
milliseconds together with
+     * the group epoch at the time of setting it. The metadata refresh time is 
considered as a
+     * soft state (read that it is not stored in a timeline data structure). 
It is like this
+     * because it is not persisted to the log. The group epoch is here to 
ensure that the
+     * metadata refresh deadline is invalidated if the group epoch does not 
correspond to
+     * the current group epoch. This can happen if the metadata refresh 
deadline is updated
+     * after having refreshed the metadata but the write operation failed. In 
this case, the
+     * time is not automatically rolled back.
+     */
+    private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+
+    /**
+     * Creates an empty streams group. The state starts as EMPTY and every
+     * timeline-backed structure is registered with the given snapshot registry.
+     *
+     * @param snapshotRegistry  The snapshot registry; must not be null.
+     * @param groupId           The group id; must not be null.
+     * @param metrics           The coordinator metrics shard; must not be null.
+     */
+    public StreamsGroup(
+        SnapshotRegistry snapshotRegistry,
+        String groupId,
+        GroupCoordinatorMetricsShard metrics
+    ) {
+        this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
+        this.groupId = Objects.requireNonNull(groupId);
+        this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
+        this.groupEpoch = new TimelineInteger(snapshotRegistry);
+        this.members = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.assignors = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
+        this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.invertedTargetActiveTasksAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.invertedTargetStandbyTasksAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.invertedTargetWarmupTasksAssignment = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentActiveTasksEpoch = new TimelineHashMap<>(snapshotRegistry, 
0);
+        this.currentStandbyTasksEpoch = new 
TimelineHashMap<>(snapshotRegistry, 0);
+        this.currentWarmupTasksEpoch = new TimelineHashMap<>(snapshotRegistry, 
0);
+        this.metrics = Objects.requireNonNull(metrics);
+    }
+
+    /**
+     * Returns the type of this group.
+     *
+     * @return Always {@link GroupType#STREAMS}.
+     */
+    @Override
+    public GroupType type() {
+        return GroupType.STREAMS;
+    }
+
+    /**
+     * Returns the display name of the latest group state.
+     *
+     * @return The current state as a String.
+     */
+    @Override
+    public String stateAsString() {
+        StreamsGroupState currentState = state.get();
+        return currentState.toString();
+    }
+
+    /**
+     * Returns the display name of the group state as of the given committed offset.
+     *
+     * @param committedOffset The committed offset at which the state is read.
+     * @return The state at that offset as a String.
+     */
+    public String stateAsString(long committedOffset) {
+        StreamsGroupState stateAtOffset = state.get(committedOffset);
+        return stateAtOffset.toString();
+    }
+
+    /**
+     * Formats the group as an entry of a ListGroups response, using the group
+     * state as of the given committed offset.
+     *
+     * @param committedOffset The committed offset at which the state is read.
+     * @return The listed group.
+     */
+    public ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset) {
+        String groupState = state.get(committedOffset).toString();
+        String groupType = type().toString();
+
+        return new ListGroupsResponseData.ListedGroup()
+            .setGroupId(groupId)
+            .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .setGroupState(groupState)
+            .setGroupType(groupType);
+    }
+
+    /**
+     * Returns the identifier of this group.
+     *
+     * @return The group id.
+     */
+    @Override
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * Returns the latest group state.
+     *
+     * @return The current state.
+     */
+    public StreamsGroupState state() {
+        return state.get();
+    }
+
+    /**
+     * Returns the group state as of the given committed offset.
+     *
+     * @param committedOffset The committed offset at which the state is read.
+     * @return The state at that offset.
+     */
+    public StreamsGroupState state(long committedOffset) {
+        return state.get(committedOffset);
+    }
+
+    /**
+     * Returns the latest group epoch.
+     *
+     * @return The group epoch.
+     */
+    public int groupEpoch() {
+        return groupEpoch.get();
+    }
+
+    /**
+     * Sets the group epoch and then re-evaluates the group state.
+     *
+     * @param groupEpoch The new group epoch.
+     */
+    public void setGroupEpoch(int groupEpoch) {
+        this.groupEpoch.set(groupEpoch);
+        // The group state is re-evaluated after every epoch change.
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Returns the latest target assignment epoch.
+     *
+     * @return The target assignment epoch.
+     */
+    public int assignmentEpoch() {
+        return targetAssignmentEpoch.get();
+    }
+
+    /**
+     * Sets the target assignment epoch and then re-evaluates the group state.
+     *
+     * @param targetAssignmentEpoch The new target assignment epoch.
+     */
+    public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
+        this.targetAssignmentEpoch.set(targetAssignmentEpoch);
+        // The group state is re-evaluated after every epoch change.
+        maybeUpdateGroupState();
+    }
+
+    /**
+     * Looks up the member id registered for a group instance id.
+     *
+     * @param groupInstanceId The group instance id.
+     *
+     * @return The member id mapped to the given instance id, or null if there
+     *         is no such mapping.
+     */
+    public String staticMemberId(String groupInstanceId) {
+        return staticMembers.get(groupInstanceId);
+    }
+
+    /**
+     * Returns the member with the given id or, if requested, builds a fresh one.
+     * A freshly built member is NOT added to the group; adding is done via
+     * {@link StreamsGroup#updateMember(StreamsGroupMember)}.
+     *
+     * @param memberId          The member id.
+     * @param createIfNotExists Whether a new member must be built when the id
+     *                          is unknown.
+     *
+     * @return The existing member, or a newly built one.
+     * @throws UnknownMemberIdException if the member does not exist and
+     *                                  createIfNotExists is false.
+     */
+    public StreamsGroupMember getOrMaybeCreateMember(
+        String memberId,
+        boolean createIfNotExists
+    ) {
+        StreamsGroupMember existing = members.get(memberId);
+        if (existing == null) {
+            if (createIfNotExists) {
+                return new StreamsGroupMember.Builder(memberId).build();
+            }
+            throw new UnknownMemberIdException(
+                String.format("Member %s is not a member of group %s.", memberId, groupId)
+            );
+        }
+        return existing;
+    }
+
+    /**
+     * Looks up a static member by its group instance id.
+     *
+     * @param instanceId The group instance id.
+     *
+     * @return The member registered for the instance id, or null if no member
+     *         id is mapped to it.
+     */
+    public StreamsGroupMember staticMember(String instanceId) {
+        String existingMemberId = staticMemberId(instanceId);
+        if (existingMemberId == null) {
+            return null;
+        }
+        // The id is expected to be known, so never create a member here.
+        return getOrMaybeCreateMember(existingMemberId, false);
+    }
+
+    /**
+     * Adds the member to the group, or replaces the member stored under the
+     * same member id, and then refreshes the task epochs, the static member
+     * mapping, the group state and the assignor counts.
+     *
+     * @param newMember The new member state.
+     * @throws IllegalArgumentException if newMember is null.
+     */
+    public void updateMember(StreamsGroupMember newMember) {
+        if (newMember == null) {
+            throw new IllegalArgumentException("newMember cannot be null.");
+        }
+        // put() hands back the previous state, or null for a brand new member.
+        StreamsGroupMember previousMember = members.put(newMember.memberId(), newMember);
+        maybeUpdateTaskEpoch(previousMember, newMember);
+        updateStaticMember(newMember);
+        maybeUpdateGroupState();
+        maybeUpdateAssignors(previousMember, newMember);
+    }
+
+    /**
+     * Records the member id against the member's instance id. Members without
+     * an instance id are ignored.
+     *
+     * @param newMember The new member state.
+     */
+    private void updateStaticMember(StreamsGroupMember newMember) {
+        if (newMember.instanceId() == null) {
+            return;
+        }
+        staticMembers.put(newMember.instanceId(), newMember.memberId());
+    }
+
+    /**
+     * Removes the member from the group and then refreshes the task epochs,
+     * the static member mapping, the group state and the assignor counts.
+     * An unknown member id no longer causes a NullPointerException.
+     *
+     * @param memberId The member id to remove.
+     */
+    public void removeMember(String memberId) {
+        // remove() returns null when the member id is unknown.
+        StreamsGroupMember oldMember = members.remove(memberId);
+        maybeRemoveTaskEpoch(oldMember);
+        // Only touch the static member mapping for a member that actually existed.
+        if (oldMember != null) {
+            removeStaticMember(oldMember);
+        }
+        maybeUpdateGroupState();
+        // Mirror updateMember(): release the removed member's contribution to the
+        // assignor counts. The helper ignores null members.
+        maybeUpdateAssignors(oldMember, null);
+    }
+
+    /**
+     * Removes the static member mapping if the removed member is static.
+     * A null member (nothing was removed) is ignored.
+     *
+     * @param oldMember The member that was removed, or null.
+     */
+    private void removeStaticMember(StreamsGroupMember oldMember) {
+        if (oldMember != null && oldMember.instanceId() != null) {
+            staticMembers.remove(oldMember.instanceId());
+        }
+    }
+
+    /**
+     * Tells whether a member with the given id is part of the group.
+     *
+     * @param memberId The member id.
+     *
+     * @return true if the member exists, false otherwise.
+     */
+    public boolean hasMember(String memberId) {
+        return members.containsKey(memberId);
+    }
+
+    /**
+     * Returns the size of the group.
+     *
+     * @return The number of members.
+     */
+    public int numMembers() {
+        return members.size();
+    }
+
+    /**
+     * Returns the members of the group.
+     *
+     * @return An unmodifiable view of all the members keyed by their id.
+     */
+    public Map<String, StreamsGroupMember> members() {
+        return Collections.unmodifiableMap(members);
+    }
+
+    /**
+     * Returns the static member mapping.
+     *
+     * @return An unmodifiable view of the member ids keyed by instance id.
+     */
+    public Map<String, String> staticMembers() {
+        return Collections.unmodifiableMap(staticMembers);
+    }
+
+    /**
+     * Returns the target assignment of the member.
+     *
+     * @param memberId The member id.
+     *
+     * @return The target Assignment of the member, or {@code Assignment.EMPTY}
+     *         if none is stored for it.
+     */
+    public Assignment targetAssignment(String memberId) {
+        Assignment assignment = targetAssignment.getOrDefault(memberId, Assignment.EMPTY);
+        return assignment;
+    }
+
+    /**
+     * Returns the reverse lookup map of the target assignment of active tasks.
+     *
+     * @return An unmodifiable view mapping each subtopology id to a map from
+     *         partition to the id of the member the task is targeted to.
+     */
+    public Map<String, Map<Integer, String>> invertedTargetActiveTasksAssignment() {
+        return Collections.unmodifiableMap(invertedTargetActiveTasksAssignment);
+    }
+
+    /**
+     * Returns the reverse lookup map of the target assignment of standby tasks.
+     *
+     * @return An unmodifiable view mapping each subtopology id to a map from
+     *         partition to member id.
+     */
+    public Map<String, Map<Integer, String>> invertedTargetStandbyTasksAssignment() {
+        return Collections.unmodifiableMap(invertedTargetStandbyTasksAssignment);
+    }
+
+    /**
+     * Returns the reverse lookup map of the target assignment of warm-up tasks.
+     *
+     * @return An unmodifiable view mapping each subtopology id to a map from
+     *         partition to member id.
+     */
+    public Map<String, Map<Integer, String>> invertedTargetWarmupTasksAssignment() {
+        return Collections.unmodifiableMap(invertedTargetWarmupTasksAssignment);
+    }
+
+    /**
+     * Updates the per-assignor member counts of this group for a member change.
+     *
+     * @param oldMember The previous member state, or null.
+     * @param newMember The new member state, or null.
+     */
+    private void maybeUpdateAssignors(
+        StreamsGroupMember oldMember,
+        StreamsGroupMember newMember
+    ) {
+        // Delegate to the static variant, applied to this group's counts.
+        maybeUpdateAssignors(assignors, oldMember, newMember);
+    }
+
+    /**
+     * Updates the given assignor counts for a member change: the count of the
+     * old member's assignor (if any) is passed through decValue, then the count
+     * of the new member's assignor (if any) is passed through incValue.
+     *
+     * @param serverAssignorCount The counts keyed by assignor name.
+     * @param oldMember           The previous member state, or null.
+     * @param newMember           The new member state, or null.
+     */
+    private static void maybeUpdateAssignors(
+        Map<String, Integer> serverAssignorCount,
+        StreamsGroupMember oldMember,
+        StreamsGroupMember newMember
+    ) {
+        // Decrement first, then increment, so an unchanged assignor nets out.
+        if (oldMember != null) {
+            oldMember.assignor().ifPresent(previousAssignor ->
+                serverAssignorCount.compute(previousAssignor, StreamsGroup::decValue)
+            );
+        }
+        if (newMember != null) {
+            newMember.assignor().ifPresent(currentAssignor ->
+                serverAssignorCount.compute(currentAssignor, StreamsGroup::incValue)
+            );
+        }
+    }
+
+    /**
+     * Installs a new target assignment for a member. The three inverted target
+     * assignment maps are updated first, against the member's previous target
+     * assignment (an empty one if none is stored), then the new assignment is stored.
+     *
+     * @param memberId              The member id.
+     * @param newTargetAssignment   The new target assignment.
+     */
+    public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) {
+        // The stored assignment does not change until the final put(), so one
+        // lookup serves all three inverted-map updates.
+        Assignment previousTargetAssignment = targetAssignment.getOrDefault(
+            memberId,
+            new Assignment(emptyMap(), emptyMap(), emptyMap())
+        );
+
+        updateInvertedTargetActiveTasksAssignment(memberId, previousTargetAssignment, newTargetAssignment);
+        updateInvertedTargetStandbyTasksAssignment(memberId, previousTargetAssignment, newTargetAssignment);
+        updateInvertedTargetWarmupTasksAssignment(memberId, previousTargetAssignment, newTargetAssignment);
+
+        targetAssignment.put(memberId, newTargetAssignment);
+    }
+
+
+    /**
+     * Updates the reverse lookup map of the target assignment of active tasks.
+     *
+     * @param memberId              The member id.
+     * @param oldTargetAssignment   The old target assignment.
+     * @param newTargetAssignment   The new target assignment.
+     */
+    private void updateInvertedTargetActiveTasksAssignment(
+        String memberId,
+        Assignment oldTargetAssignment,
+        Assignment newTargetAssignment
+    ) {
+        updateInvertedTargetAssignment(
+            memberId,
+            oldTargetAssignment.activeTasks(),
+            newTargetAssignment.activeTasks(),
+            invertedTargetActiveTasksAssignment
+        );
+    }
+
+    /**
+     * Updates the reverse lookup map of the target assignment of standby tasks.
+     *
+     * NOTE(review): assumes Assignment exposes standbyTasks() alongside
+     * activeTasks() - confirm against the Assignment class.
+     *
+     * @param memberId              The member id.
+     * @param oldTargetAssignment   The old target assignment.
+     * @param newTargetAssignment   The new target assignment.
+     */
+    private void updateInvertedTargetStandbyTasksAssignment(
+        String memberId,
+        Assignment oldTargetAssignment,
+        Assignment newTargetAssignment
+    ) {
+        updateInvertedTargetAssignment(
+            memberId,
+            oldTargetAssignment.standbyTasks(),
+            newTargetAssignment.standbyTasks(),
+            invertedTargetStandbyTasksAssignment
+        );
+    }
+
+    /**
+     * Updates the reverse lookup map of the target assignment of warm-up tasks.
+     *
+     * NOTE(review): assumes Assignment exposes warmupTasks() alongside
+     * activeTasks() - confirm against the Assignment class.
+     *
+     * @param memberId              The member id.
+     * @param oldTargetAssignment   The old target assignment.
+     * @param newTargetAssignment   The new target assignment.
+     */
+    private void updateInvertedTargetWarmupTasksAssignment(
+        String memberId,
+        Assignment oldTargetAssignment,
+        Assignment newTargetAssignment
+    ) {
+        updateInvertedTargetAssignment(
+            memberId,
+            oldTargetAssignment.warmupTasks(),
+            newTargetAssignment.warmupTasks(),
+            invertedTargetWarmupTasksAssignment
+        );
+    }
+
+    /**
+     * Updates one reverse lookup map of the target assignment. The caller picks
+     * the task type by passing the matching task maps and inverted map; this
+     * method used to read the active tasks unconditionally, which filled the
+     * standby and warm-up inverted maps with active-task data.
+     *
+     * @param memberId                  The member id.
+     * @param oldTargetTasks            The old target tasks, as partitions keyed by subtopology id.
+     * @param newTargetTasks            The new target tasks, as partitions keyed by subtopology id.
+     * @param invertedTargetAssignment  The reverse lookup map to update.
+     */
+    private void updateInvertedTargetAssignment(
+        String memberId,
+        Map<String, Set<Integer>> oldTargetTasks,
+        Map<String, Set<Integer>> newTargetTasks,
+        TimelineHashMap<String, TimelineHashMap<Integer, String>> invertedTargetAssignment
+    ) {
+        // Combine keys from both old and new assignments.
+        Set<String> allSubtopologyIds = new HashSet<>();
+        allSubtopologyIds.addAll(oldTargetTasks.keySet());
+        allSubtopologyIds.addAll(newTargetTasks.keySet());
+
+        for (String subtopologyId : allSubtopologyIds) {
+            Set<Integer> oldPartitions = oldTargetTasks.getOrDefault(subtopologyId, Collections.emptySet());
+            Set<Integer> newPartitions = newTargetTasks.getOrDefault(subtopologyId, Collections.emptySet());
+
+            TimelineHashMap<Integer, String> taskPartitionAssignment = invertedTargetAssignment.computeIfAbsent(
+                subtopologyId, k -> new TimelineHashMap<>(snapshotRegistry, Math.max(oldPartitions.size(), newPartitions.size()))
+            );
+
+            // Remove partitions that aren't present in the new assignment only if the partition is currently
+            // still assigned to the member in question.
+            // If p0 was moved from A to B, and the target assignment map was updated for B first, we don't want to
+            // remove the key p0 from the inverted map and undo the action when A eventually tries to update its assignment.
+            for (Integer partition : oldPartitions) {
+                if (!newPartitions.contains(partition) && memberId.equals(taskPartitionAssignment.get(partition))) {
+                    taskPartitionAssignment.remove(partition);
+                }
+            }
+
+            // Add partitions that are in the new assignment but not in the old assignment.
+            for (Integer partition : newPartitions) {
+                if (!oldPartitions.contains(partition)) {
+                    taskPartitionAssignment.put(partition, memberId);
+                }
+            }
+
+            // Drop the subtopology entry once no partition of it is targeted any more.
+            if (taskPartitionAssignment.isEmpty()) {
+                invertedTargetAssignment.remove(subtopologyId);
+            } else {
+                invertedTargetAssignment.put(subtopologyId, taskPartitionAssignment);
+            }
+        }
+    }
+
+    /**
+     * Removes the target assignment of a member.
+     *
+     * @param memberId The member id.
+     */
+    public void removeTargetAssignment(String memberId) {
+        updateInvertedTargetActiveTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
+            Assignment.EMPTY
+        );
+        updateInvertedTargetStandbyTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
+            Assignment.EMPTY
+        );
+        updateInvertedTargetWarmupTasksAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
+            Assignment.EMPTY
+        );
+        targetAssignment.remove(memberId);
+    }
+
+    /**
+     * @return An unmodifiable view of the target assignments keyed by member id.
+     */
+    public Map<String, Assignment> targetAssignment() {
+        return Collections.unmodifiableMap(targetAssignment);
+    }
+
+    /**
+     * Looks up the epoch currently recorded for an active task.
+     *
+     * @param topicId       The key of the active tasks epoch map. NOTE(review): the caller
+     *                      in this class passes a subtopology id here, despite the name.
+     * @param partitionId   The partition id.
+     *
+     * @return The recorded epoch, or -1 if no epoch is recorded.
+     */
+    public int currentActiveTaskEpoch(
+        String topicId, int partitionId
+    ) {
+        Map<Integer, Integer> epochByPartition = currentActiveTasksEpoch.get(topicId);
+        return epochByPartition == null ? -1 : epochByPartition.getOrDefault(partitionId, -1);
+    }
+
+    /**
+     * Updates the metadata refresh deadline.
+     *
+     * @param deadlineMs The deadline in milliseconds.
+     * @param groupEpoch The group epoch stored alongside the deadline.
+     */
+    public void setMetadataRefreshDeadline(
+        long deadlineMs,
+        int groupEpoch
+    ) {
+        this.metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs, 
groupEpoch);
+    }
+
+    /**
+     * Requests a metadata refresh by resetting the stored deadline to
+     * {@code DeadlineAndEpoch.EMPTY}.
+     * NOTE(review): presumably EMPTY makes {@code hasMetadataExpired} report true - confirm.
+     */
+    public void requestMetadataRefresh() {
+        this.metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+    }
+
+    /**
+     * Tells whether the metadata must be refreshed. That is the case when either
+     * 1) the current time has reached the stored deadline, or
+     * 2) the group epoch stored with the deadline is larger than the current
+     *    group epoch, which means that the operation that updated the deadline failed.
+     *
+     * @param currentTimeMs The current time in milliseconds.
+     * @return A boolean indicating whether a refresh is required or not.
+     */
+    public boolean hasMetadataExpired(long currentTimeMs) {
+        if (currentTimeMs >= metadataRefreshDeadline.deadlineMs) {
+            return true;
+        }
+        return groupEpoch() < metadataRefreshDeadline.epoch;
+    }
+
+    /**
+     * @return The metadata refresh deadline together with the group epoch it was stored with.
+     */
+    public DeadlineAndEpoch metadataRefreshDeadline() {
+        return metadataRefreshDeadline;
+    }
+
+    /**
+     * Validates the OffsetCommit request.
+     *
+     * @param memberId          The member id.
+     * @param groupInstanceId   The group instance id. Not used by this validation.
+     * @param memberEpoch       The member epoch.
+     * @param isTransactional   Whether the offset commit is transactional or not.
+     *                          Not used by this validation.
+     * @throws UnknownMemberIdException  declared for the member lookup; presumably raised when
+     *                                   the member does not exist - confirm against the lookup.
+     * @throws StaleMemberEpochException if the received member epoch differs from the epoch
+     *                                   of the member.
+     */
+    @Override
+    public void validateOffsetCommit(
+        String memberId,
+        String groupInstanceId,
+        int memberEpoch,
+        boolean isTransactional
+    ) throws UnknownMemberIdException, StaleMemberEpochException {
+        // A negative member epoch means the request comes from either the admin client
+        // or a client which does not use the group management facility. Such a request
+        // may commit offsets as long as the group has no members.
+        if (memberEpoch < 0 && members().isEmpty()) {
+            return;
+        }
+
+        int expectedMemberEpoch = getOrMaybeCreateMember(memberId, false).memberEpoch();
+        validateMemberEpoch(memberEpoch, expectedMemberEpoch);
+    }
+
+    /**
+     * Validates the OffsetFetch request.
+     *
+     * @param memberId              The member id.
+     * @param memberEpoch           The member epoch.
+     * @param lastCommittedOffset   The last committed offsets in the timeline; the member
+     *                              is looked up as of this offset.
+     * @throws UnknownMemberIdException  if the member is not found at that offset.
+     * @throws StaleMemberEpochException if the received member epoch differs from the epoch
+     *                                   of the member.
+     */
+    @Override
+    public void validateOffsetFetch(
+        String memberId,
+        int memberEpoch,
+        long lastCommittedOffset
+    ) throws UnknownMemberIdException, StaleMemberEpochException {
+        // A null member id combined with a negative member epoch means the request comes
+        // from the admin client or from a client which does not provide them. Such a
+        // fetch request is accepted without further checks.
+        if (memberId == null && memberEpoch < 0) {
+            return;
+        }
+
+        StreamsGroupMember knownMember = members.get(memberId, lastCommittedOffset);
+        if (knownMember != null) {
+            validateMemberEpoch(memberEpoch, knownMember.memberEpoch());
+            return;
+        }
+
+        throw new UnknownMemberIdException(
+            String.format("Member %s is not a member of group %s.", memberId, groupId));
+    }
+
+    /**
+     * Validates the OffsetDelete request. The body is empty: no check is performed
+     * and nothing is thrown.
+     */
+    @Override
+    public void validateOffsetDelete() {}
+
+    /**
+     * Validates the DeleteGroups request. Only a group in the EMPTY state may be deleted.
+     *
+     * @throws ApiException the NON_EMPTY_GROUP error if the group is in any other state.
+     */
+    @Override
+    public void validateDeleteGroup() throws ApiException {
+        if (state() == StreamsGroupState.EMPTY) {
+            return;
+        }
+        throw Errors.NON_EMPTY_GROUP.exception();
+    }
+
+    /**
+     * Always returns false; the topic argument is not consulted.
+     * NOTE(review): looks like a placeholder implementation - confirm.
+     *
+     * @param topic The topic name.
+     * @return false.
+     */
+    @Override
+    public boolean isSubscribedToTopic(String topic) {
+        return false;
+    }
+
+    /**
+     * Appends to the given list the tombstone records that delete the group, in this order:
+     * the current assignment tombstone of every member, the target assignment tombstone of
+     * every member, the target assignment epoch tombstone, the subscription tombstone of
+     * every member, the group subscription metadata tombstone and the group epoch tombstone.
+     *
+     * @param records The list of records.
+     */
+    @Override
+    public void createGroupTombstoneRecords(List<CoordinatorRecord> records) {
+        for (String memberId : members().keySet()) {
+            records.add(CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord(groupId(), memberId));
+        }
+
+        for (String memberId : members().keySet()) {
+            records.add(CoordinatorRecordHelpers.newTargetAssignmentTombstoneRecord(groupId(), memberId));
+        }
+        records.add(CoordinatorRecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId()));
+
+        for (String memberId : members().keySet()) {
+            records.add(CoordinatorRecordHelpers.newMemberSubscriptionTombstoneRecord(groupId(), memberId));
+        }
+
+        records.add(CoordinatorRecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId()));
+        records.add(CoordinatorRecordHelpers.newGroupEpochTombstoneRecord(groupId()));
+    }
+
+    /**
+     * @return true if the current state of the group is EMPTY, false otherwise.
+     */
+    @Override
+    public boolean isEmpty() {
+        return state() == StreamsGroupState.EMPTY;
+    }
+
+    /**
+     * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+     *
+     * @return The offset expiration condition for the group or Empty if no 
such condition exists.
+     */
+    @Override
+    public Optional<OffsetExpirationCondition> offsetExpirationCondition() {
+        return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata 
-> offsetAndMetadata.commitTimestampMs));
+    }
+
+    @Override
+    public boolean isInStates(Set<String> statesFilter, long committedOffset) {
+        return 
statesFilter.contains(state.get(committedOffset).toLowerCaseString());
+    }
+
+    /**
+     * Checks that the received member epoch equals the expected member epoch.
+     *
+     * @param receivedMemberEpoch The member epoch received in the request.
+     * @param expectedMemberEpoch The member epoch that is expected.
+     * @throws StaleMemberEpochException if the two epochs differ.
+     */
+    private void validateMemberEpoch(
+        int receivedMemberEpoch,
+        int expectedMemberEpoch
+    ) throws StaleMemberEpochException {
+        if (receivedMemberEpoch == expectedMemberEpoch) {
+            return;
+        }
+        String message = String.format("The received member epoch %d does not match "
+            + "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch);
+        throw new StaleMemberEpochException(message);
+    }
+
+    /**
+     * Recomputes and stores the current state of the group:
+     * EMPTY if the group has no members; ASSIGNING if the group epoch is larger than the
+     * target assignment epoch; RECONCILING if at least one member is not reconciled to the
+     * target assignment epoch; STABLE otherwise.
+     */
+    private void maybeUpdateGroupState() {
+        StreamsGroupState newState = STABLE;
+        if (members.isEmpty()) {
+            newState = EMPTY;
+        } else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
+            newState = ASSIGNING;
+        } else {
+            // A single member that is behind is enough for the group to be reconciling.
+            for (StreamsGroupMember member : members.values()) {
+                if (!member.isReconciledTo(targetAssignmentEpoch.get())) {
+                    newState = RECONCILING;
+                    break;
+                }
+            }
+        }
+
+        state.set(newState);
+    }
+
+    /**
+     * Computes the subscription type of the group.
+     *
+     * @param subscribedTopicNames      A map of topic names to the count of members subscribed to each topic.
+     * @param numberOfMembers           The number of members each count is compared against.
+     *
+     * @return {@link org.apache.kafka.coordinator.group.assignor.SubscriptionType#HOMOGENEOUS} if
+     *         the map is empty or every topic is subscribed to by exactly {@code numberOfMembers} members;
+     *         otherwise, {@link org.apache.kafka.coordinator.group.assignor.SubscriptionType#HETEROGENEOUS}.
+     */
+    public static SubscriptionType subscriptionType(
+        Map<String, Integer> subscribedTopicNames,
+        int numberOfMembers
+    ) {
+        if (subscribedTopicNames.isEmpty()) {
+            return HOMOGENEOUS;
+        }
+
+        boolean everyTopicHasAllMembers = subscribedTopicNames.values().stream()
+            .allMatch(subscriberCount -> subscriberCount == numberOfMembers);
+        return everyTopicHasAllMembers ? HOMOGENEOUS : HETEROGENEOUS;
+    }
+
+    /**
+     * Replaces the task epochs recorded for the old member with those of the new member.
+     * The epochs of the old member (if any) are removed first; then the assigned active,
+     * standby and warm-up tasks of the new member, followed by its active tasks pending
+     * revocation, are recorded at the new member's epoch.
+     *
+     * @param oldMember The old member.
+     * @param newMember The new member.
+     */
+    private void maybeUpdateTaskEpoch(
+        StreamsGroupMember oldMember,
+        StreamsGroupMember newMember
+    ) {
+        maybeRemoveTaskEpoch(oldMember);
+
+        int newEpoch = newMember.memberEpoch();
+        addTaskEpochs(
+            newMember.assignedActiveTasks(),
+            newMember.assignedStandbyTasks(),
+            newMember.assignedWarmupTasks(),
+            newEpoch
+        );
+        // Tasks pending revocation are tracked in the active tasks epoch map as well.
+        addTaskEpochs(newMember.activeTasksPendingRevocation(), emptyMap(), emptyMap(), newEpoch);
+    }
+
+    /**
+     * Removes the task epochs recorded for the provided member: its assigned active,
+     * standby and warm-up tasks and its active tasks pending revocation. Does nothing
+     * when the member is null.
+     *
+     * @param oldMember The old member, or null.
+     */
+    private void maybeRemoveTaskEpoch(
+        StreamsGroupMember oldMember
+    ) {
+        if (oldMember == null) {
+            return;
+        }
+
+        int oldEpoch = oldMember.memberEpoch();
+        removeActiveTaskEpochs(oldMember.assignedActiveTasks(), oldEpoch);
+        removeStandbyTaskEpochs(oldMember.assignedStandbyTasks(), oldEpoch);
+        removeWarmupTaskEpochs(oldMember.assignedWarmupTasks(), oldEpoch);
+        removeActiveTaskEpochs(oldMember.activeTasksPendingRevocation(), oldEpoch);
+    }
+
+    /**
+     * Removes the epochs of the given tasks from the active tasks epoch map.
+     *
+     * @param assignment    The tasks, keyed by subtopology id.
+     * @param expectedEpoch The epoch the tasks are expected to be recorded at.
+     */
+    void removeActiveTaskEpochs(
+        Map<String, Set<Integer>> assignment,
+        int expectedEpoch
+    ) {
+        removeTaskEpochs(assignment, currentActiveTasksEpoch, expectedEpoch);
+    }
+
+    /**
+     * Removes the epochs of the given tasks from the standby tasks epoch map.
+     *
+     * @param assignment    The tasks, keyed by subtopology id.
+     * @param expectedEpoch The epoch the tasks are expected to be recorded at.
+     */
+    void removeStandbyTaskEpochs(
+        Map<String, Set<Integer>> assignment,
+        int expectedEpoch
+    ) {
+        removeTaskEpochs(assignment, currentStandbyTasksEpoch, expectedEpoch);
+    }
+
+    /**
+     * Removes the epochs of the given tasks from the warm-up tasks epoch map.
+     *
+     * @param assignment    The tasks, keyed by subtopology id.
+     * @param expectedEpoch The epoch the tasks are expected to be recorded at.
+     */
+    void removeWarmupTaskEpochs(
+        Map<String, Set<Integer>> assignment,
+        int expectedEpoch
+    ) {
+        removeTaskEpochs(assignment, currentWarmupTasksEpoch, expectedEpoch);
+    }
+
+    /**
+     * Removes the task epochs based on the provided assignment.
+     *
+     * @param assignment        The assignment, keyed by subtopology id.
+     * @param currentTasksEpoch The epoch map (subtopology id to partition to epoch) to remove from.
+     * @param expectedEpoch     The expected epoch.
+     * @throws IllegalStateException if a subtopology or a task of the assignment has no epoch
+     *                               recorded, or if the recorded epoch does not match the
+     *                               expected one.
+     */
+    private void removeTaskEpochs(
+        Map<String, Set<Integer>> assignment,
+        TimelineHashMap<String, TimelineHashMap<Integer, Integer>> currentTasksEpoch,
+        int expectedEpoch
+    ) {
+        assignment.forEach((subtopologyId, assignedPartitions) -> {
+            currentTasksEpoch.compute(subtopologyId, (__, partitionsOrNull) -> {
+                if (partitionsOrNull == null) {
+                    throw new IllegalStateException(
+                        String.format("Cannot remove the epoch %d from %s because it does not have any epoch",
+                            expectedEpoch, subtopologyId));
+                }
+                assignedPartitions.forEach(partitionId -> {
+                    Integer prevValue = partitionsOrNull.remove(partitionId);
+                    // Check for null before comparing: unboxing a null Integer against the
+                    // int epoch would throw a NullPointerException instead of the documented
+                    // IllegalStateException.
+                    if (prevValue == null) {
+                        throw new IllegalStateException(
+                            String.format("Cannot remove the epoch %d from task %s_%s because it does not have any epoch",
+                                expectedEpoch, subtopologyId, partitionId));
+                    }
+                    if (prevValue != expectedEpoch) {
+                        throw new IllegalStateException(
+                            String.format("Cannot remove the epoch %d from task %s_%s because the partition is " +
+                                "still owned at a different epoch %d", expectedEpoch, subtopologyId, partitionId, prevValue));
+                    }
+                });
+                // Returning null from the remapping function removes the subtopology entry.
+                if (partitionsOrNull.isEmpty()) {
+                    return null;
+                } else {
+                    return partitionsOrNull;
+                }
+            });
+        });
+    }
+
+    /**
+     * Records the given epoch for the provided active, standby and warm-up tasks in the
+     * active, standby and warm-up task epoch maps respectively.
+     *
+     * @param activeTasks   The assigned active tasks.
+     * @param standbyTasks  The assigned standby tasks.
+     * @param warmupTasks   The assigned warmup tasks.
+     * @param epoch         The new epoch.
+     * @throws IllegalStateException if a task already has an epoch assigned.
+     * package-private for testing.
+     */
+    void addTaskEpochs(
+        Map<String, Set<Integer>> activeTasks,
+        Map<String, Set<Integer>> standbyTasks,
+        Map<String, Set<Integer>> warmupTasks,
+        int epoch
+    ) {
+        addTaskEpochs(activeTasks, epoch, currentActiveTasksEpoch);
+        addTaskEpochs(standbyTasks, epoch, currentStandbyTasksEpoch);
+        addTaskEpochs(warmupTasks, epoch, currentWarmupTasksEpoch);
+    }
+
+    /**
+     * Records the given epoch for every task of the provided map in the given epoch map.
+     * A per-subtopology map is created when the subtopology has none yet.
+     *
+     * @param tasks             The tasks, keyed by subtopology id.
+     * @param epoch             The new epoch.
+     * @param currentTasksEpoch The epoch map (subtopology id to partition to epoch) to update.
+     * @throws IllegalStateException if a task already has an epoch assigned.
+     */
+    void addTaskEpochs(
+        Map<String, Set<Integer>> tasks,
+        int epoch,
+        TimelineHashMap<String, TimelineHashMap<Integer, Integer>> 
currentTasksEpoch
+    ) {
+        tasks.forEach((subtopologyId, assignedTaskPartitions) -> {
+            currentTasksEpoch.compute(subtopologyId, (__, partitionsOrNull) -> 
{
+                if (partitionsOrNull == null) {
+                    partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, 
assignedTaskPartitions.size());
+                }
+                for (Integer partitionId : assignedTaskPartitions) {
+                    Integer prevValue = partitionsOrNull.put(partitionId, 
epoch);
+                    if (prevValue != null) {
+                        throw new IllegalStateException(
+                            String.format("Cannot set the epoch of %s-%s to %d 
because the partition is " +
+                                "still owned at epoch %d", subtopologyId, 
partitionId, epoch, prevValue));
+                    }
+                }
+                return partitionsOrNull;
+            });
+        });
+    }
+
+    /**
+     * Decrements value by 1. Returns null when the value is null or when the
+     * decrement would reach zero. This helper is meant to be used with Map#compute.
+     */
+    private static Integer decValue(String key, Integer value) {
+        if (value == null || value == 1) {
+            return null;
+        }
+        return value - 1;
+    }
+
+    /**
+     * Increments value by 1, treating a null value as zero. This helper is meant
+     * to be used with Map#compute.
+     */
+    private static Integer incValue(String key, Integer value) {
+        if (value == null) {
+            return 1;
+        }
+        return value + 1;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given Streams group.
+     * The records are appended in this order: one member record per member, the group epoch
+     * record, one target assignment record per member, the target assignment epoch record
+     * (written with the group epoch) and one current assignment record per member.
+     *
+     * @param records The list to which the new records are added.
+     */
+    public void createStreamsGroupRecords(
+        List<CoordinatorRecord> records
+    ) {
+        for (StreamsGroupMember streamsGroupMember : members().values()) {
+            records.add(CoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId(), streamsGroupMember));
+        }
+
+        records.add(CoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId(), groupEpoch()));
+
+        for (String streamsGroupMemberId : members().keySet()) {
+            Assignment memberTargetAssignment = targetAssignment(streamsGroupMemberId);
+            records.add(CoordinatorRecordHelpers.newTargetAssignmentRecord(
+                groupId(),
+                streamsGroupMemberId,
+                memberTargetAssignment.activeTasks(),
+                memberTargetAssignment.standbyTasks(),
+                memberTargetAssignment.warmupTasks()
+            ));
+        }
+
+        records.add(CoordinatorRecordHelpers.newStreamsTargetAssignmentEpochRecord(groupId(), groupEpoch()));
+
+        for (StreamsGroupMember streamsGroupMember : members().values()) {
+            records.add(CoordinatorRecordHelpers.newCurrentAssignmentRecord(groupId(), streamsGroupMember));
+        }
+    }
+
+    /**
+     * Groups the partitions of the given list by topic id. Partitions whose topic
+     * cannot be found in the topics image are left out.
+     *
+     * @param partitions  The list of TopicPartition.
+     * @param topicsImage The topics image used to resolve topic names to topic ids.
+     * @return The map of topic id and partition set converted from the list of TopicPartition.
+     */
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> partitionsByTopicId = new HashMap<>();
+        for (TopicPartition topicPartition : partitions) {
+            TopicImage topicImage = topicsImage.getTopic(topicPartition.topic());
+            if (topicImage == null) {
+                continue;
+            }
+            partitionsByTopicId
+                .computeIfAbsent(topicImage.id(), __ -> new HashSet<>())
+                .add(topicPartition.partition());
+        }
+        return partitionsByTopicId;
+    }
+
+    /**
+     * Checks whether the member has any unreleased active tasks.
+     *
+     * @param member The member to check.
+     * @return true if the member is in the UNRELEASED_TASKS state and its target assignment
+     *         contains an active task that the member does not own yet and that still has
+     *         an epoch recorded in the active tasks epoch map; false otherwise.
+     */
+    public boolean waitingOnUnreleasedActiveTasks(StreamsGroupMember member) {
+        if (member.state() != MemberState.UNRELEASED_TASKS) {
+            return false;
+        }
+
+        // Fall back to the empty assignment so that a member without a target assignment
+        // is reported as not waiting instead of failing with a NullPointerException.
+        Assignment memberTargetAssignment = targetAssignment().getOrDefault(member.memberId(), Assignment.EMPTY);
+
+        for (Map.Entry<String, Set<Integer>> entry : memberTargetAssignment.activeTasks().entrySet()) {
+            String subtopologyId = entry.getKey();
+            Set<Integer> assignedActiveTasks = member.assignedActiveTasks().getOrDefault(subtopologyId, Collections.emptySet());
+
+            for (int partition : entry.getValue()) {
+                if (!assignedActiveTasks.contains(partition) && currentActiveTaskEpoch(subtopologyId, partition) != -1) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
new file mode 100644
index 00000000000..0379a58bb2c
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -0,0 +1,704 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * StreamsGroupMember contains all the information related to a member
+ * within a Streams group. This class is immutable and is fully backed
+ * by records stored in the __consumer_offsets topic.
+ */
+public class StreamsGroupMember {
+
+    /**
+     * A builder that facilitates the creation of a new member or the update of
+     * an existing one.
+     *
+     * Please refer to the javadoc of {@link StreamsGroupMember} for the
+     * definition of the fields.
+     */
+    public static class Builder {
+        private final String memberId;
+        private int memberEpoch = 0;
+        private int previousMemberEpoch = -1;
+        private MemberState state = MemberState.STABLE;
+        private String instanceId = null;
+        private String rackId = null;
+        private int rebalanceTimeoutMs = -1;
+        private String clientId = "";
+        private String clientHost = "";
+        private byte[] topologyHash;
+        private String assignor;
+        private String processId;
+        private StreamsGroupMemberMetadataValue.HostInfo hostInfo;
+        private List<StreamsGroupMemberMetadataValue.KeyValue> clientTags;
+        private byte[] userData;
+        private List<StreamsGroupMemberMetadataValue.KeyValue> 
assignmentConfigs;
+        private Map<String, Set<Integer>> assignedActiveTasks = 
Collections.emptyMap();
+        private Map<String, Set<Integer>> assignedStandbyTasks = 
Collections.emptyMap();
+        private Map<String, Set<Integer>> assignedWarmupTasks = 
Collections.emptyMap();
+        private Map<String, Set<Integer>> activeTasksPendingRevocation = 
Collections.emptyMap();
+
+        /**
+         * Creates a builder for a member with the given id; all other fields start
+         * with the defaults declared above.
+         *
+         * @param memberId The member id; must not be null.
+         */
+        public Builder(String memberId) {
+            this.memberId = Objects.requireNonNull(memberId);
+        }
+
+        /**
+         * Creates a builder pre-populated with every field of an existing member.
+         *
+         * @param member The member to copy; must not be null.
+         */
+        public Builder(StreamsGroupMember member) {
+            Objects.requireNonNull(member);
+
+            this.memberId = member.memberId;
+            this.memberEpoch = member.memberEpoch;
+            this.previousMemberEpoch = member.previousMemberEpoch;
+            this.instanceId = member.instanceId;
+            this.rackId = member.rackId;
+            this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+            this.clientId = member.clientId;
+            this.clientHost = member.clientHost;
+            this.topologyHash = member.topologyHash;
+            this.assignor = member.assignor;
+            this.processId = member.processId;
+            this.hostInfo = member.hostInfo;
+            this.clientTags = member.clientTags;
+            this.userData = member.userData;
+            this.assignmentConfigs = member.assignmentConfigs;
+            this.state = member.state;
+            this.assignedActiveTasks = member.assignedActiveTasks;
+            this.assignedStandbyTasks = member.assignedStandbyTasks;
+            this.assignedWarmupTasks = member.assignedWarmupTasks;
+            this.activeTasksPendingRevocation = 
member.activeTasksPendingRevocation;
+        }
+
+        /**
+         * Sets the member epoch and stores the epoch held before this call as the
+         * previous member epoch. Use {@code setMemberEpoch} to change the epoch
+         * without touching the previous member epoch.
+         */
+        public Builder updateMemberEpoch(int memberEpoch) {
+            int currentMemberEpoch = this.memberEpoch;
+            this.memberEpoch = memberEpoch;
+            this.previousMemberEpoch = currentMemberEpoch;
+            return this;
+        }
+
+        public Builder setMemberEpoch(int memberEpoch) {
+            this.memberEpoch = memberEpoch;
+            return this;
+        }
+
+        public Builder setPreviousMemberEpoch(int previousMemberEpoch) {
+            this.previousMemberEpoch = previousMemberEpoch;
+            return this;
+        }
+
+        public Builder setInstanceId(String instanceId) {
+            this.instanceId = instanceId;
+            return this;
+        }
+
+        /**
+         * Overwrites the instance id only if the given Optional is present.
+         */
+        public Builder maybeUpdateInstanceId(Optional<String> instanceId) {
+            this.instanceId = instanceId.orElse(this.instanceId);
+            return this;
+        }
+
+        public Builder setRackId(String rackId) {
+            this.rackId = rackId;
+            return this;
+        }
+
+        /**
+         * Overwrites the rack id only if the given Optional is present.
+         */
+        public Builder maybeUpdateRackId(Optional<String> rackId) {
+            this.rackId = rackId.orElse(this.rackId);
+            return this;
+        }
+
+        public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            return this;
+        }
+
+        /**
+         * Overwrites the rebalance timeout only if the given OptionalInt is present.
+         */
+        public Builder maybeUpdateRebalanceTimeoutMs(OptionalInt 
rebalanceTimeoutMs) {
+            this.rebalanceTimeoutMs = 
rebalanceTimeoutMs.orElse(this.rebalanceTimeoutMs);
+            return this;
+        }
+
+        /**
+         * Overwrites the assignor only if the given Optional is present.
+         */
+        public StreamsGroupMember.Builder maybeUpdateAssignor(Optional<String> 
assignor) {
+            this.assignor = assignor.orElse(this.assignor);
+            return this;
+        }
+
+        public Builder setClientId(String clientId) {
+            this.clientId = clientId;
+            return this;
+        }
+
+        public Builder setClientHost(String clientHost) {
+            this.clientHost = clientHost;
+            return this;
+        }
+
+        public Builder setState(MemberState state) {
+            this.state = state;
+            return this;
+        }
+
+        public Builder setTopologyHash(byte[] topologyHash) {
+            this.topologyHash = topologyHash;
+            return this;
+        }
+
+        public Builder setAssignor(String assignor) {
+            this.assignor = assignor;
+            return this;
+        }
+
+        public Builder setProcessId(String processId) {
+            this.processId = processId;
+            return this;
+        }
+
+        public Builder setHostInfo(StreamsGroupMemberMetadataValue.HostInfo 
hostInfo) {
+            this.hostInfo = hostInfo;
+            return this;
+        }
+
+        public Builder 
setClientTags(List<StreamsGroupMemberMetadataValue.KeyValue> clientTags) {
+            this.clientTags = clientTags;
+            return this;
+        }
+
+        public Builder setUserData(byte[] userData) {
+            this.userData = userData;
+            return this;
+        }
+
+        public Builder 
setAssignmentConfigs(List<StreamsGroupMemberMetadataValue.KeyValue> 
assignmentConfigs) {
+            this.assignmentConfigs = assignmentConfigs;
+            return this;
+        }
+
+        public Builder setAssignedActiveTasks(Map<String, Set<Integer>> 
assignedActiveTasks) {
+            this.assignedActiveTasks = assignedActiveTasks;
+            return this;
+        }
+
+        public Builder setAssignedStandbyTasks(Map<String, Set<Integer>> 
assignedStandbyTasks) {
+            this.assignedStandbyTasks = assignedStandbyTasks;
+            return this;
+        }
+
+        public Builder setAssignedWarmupTasks(Map<String, Set<Integer>> 
assignedWarmupTasks) {
+            this.assignedWarmupTasks = assignedWarmupTasks;
+            return this;
+        }
+
+        public Builder setActiveTasksPendingRevocation(Map<String, 
Set<Integer>> activeTasksPendingRevocation) {
+            this.activeTasksPendingRevocation = activeTasksPendingRevocation;
+            return this;
+        }
+
+        /**
+         * Copies the fields carried by a member metadata record into this builder.
+         * The member epochs, the state and the task assignments are left untouched.
+         */
+        public Builder updateWith(StreamsGroupMemberMetadataValue record) {
+            setInstanceId(record.instanceId());
+            setRackId(record.rackId());
+            setClientId(record.clientId());
+            setClientHost(record.clientHost());
+            setRebalanceTimeoutMs(record.rebalanceTimeoutMs());
+            setTopologyHash(record.topologyHash());
+            setAssignor(record.assignor());
+            setProcessId(record.processId());
+            setHostInfo(record.hostInfo());
+            setClientTags(record.clientTags());
+            setUserData(record.userData());
+            setAssignmentConfigs(record.assignmentConfigs());
+            return this;
+        }
+
+        /**
+         * Copies the member epochs, the state and the task assignments carried by a
+         * current member assignment record into this builder.
+         */
+        public Builder updateWith(StreamsGroupCurrentMemberAssignmentValue 
record) {
+            setMemberEpoch(record.memberEpoch());
+            setPreviousMemberEpoch(record.previousMemberEpoch());
+            setState(MemberState.fromValue(record.state()));
+            
setAssignedActiveTasks(assignmentFromTaskIds(record.activeTasks()));
+            
setAssignedStandbyTasks(assignmentFromTaskIds(record.standbyTasks()));
+            
setAssignedWarmupTasks(assignmentFromTaskIds(record.warmupTasks()));
+            
setActiveTasksPendingRevocation(assignmentFromTaskIds(record.activeTasksPendingRevocation()));
+            return this;
+        }
+
+        /**
+         * Converts a list of task ids into a map from subtopology to an unmodifiable
+         * copy of its partitions. Collectors.toMap rejects duplicate keys, so a list
+         * naming the same subtopology twice fails with an IllegalStateException.
+         */
+        private Map<String, Set<Integer>> assignmentFromTaskIds(
+            List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> 
topicPartitionsList
+        ) {
+            return topicPartitionsList.stream().collect(Collectors.toMap(
+                StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopology,
+                taskIds -> Collections.unmodifiableSet(new 
HashSet<>(taskIds.partitions()))));
+        }
+
+        /**
+         * @return A new StreamsGroupMember holding the current values of this builder.
+         */
+        public StreamsGroupMember build() {
+            return new StreamsGroupMember(
+                memberId,
+                memberEpoch,
+                previousMemberEpoch,
+                instanceId,
+                rackId,
+                rebalanceTimeoutMs,
+                clientId,
+                clientHost,
+                topologyHash,
+                assignor,
+                processId,
+                hostInfo,
+                clientTags,
+                userData,
+                assignmentConfigs,
+                state,
+                assignedActiveTasks,
+                assignedStandbyTasks,
+                assignedWarmupTasks,
+                activeTasksPendingRevocation
+            );
+        }
+    }
+
+    /**
+     * The member id.
+     */
+    private final String memberId;
+
+    /**
+     * The current member epoch.
+     */
+    private final int memberEpoch;
+
+    /**
+     * The previous member epoch.
+     */
+    private final int previousMemberEpoch;
+
+    /**
+     * The member state.
+     */
+    private final MemberState state;
+
+    /**
+     * The instance id provided by the member.
+     */
+    private final String instanceId;
+
+    /**
+     * The rack id provided by the member.
+     */
+    private final String rackId;
+
+    /**
+     * The rebalance timeout provided by the member.
+     */
+    private final int rebalanceTimeoutMs;
+
+    /**
+     * The client id reported by the member.
+     */
+    private final String clientId;
+
+    /**
+     * The host reported by the member.
+     */
+    private final String clientHost;
+
+    /**
+     * The topology hash
+     */
+    private final byte[] topologyHash;
+
+    /**
+     * The assignor
+     */
+    private final String assignor;
+
+    /**
+     * The process ID
+     */
+    private final String processId;
+
+    /**
+     * The host info
+     */
+    private final StreamsGroupMemberMetadataValue.HostInfo hostInfo;
+
+    /**
+     * The client tags
+     */
+    private final List<StreamsGroupMemberMetadataValue.KeyValue> clientTags;
+
+    /**
+     * The user data
+     */
+    private final byte[] userData;
+
+    /**
+     * The assignment configs
+     */
+    private final List<StreamsGroupMemberMetadataValue.KeyValue> assignmentConfigs;
+
+    /**
+     * Active tasks assigned to this member.
+     */
+    private final Map<String, Set<Integer>> assignedActiveTasks;
+
+    /**
+     * Standby tasks assigned to this member.
+     */
+    private final Map<String, Set<Integer>> assignedStandbyTasks;
+
+    /**
+     * Warmup tasks assigned to this member.
+     */
+    private final Map<String, Set<Integer>> assignedWarmupTasks;
+
+    /**
+     * Active tasks being revoked by this member.
+     */
+    private final Map<String, Set<Integer>> activeTasksPendingRevocation;
+
+    private StreamsGroupMember(
+        String memberId,
+        int memberEpoch,
+        int previousMemberEpoch,
+        String instanceId,
+        String rackId,
+        int rebalanceTimeoutMs,
+        String clientId,
+        String clientHost,
+        byte[] topologyHash,
+        String assignor,
+        String processId,
+        StreamsGroupMemberMetadataValue.HostInfo hostInfo,
+        List<StreamsGroupMemberMetadataValue.KeyValue> clientTags,
+        byte[] userData,
+        List<StreamsGroupMemberMetadataValue.KeyValue> assignmentConfigs,
+        MemberState state,
+        Map<String, Set<Integer>> assignedActiveTasks,
+        Map<String, Set<Integer>> assignedStandbyTasks,
+        Map<String, Set<Integer>> assignedWarmupTasks,
+        Map<String, Set<Integer>> activeTasksPendingRevocation
+    ) {
+        this.memberId = memberId;
+        this.memberEpoch = memberEpoch;
+        this.previousMemberEpoch = previousMemberEpoch;
+        this.state = state;
+        this.instanceId = instanceId;
+        this.rackId = rackId;
+        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+        this.clientId = clientId;
+        this.clientHost = clientHost;
+        this.topologyHash = topologyHash;
+        this.assignor = assignor;
+        this.processId = processId;
+        this.hostInfo = hostInfo;
+        this.clientTags = clientTags;
+        this.userData = userData;
+        this.assignmentConfigs = assignmentConfigs;
+        this.assignedActiveTasks = assignedActiveTasks;
+        this.assignedStandbyTasks = assignedStandbyTasks;
+        this.assignedWarmupTasks = assignedWarmupTasks;
+        this.activeTasksPendingRevocation = activeTasksPendingRevocation;
+    }
+
+    /**
+     * @return The member id.
+     */
+    public String memberId() {
+        return memberId;
+    }
+
+    /**
+     * @return The current member epoch.
+     */
+    public int memberEpoch() {
+        return memberEpoch;
+    }
+
+    /**
+     * @return The previous member epoch.
+     */
+    public int previousMemberEpoch() {
+        return previousMemberEpoch;
+    }
+
+    /**
+     * @return The instance id.
+     */
+    public String instanceId() {
+        return instanceId;
+    }
+
+    /**
+     * @return The rack id.
+     */
+    public String rackId() {
+        return rackId;
+    }
+
+    /**
+     * @return The rebalance timeout in millis.
+     */
+    public int rebalanceTimeoutMs() {
+        return rebalanceTimeoutMs;
+    }
+
+    /**
+     * @return The client id.
+     */
+    public String clientId() {
+        return clientId;
+    }
+
+    /**
+     * @return The client host.
+     */
+    public String clientHost() {
+        return clientHost;
+    }
+
+    /**
+     * @return The topology hash
+     */
+    public byte[] topologyHash() {
+        return topologyHash;
+    }
+
+    /**
+     * @return The assignor
+     */
+    public Optional<String> assignor() {
+        return Optional.ofNullable(assignor);
+    }
+
+    /**
+     * @return The process ID
+     */
+    public String processId() {
+        return processId;
+    }
+
+    /**
+     * @return The host info
+     */
+    public StreamsGroupMemberMetadataValue.HostInfo hostInfo() {
+        return hostInfo;
+    }
+
+    /**
+     * @return The client tags
+     */
+    public List<StreamsGroupMemberMetadataValue.KeyValue> clientTags() {
+        return clientTags;
+    }
+
+    public byte[] userData() {
+        return userData;
+    }
+
+    /**
+     * @return The assignment configs
+     */
+    public List<StreamsGroupMemberMetadataValue.KeyValue> assignmentConfigs() {
+        return assignmentConfigs;
+    }
+
+    /**
+     * @return The current state.
+     */
+    public MemberState state() {
+        return state;
+    }
+
+    /**
+     * @return True if the member is in the Stable state and at the desired 
epoch.
+     */
+    public boolean isReconciledTo(int targetAssignmentEpoch) {
+        return state == MemberState.STABLE && memberEpoch == 
targetAssignmentEpoch;
+    }
+
+    /**
+     * @return The set of assigned active tasks.
+     */
+    public Map<String, Set<Integer>> assignedActiveTasks() {
+        return assignedActiveTasks;
+    }
+
+    /**
+     * @return The set of assigned standby tasks.
+     */
+    public Map<String, Set<Integer>> assignedStandbyTasks() {
+        return assignedStandbyTasks;
+    }
+
+    /**
+     * @return The set of assigned warm-up tasks.
+     */
+    public Map<String, Set<Integer>> assignedWarmupTasks() {
+        return assignedWarmupTasks;
+    }
+
+    /**
+     * @return The set of active tasks awaiting revocation from the member.
+     */
+    public Map<String, Set<Integer>> activeTasksPendingRevocation() {
+        return activeTasksPendingRevocation;
+    }
+
+    private static List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitionsFromMap(
+        Map<Uuid, Set<Integer>> partitions,
+        TopicsImage topicsImage
+    ) {
+        List<ConsumerGroupDescribeResponseData.TopicPartitions> 
topicPartitions = new ArrayList<>();
+        partitions.forEach((topicId, partitionSet) -> {
+            String topicName = lookupTopicNameById(topicId, topicsImage);
+            if (topicName != null) {
+                topicPartitions.add(new 
ConsumerGroupDescribeResponseData.TopicPartitions()
+                    .setTopicId(topicId)
+                    .setTopicName(topicName)
+                    .setPartitions(new ArrayList<>(partitionSet)));
+            }
+        });
+        return topicPartitions;
+    }
+
+    private static String lookupTopicNameById(
+        Uuid topicId,
+        TopicsImage topicsImage
+    ) {
+        TopicImage topicImage = topicsImage.getTopic(topicId);
+        if (topicImage != null) {
+            return topicImage.name();
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        StreamsGroupMember that = (StreamsGroupMember) o;
+        return memberEpoch == that.memberEpoch
+            && previousMemberEpoch == that.previousMemberEpoch
+            && rebalanceTimeoutMs == that.rebalanceTimeoutMs
+            && Objects.equals(memberId, that.memberId)
+            && state == that.state
+            && Objects.equals(instanceId, that.instanceId)
+            && Objects.equals(rackId, that.rackId)
+            && Objects.equals(clientId, that.clientId)
+            && Objects.equals(clientHost, that.clientHost)
+            && Objects.deepEquals(topologyHash, that.topologyHash)
+            && Objects.equals(assignor, that.assignor)
+            && Objects.equals(processId, that.processId)
+            && Objects.equals(hostInfo, that.hostInfo)
+            && Objects.equals(clientTags, that.clientTags)
+            && Objects.deepEquals(userData, that.userData)
+            && Objects.equals(assignmentConfigs, that.assignmentConfigs)
+            && Objects.equals(assignedActiveTasks, that.assignedActiveTasks)
+            && Objects.equals(assignedStandbyTasks, that.assignedStandbyTasks)
+            && Objects.equals(assignedWarmupTasks, that.assignedWarmupTasks)
+            && Objects.equals(activeTasksPendingRevocation, 
that.activeTasksPendingRevocation);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = memberId != null ? memberId.hashCode() : 0;
+        result = 31 * result + memberEpoch;
+        result = 31 * result + previousMemberEpoch;
+        result = 31 * result + Objects.hashCode(state);
+        result = 31 * result + Objects.hashCode(instanceId);
+        result = 31 * result + Objects.hashCode(rackId);
+        result = 31 * result + rebalanceTimeoutMs;
+        result = 31 * result + Objects.hashCode(clientId);
+        result = 31 * result + Objects.hashCode(clientHost);
+        result = 31 * result + Arrays.hashCode(topologyHash);
+        result = 31 * result + Objects.hashCode(assignor);
+        result = 31 * result + Objects.hashCode(processId);
+        result = 31 * result + Objects.hashCode(hostInfo);
+        result = 31 * result + Objects.hashCode(clientTags);
+        result = 31 * result + Arrays.hashCode(userData);
+        result = 31 * result + Objects.hashCode(assignmentConfigs);
+        result = 31 * result + Objects.hashCode(assignedActiveTasks);
+        result = 31 * result + Objects.hashCode(assignedStandbyTasks);
+        result = 31 * result + Objects.hashCode(assignedWarmupTasks);
+        result = 31 * result + Objects.hashCode(activeTasksPendingRevocation);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "StreamsGroupMember(" +
+            "memberId='" + memberId + '\'' +
+            ", memberEpoch=" + memberEpoch +
+            ", previousMemberEpoch=" + previousMemberEpoch +
+            ", state='" + state + '\'' +
+            ", instanceId='" + instanceId + '\'' +
+            ", rackId='" + rackId + '\'' +
+            ", rebalanceTimeoutMs=" + rebalanceTimeoutMs +
+            ", clientId='" + clientId + '\'' +
+            ", clientHost='" + clientHost + '\'' +
+            ", assignedActiveTasks=" + assignedActiveTasks +
+            ", assignedStandbyTasks=" + assignedStandbyTasks +
+            ", assignedWarmupTasks=" + assignedWarmupTasks +
+            ", activeTasksPendingRevocation=" + activeTasksPendingRevocation +
+            ')';
+    }
+
+    /**
+     * @return True if the two provided members have different assigned active 
tasks.
+     */
+    public static boolean hasAssignedActiveTasksChanged(
+        StreamsGroupMember member1,
+        StreamsGroupMember member2
+    ) {
+        return 
!member1.assignedActiveTasks().equals(member2.assignedActiveTasks());
+    }
+
+    /**
+     * @return True if the two provided members have different assigned standby 
tasks.
+     */
+    public static boolean hasAssignedStandbyTasksChanged(
+        StreamsGroupMember member1,
+        StreamsGroupMember member2
+    ) {
+        return 
!member1.assignedStandbyTasks().equals(member2.assignedStandbyTasks());
+    }
+
+    /**
+     * @return True if the two provided members have different assigned warm-up 
tasks.
+     */
+    public static boolean hasAssignedWarmupTasksChanged(
+        StreamsGroupMember member1,
+        StreamsGroupMember member2
+    ) {
+        return 
!member1.assignedWarmupTasks().equals(member2.assignedWarmupTasks());
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/SubscribedTopicMetadata.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/SubscribedTopicMetadata.java
new file mode 100644
index 00000000000..fb9b1133637
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/SubscribedTopicMetadata.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * The subscribed topic metadata class is used by the {@link 
org.apache.kafka.coordinator.group.assignor.PartitionAssignor} to obtain
+ * topic and partition metadata for the topics that the streams group is 
subscribed to.
+ */
+public class SubscribedTopicMetadata implements SubscribedTopicDescriber {
+    /**
+     * The topic Ids mapped to their corresponding {@link TopicMetadata}
+     * object, which contains topic and partition metadata.
+     */
+    private final Map<Uuid, TopicMetadata> topicMetadata;
+
+    public SubscribedTopicMetadata(Map<Uuid, TopicMetadata> topicMetadata) {
+        this.topicMetadata = Objects.requireNonNull(topicMetadata);
+    }
+
+    /**
+     * Map of topic Ids to topic metadata.
+     *
+     * @return The map of topic Ids to topic metadata.
+     */
+    public Map<Uuid, TopicMetadata> topicMetadata() {
+        return this.topicMetadata;
+    }
+
+    /**
+     * The number of partitions for the given topic Id.
+     *
+     * @param topicId   Uuid corresponding to the topic.
+     * @return The number of partitions corresponding to the given topic Id,
+     *         or -1 if the topic Id does not exist.
+     */
+    @Override
+    public int numPartitions(Uuid topicId) {
+        TopicMetadata topic = this.topicMetadata.get(topicId);
+        return topic == null ? -1 : topic.numPartitions();
+    }
+
+    /**
+     * Returns all the available racks associated with the replicas of the 
given partition.
+     *
+     * @param topicId       Uuid corresponding to the partition's topic.
+     * @param partition     Partition Id within the topic.
+     * @return The set of racks corresponding to the replicas of the topic's 
partition.
+     *         If the topic Id does not exist or no partition rack information 
is available, an empty set is returned.
+     */
+    @Override
+    public Set<String> racksForPartition(Uuid topicId, int partition) {
+        TopicMetadata topic = this.topicMetadata.get(topicId);
+        return topic == null ? Collections.emptySet() : 
topic.partitionRacks().getOrDefault(partition, Collections.emptySet());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        SubscribedTopicMetadata that = (SubscribedTopicMetadata) o;
+        return topicMetadata.equals(that.topicMetadata);
+    }
+
+    @Override
+    public int hashCode() {
+        return topicMetadata.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "SubscribedTopicMetadata(" +
+            "topicMetadata=" + topicMetadata +
+            ')';
+    }
+}
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
index 0cbe1ba1e4e..aaaccf7d865 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json
@@ -32,7 +32,7 @@
       "about": "Currently assigned standby tasks for this streams client." },
     { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskIds",
       "about": "Currently assigned warming up tasks for this streams client." 
},
-    { "name": "ActiveTasksPendingRevocation", "versions": "0+", "type": 
"[]TaskId",
+    { "name": "ActiveTasksPendingRevocation", "versions": "0+", "type": 
"[]TaskIds",
       "about": "The active tasks that must be revoked by this member." }
   ],
   "commonStructs": [
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
new file mode 100644
index 00000000000..5041aa9cf07
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkStreamsAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTaskAssignment;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class StreamsGroupMemberTest {
+
+    @Test
+    public void testNewMember() {
+        String subtopologyId1 = "subtopology-1";
+        String subtopologyId2 = "subtopology-2";
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member-id")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setInstanceId("instance-id")
+            .setRackId("rack-id")
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client-id")
+            .setClientHost("hostname")
+            .setTopologyHash("topology-hash".getBytes())
+            .setAssignor("assignor")
+            .setProcessId("process-id")
+            .setHostInfo(new 
StreamsGroupMemberMetadataValue.HostInfo().setHost("host").setPort(9090))
+            .setClientTags(Arrays.asList(new 
StreamsGroupMemberMetadataValue.KeyValue().setKey("client").setValue("tag")))
+            .setUserData("user-data".getBytes())
+            .setAssignmentConfigs(Arrays.asList(new 
StreamsGroupMemberMetadataValue.KeyValue().setKey("assignment").setValue("config")))
+            
.setAssignedActiveTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 1, 
2, 3)))
+            
.setAssignedStandbyTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 
6, 5, 4)))
+            
.setAssignedWarmupTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 7, 
8, 9)))
+            
.setActiveTasksPendingRevocation(mkStreamsAssignment(mkTaskAssignment(subtopologyId2,
 3, 2, 1)))
+            .build();
+
+        assertEquals("member-id", member.memberId());
+        assertEquals(10, member.memberEpoch());
+        assertEquals(9, member.previousMemberEpoch());
+        assertEquals("instance-id", member.instanceId());
+        assertEquals("rack-id", member.rackId());
+        assertEquals("client-id", member.clientId());
+        assertEquals("hostname", member.clientHost());
+        assertTrue(Arrays.equals("topology-hash".getBytes(), 
member.topologyHash()));
+        assertEquals("assignor", member.assignor().get());
+        assertEquals("process-id", member.processId());
+        assertEquals(new 
StreamsGroupMemberMetadataValue.HostInfo().setHost("host").setPort(9090), 
member.hostInfo());
+        assertEquals(
+            Arrays.asList(new 
StreamsGroupMemberMetadataValue.KeyValue().setKey("client").setValue("tag")),
+            member.clientTags()
+        );
+        assertTrue(Arrays.equals("user-data".getBytes(), member.userData()));
+        assertEquals(
+            Arrays.asList(new 
StreamsGroupMemberMetadataValue.KeyValue().setKey("assignment").setValue("config")),
+            member.assignmentConfigs()
+        );
+        assertEquals(
+            mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 1, 2, 3)),
+            member.assignedActiveTasks()
+        );
+        assertEquals(
+            mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 6, 5, 4)),
+            member.assignedStandbyTasks()
+        );
+        assertEquals(
+            mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 7, 8, 9)),
+            member.assignedWarmupTasks()
+        );
+        assertEquals(
+            mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 3, 2, 1)),
+            member.activeTasksPendingRevocation()
+        );
+    }
+
+    @Test
+    public void testEquals() {
+        String subtopologyId1 = "subtopology-1";
+        String subtopologyId2 = "subtopology-2";
+
+        StreamsGroupMember member1 = new 
StreamsGroupMember.Builder("member-id")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setInstanceId("instance-id")
+            .setRackId("rack-id")
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client-id")
+            .setClientHost("hostname")
+            .setTopologyHash("topology-hash".getBytes())
+            .setAssignor("assignor")
+            .setProcessId("process-id")
+            .setHostInfo(new 
StreamsGroupMemberMetadataValue.HostInfo().setHost("host").setPort(9090))
+            .setClientTags(Arrays.asList(new 
StreamsGroupMemberMetadataValue.KeyValue().setKey("client").setValue("tag")))
+            .setUserData("user-data".getBytes())
+            .setAssignmentConfigs(Arrays.asList(new 
StreamsGroupMemberMetadataValue.KeyValue().setKey("assignment").setValue("config")))
+            
.setAssignedActiveTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 1, 
2, 3)))
+            
.setAssignedStandbyTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 
6, 5, 4)))
+            
.setAssignedWarmupTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 7, 
8, 9)))
+            
.setActiveTasksPendingRevocation(mkStreamsAssignment(mkTaskAssignment(subtopologyId2,
 3, 2, 1)))
+            .build();
+
+        StreamsGroupMember member2 = new 
StreamsGroupMember.Builder("member-id")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setInstanceId("instance-id")
+            .setRackId("rack-id")
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client-id")
+            .setClientHost("hostname")
+            .setTopologyHash("topology-hash".getBytes())
+            .setAssignor("assignor")
+            .setProcessId("process-id")
+            .setHostInfo(new 
StreamsGroupMemberMetadataValue.HostInfo().setHost("host").setPort(9090))
+            .setClientTags(Arrays.asList(new 
StreamsGroupMemberMetadataValue.KeyValue().setKey("client").setValue("tag")))
+            .setUserData("user-data".getBytes())
+            .setAssignmentConfigs(Arrays.asList(new 
StreamsGroupMemberMetadataValue.KeyValue().setKey("assignment").setValue("config")))
+            
.setAssignedActiveTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 1, 
2, 3)))
+            
.setAssignedStandbyTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 
6, 5, 4)))
+            
.setAssignedWarmupTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 7, 
8, 9)))
+            
.setActiveTasksPendingRevocation(mkStreamsAssignment(mkTaskAssignment(subtopologyId2,
 3, 2, 1)))
+            .build();
+
+        StreamsGroupMember member3 = new 
StreamsGroupMember.Builder("member3-id")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setInstanceId("instance-id")
+            .setRackId("rack-id")
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client-id")
+            .setClientHost("hostname")
+            .setTopologyHash("topology-hash".getBytes())
+            .setAssignor("assignor")
+            .setProcessId("process-id")
+            .setHostInfo(new 
StreamsGroupMemberMetadataValue.HostInfo().setHost("host").setPort(9090))
+            .setClientTags(Arrays.asList(new 
StreamsGroupMemberMetadataValue.KeyValue().setKey("client").setValue("tag")))
+            .setUserData("user-data".getBytes())
+            .setAssignmentConfigs(Arrays.asList(new 
StreamsGroupMemberMetadataValue.KeyValue().setKey("assignment").setValue("config")))
+            
.setAssignedActiveTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 1, 
2, 3)))
+            
.setAssignedStandbyTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 
6, 5, 4)))
+            
.setAssignedWarmupTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 7, 
8, 9)))
+            
.setActiveTasksPendingRevocation(mkStreamsAssignment(mkTaskAssignment(subtopologyId2,
 3, 2, 1)))
+            .build();
+
+        assertEquals(member1, member2);
+        assertNotEquals(member1, member3);
+    }
+
+    @Test
+    public void testUpdateMember() {
+        String subtopologyId1 = "subtopology-1";
+        String subtopologyId2 = "subtopology-2";
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member-id")
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setInstanceId("instance-id")
+            .setRackId("rack-id")
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client-id")
+            .setClientHost("hostname")
+            .setTopologyHash("topology-hash".getBytes())
+            .setAssignor("assignor")
+            .setProcessId("process-id")
+            .setHostInfo(new 
StreamsGroupMemberMetadataValue.HostInfo().setHost("host").setPort(9090))
+            .setClientTags(Arrays.asList(new 
StreamsGroupMemberMetadataValue.KeyValue().setKey("client").setValue("tag")))
+            .setUserData("user-data".getBytes())
+            .setAssignmentConfigs(Arrays.asList(new 
StreamsGroupMemberMetadataValue.KeyValue().setKey("assignment").setValue("config")))
+            
.setAssignedActiveTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 1, 
2, 3)))
+            
.setAssignedStandbyTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 
6, 5, 4)))
+            
.setAssignedWarmupTasks(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 7, 
8, 9)))
+            
.setActiveTasksPendingRevocation(mkStreamsAssignment(mkTaskAssignment(subtopologyId2,
 3, 2, 1)))
+            .build();
+
+        // This is a no-op.
+        StreamsGroupMember updatedMember = new 
StreamsGroupMember.Builder(member)
+            .maybeUpdateRackId(Optional.empty())
+            .maybeUpdateInstanceId(Optional.empty())
+            .maybeUpdateRebalanceTimeoutMs(OptionalInt.empty())
+            .maybeUpdateAssignor(Optional.empty())
+            .build();
+
+        assertEquals(member, updatedMember);
+
+        updatedMember = new StreamsGroupMember.Builder(member)
+            .maybeUpdateRackId(Optional.of("new-rack-id"))
+            .maybeUpdateInstanceId(Optional.of("new-instance-id"))
+            .maybeUpdateRebalanceTimeoutMs(OptionalInt.of(6000))
+            .maybeUpdateAssignor(Optional.of("new-assignor"))
+            .build();
+
+        assertEquals("new-instance-id", updatedMember.instanceId());
+        assertEquals("new-rack-id", updatedMember.rackId());
+        assertEquals(6000, updatedMember.rebalanceTimeoutMs());
+        assertEquals(Optional.of("new-assignor"), updatedMember.assignor());
+    }
+
+    @Test
+    public void testUpdateWithStreamsGroupMemberMetadataValue() {
+        StreamsGroupMemberMetadataValue record = new 
StreamsGroupMemberMetadataValue()
+            .setClientId("client-id")
+            .setClientHost("host-id")
+            .setInstanceId("instance-id")
+            .setRackId("rack-id")
+            .setRebalanceTimeoutMs(1000)
+            .setTopologyHash("topology-hash".getBytes())
+            .setAssignor("assignor")
+            .setProcessId("process-id")
+            .setHostInfo(new 
StreamsGroupMemberMetadataValue.HostInfo().setHost("host").setPort(9090))
+            .setClientTags(Arrays.asList(new 
StreamsGroupMemberMetadataValue.KeyValue().setKey("client").setValue("tag")))
+            .setUserData("user-data".getBytes())
+            .setAssignmentConfigs(Arrays.asList(new 
StreamsGroupMemberMetadataValue.KeyValue().setKey("assignment").setValue("config")));
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member-id")
+            .updateWith(record)
+            .build();
+
+        assertEquals("instance-id", member.instanceId());
+        assertEquals("rack-id", member.rackId());
+        assertEquals("client-id", member.clientId());
+        assertEquals("host-id", member.clientHost());
+        assertEquals(1000, member.rebalanceTimeoutMs());
+        assertTrue(Arrays.equals("topology-hash".getBytes(), 
member.topologyHash()));
+        assertEquals("assignor", member.assignor().get());
+        assertEquals("process-id", member.processId());
+        assertEquals(new 
StreamsGroupMemberMetadataValue.HostInfo().setHost("host").setPort(9090), 
member.hostInfo());
+        assertEquals(
+            Arrays.asList(new 
StreamsGroupMemberMetadataValue.KeyValue().setKey("client").setValue("tag")),
+            member.clientTags()
+        );
+        assertTrue(Arrays.equals("user-data".getBytes(), member.userData()));
+        assertEquals(
+            Arrays.asList(new 
StreamsGroupMemberMetadataValue.KeyValue().setKey("assignment").setValue("config")),
+            member.assignmentConfigs()
+        );
+    }
+
+    @Test
+    public void testUpdateWithConsumerGroupCurrentMemberAssignmentValue() {
+        String subtopologyId1 = "subtopology-1";
+        String subtopologyId2 = "subtopology-2";
+
+        StreamsGroupCurrentMemberAssignmentValue record = new 
StreamsGroupCurrentMemberAssignmentValue()
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setState((byte) 1)
+            .setActiveTasks(Arrays.asList(new 
StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                .setSubtopology(subtopologyId1)
+                .setPartitions(Arrays.asList(1, 2, 3)))
+            )
+            .setStandbyTasks(Arrays.asList(new 
StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                .setSubtopology(subtopologyId2)
+                .setPartitions(Arrays.asList(6, 5, 4)))
+            )
+            .setWarmupTasks(Arrays.asList(new 
StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                .setSubtopology(subtopologyId1)
+                .setPartitions(Arrays.asList(7, 8, 9)))
+            )
+            .setActiveTasksPendingRevocation(Arrays.asList(new 
StreamsGroupCurrentMemberAssignmentValue.TaskIds()
+                .setSubtopology(subtopologyId2)
+                .setPartitions(Arrays.asList(2, 3, 1)))
+            );
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member-id")
+            .updateWith(record)
+            .build();
+
+        assertEquals(10, member.memberEpoch());
+        assertEquals(9, member.previousMemberEpoch());
+        assertEquals(MemberState.STABLE, member.state());
+        assertEquals(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 1, 
2, 3)), member.assignedActiveTasks());
+        assertEquals(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 6, 
5, 4)), member.assignedStandbyTasks());
+        assertEquals(mkStreamsAssignment(mkTaskAssignment(subtopologyId1, 7, 
8, 9)), member.assignedWarmupTasks());
+        assertEquals(mkStreamsAssignment(mkTaskAssignment(subtopologyId2, 2, 
3, 1)), member.activeTasksPendingRevocation());
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
new file mode 100644
index 00000000000..d2f2f4fd22d
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -0,0 +1,730 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.coordinator.group.OffsetAndMetadata;
+import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
+import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkStreamsAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTaskAssignment;
+import static org.apache.kafka.server.immutable.ImmutableSet.singleton;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class StreamsGroupTest {
+
+    /**
+     * Builds a {@link StreamsGroup} with the given id on top of a fresh snapshot
+     * registry and a mocked metrics shard.
+     */
+    private StreamsGroup createStreamsGroup(String groupId) {
+        return new StreamsGroup(
+            new SnapshotRegistry(new LogContext()),
+            groupId,
+            mock(GroupCoordinatorMetricsShard.class)
+        );
+    }
+
+    @Test
+    public void testGetOrCreateMember() {
+        StreamsGroup group = createStreamsGroup("foo");
+
+        // With creation enabled, an unknown member id yields a member with that id.
+        StreamsGroupMember created = group.getOrMaybeCreateMember("member-id", true);
+        assertEquals("member-id", created.memberId());
+
+        // Register the member with the group.
+        group.updateMember(created);
+
+        // The registered member can be fetched with creation disabled.
+        StreamsGroupMember fetched = group.getOrMaybeCreateMember("member-id", false);
+        assertEquals("member-id", fetched.memberId());
+
+        // An unknown member id with creation disabled is rejected.
+        assertThrows(
+            UnknownMemberIdException.class,
+            () -> group.getOrMaybeCreateMember("does-not-exist", false)
+        );
+    }
+
+    @Test
+    public void testUpdateMember() {
+        StreamsGroup group = createStreamsGroup("foo");
+
+        // Derive a modified copy of a newly created member and store it in the group.
+        StreamsGroupMember initial = group.getOrMaybeCreateMember("member", true);
+        StreamsGroupMember updated = new StreamsGroupMember.Builder(initial)
+            .setAssignor("client")
+            .build();
+        group.updateMember(updated);
+
+        // The group must now return the modified member.
+        assertEquals(updated, group.getOrMaybeCreateMember("member", false));
+    }
+
+    @Test
+    public void testNoStaticMember() {
+        StreamsGroup group = createStreamsGroup("foo");
+
+        // The member is created without any instance id.
+        group.getOrMaybeCreateMember("member", true);
+
+        // Hence a lookup by instance id finds nothing.
+        assertNull(group.staticMember("instance-id"));
+    }
+
+    @Test
+    public void testGetStaticMemberByInstanceId() {
+        StreamsGroup group = createStreamsGroup("foo");
+
+        // Turn a newly created member into a static one by giving it an instance id.
+        StreamsGroupMember dynamicMember = group.getOrMaybeCreateMember("member", true);
+        StreamsGroupMember staticMember = new StreamsGroupMember.Builder(dynamicMember)
+            .setInstanceId("instance")
+            .build();
+        group.updateMember(staticMember);
+
+        // The member is reachable by instance id as well as by member id.
+        assertEquals(staticMember, group.staticMember("instance"));
+        assertEquals(staticMember, group.getOrMaybeCreateMember("member", false));
+        assertEquals(staticMember.memberId(), group.staticMemberId("instance"));
+    }
+
+    @Test
+    public void testRemoveMember() {
+        StreamsGroup group = createStreamsGroup("foo");
+
+        // Add a member and check that the group knows it.
+        StreamsGroupMember newMember = group.getOrMaybeCreateMember("member", true);
+        group.updateMember(newMember);
+        assertTrue(group.hasMember("member"));
+
+        // After removal the group no longer knows the member.
+        group.removeMember("member");
+        assertFalse(group.hasMember("member"));
+    }
+
+    @Test
+    public void testRemoveStaticMember() {
+        StreamsGroup group = createStreamsGroup("foo");
+
+        // Add a member that carries an instance id.
+        StreamsGroupMember staticMember = new StreamsGroupMember.Builder("member")
+            .setInstanceId("instance")
+            .build();
+        group.updateMember(staticMember);
+        assertTrue(group.hasMember("member"));
+
+        // Removing the member must also remove the instance id lookups.
+        group.removeMember("member");
+        assertFalse(group.hasMember("member"));
+        assertNull(group.staticMember("instance"));
+        assertNull(group.staticMemberId("instance"));
+    }
+
+    @Test
+    public void testUpdatingMemberUpdatesPartitionEpoch() {
+        String fooSubtopology = "foo-sub";
+        String barSubtopology = "bar-sub";
+        String zarSubtopology = "zar-sub";
+
+        StreamsGroup group = createStreamsGroup("foo");
+
+        // Epoch 10: foo tasks 1-3 are assigned, bar tasks 4-6 are pending revocation.
+        StreamsGroupMember memberAtEpoch10 = new StreamsGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setAssignedActiveTasks(
+                mkStreamsAssignment(mkTaskAssignment(fooSubtopology, 1, 2, 3)))
+            .setActiveTasksPendingRevocation(
+                mkStreamsAssignment(mkTaskAssignment(barSubtopology, 4, 5, 6)))
+            .build();
+        group.updateMember(memberAtEpoch10);
+
+        for (int taskId = 1; taskId <= 3; taskId++) {
+            assertEquals(10, group.currentActiveTaskEpoch(fooSubtopology, taskId));
+        }
+        for (int taskId = 4; taskId <= 6; taskId++) {
+            assertEquals(10, group.currentActiveTaskEpoch(barSubtopology, taskId));
+        }
+        // Tasks that were never handed out report -1.
+        for (int taskId = 7; taskId <= 9; taskId++) {
+            assertEquals(-1, group.currentActiveTaskEpoch(zarSubtopology, taskId));
+        }
+
+        // Epoch 11: bar tasks 1-3 are assigned, zar tasks 4-6 are pending revocation.
+        StreamsGroupMember memberAtEpoch11 = new StreamsGroupMember.Builder(memberAtEpoch10)
+            .setMemberEpoch(11)
+            .setAssignedActiveTasks(
+                mkStreamsAssignment(mkTaskAssignment(barSubtopology, 1, 2, 3)))
+            .setActiveTasksPendingRevocation(
+                mkStreamsAssignment(mkTaskAssignment(zarSubtopology, 4, 5, 6)))
+            .build();
+        group.updateMember(memberAtEpoch11);
+
+        for (int taskId = 1; taskId <= 3; taskId++) {
+            assertEquals(11, group.currentActiveTaskEpoch(barSubtopology, taskId));
+        }
+        for (int taskId = 4; taskId <= 6; taskId++) {
+            assertEquals(11, group.currentActiveTaskEpoch(zarSubtopology, taskId));
+        }
+        for (int taskId = 7; taskId <= 9; taskId++) {
+            assertEquals(-1, group.currentActiveTaskEpoch(barSubtopology, taskId));
+        }
+    }
+
+    @Test
+    public void testUpdatingMemberUpdatesPartitionEpochWhenPartitionIsReassignedBeforeBeingRevoked() {
+        String subtopologyId = "foo-sub";
+        StreamsGroup group = createStreamsGroup("foo");
+
+        // At epoch 10 the member holds task 1 only as pending revocation.
+        StreamsGroupMember revokingMember = new StreamsGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setAssignedActiveTasks(emptyMap())
+            .setAssignedStandbyTasks(emptyMap())
+            .setAssignedWarmupTasks(emptyMap())
+            .setActiveTasksPendingRevocation(
+                mkStreamsAssignment(mkTaskAssignment(subtopologyId, 1)))
+            .build();
+        group.updateMember(revokingMember);
+
+        assertEquals(10, group.currentActiveTaskEpoch(subtopologyId, 1));
+
+        // At epoch 11 the same task moves from pending revocation back to assigned.
+        StreamsGroupMember reassignedMember = new StreamsGroupMember.Builder(revokingMember)
+            .setMemberEpoch(11)
+            .setAssignedActiveTasks(
+                mkStreamsAssignment(mkTaskAssignment(subtopologyId, 1)))
+            .setAssignedStandbyTasks(emptyMap())
+            .setAssignedWarmupTasks(emptyMap())
+            .setActiveTasksPendingRevocation(emptyMap())
+            .build();
+        group.updateMember(reassignedMember);
+
+        assertEquals(11, group.currentActiveTaskEpoch(subtopologyId, 1));
+    }
+
+    @Test
+    public void testUpdatingMemberUpdatesPartitionEpochWhenPartitionIsNotReleased() {
+        String subtopologyId = "foo-sub";
+        StreamsGroup group = createStreamsGroup("foo");
+
+        // m1 takes active task 1 of the subtopology.
+        StreamsGroupMember owner = new StreamsGroupMember.Builder("m1")
+            .setMemberEpoch(10)
+            .setAssignedActiveTasks(
+                mkStreamsAssignment(mkTaskAssignment(subtopologyId, 1)))
+            .setAssignedStandbyTasks(emptyMap())
+            .setAssignedWarmupTasks(emptyMap())
+            .build();
+        group.updateMember(owner);
+
+        // m2 claims the very same active task.
+        StreamsGroupMember contender = new StreamsGroupMember.Builder("m2")
+            .setMemberEpoch(10)
+            .setAssignedActiveTasks(
+                mkStreamsAssignment(mkTaskAssignment(subtopologyId, 1)))
+            .setAssignedStandbyTasks(emptyMap())
+            .setAssignedWarmupTasks(emptyMap())
+            .build();
+
+        // The update is rejected because m1 still owns the task.
+        assertThrows(IllegalStateException.class, () -> group.updateMember(contender));
+    }
+
+    @Test
+    public void testRemoveTaskEpochs() {
+        String subtopologyId = "foo-sub";
+        StreamsGroup group = createStreamsGroup("foo");
+
+        // No epoch is recorded for the task yet, so the removal is rejected.
+        assertThrows(
+            IllegalStateException.class,
+            () -> group.removeActiveTaskEpochs(
+                mkStreamsAssignment(mkTaskAssignment(subtopologyId, 1)),
+                10
+            )
+        );
+
+        StreamsGroupMember owner = new StreamsGroupMember.Builder("m1")
+            .setMemberEpoch(10)
+            .setAssignedActiveTasks(
+                mkStreamsAssignment(mkTaskAssignment(subtopologyId, 1)))
+            .build();
+        group.updateMember(owner);
+
+        // The task is recorded at epoch 10, so a removal expecting epoch 11 is rejected.
+        assertThrows(
+            IllegalStateException.class,
+            () -> group.removeActiveTaskEpochs(
+                mkStreamsAssignment(mkTaskAssignment(subtopologyId, 1)),
+                11
+            )
+        );
+    }
+
+    @Test
+    public void testAddPartitionEpochs() {
+        String subtopologyId = "foo-sub";
+        StreamsGroup group = createStreamsGroup("foo");
+
+        // Record active task 1 at epoch 10.
+        group.addTaskEpochs(
+            mkStreamsAssignment(mkTaskAssignment(subtopologyId, 1)),
+            emptyMap(),
+            emptyMap(),
+            10
+        );
+
+        // Recording the same task at another epoch is rejected: the current owner
+        // has to remove the task first.
+        assertThrows(
+            IllegalStateException.class,
+            () -> group.addTaskEpochs(
+                mkStreamsAssignment(mkTaskAssignment(subtopologyId, 1)),
+                emptyMap(),
+                emptyMap(),
+                11
+            )
+        );
+    }
+
+    @Test
+    public void testDeletingMemberRemovesPartitionEpoch() {
+        String fooSubtopology = "foo-sub";
+        String barSubtopology = "bar-sub";
+        String zarSubtopology = "zar-sub";
+
+        StreamsGroup streamsGroup = createStreamsGroup("foo");
+
+        // The member owns foo tasks 1-3 (assigned) and bar tasks 4-6 (pending revocation).
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setMemberEpoch(10)
+            .setAssignedActiveTasks(mkStreamsAssignment(
+                mkTaskAssignment(fooSubtopology, 1, 2, 3)))
+            .setActiveTasksPendingRevocation(mkStreamsAssignment(
+                mkTaskAssignment(barSubtopology, 4, 5, 6)))
+            .build();
+
+        streamsGroup.updateMember(member);
+
+        assertEquals(10, streamsGroup.currentActiveTaskEpoch(fooSubtopology, 1));
+        assertEquals(10, streamsGroup.currentActiveTaskEpoch(fooSubtopology, 2));
+        assertEquals(10, streamsGroup.currentActiveTaskEpoch(fooSubtopology, 3));
+        assertEquals(10, streamsGroup.currentActiveTaskEpoch(barSubtopology, 4));
+        assertEquals(10, streamsGroup.currentActiveTaskEpoch(barSubtopology, 5));
+        assertEquals(10, streamsGroup.currentActiveTaskEpoch(barSubtopology, 6));
+        assertEquals(-1, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 7));
+        assertEquals(-1, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 8));
+        assertEquals(-1, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 9));
+
+        streamsGroup.removeMember(member.memberId());
+
+        // Check the tasks the member actually owned. Asserting on task ids that were
+        // never assigned would pass even if removeMember did not clear any epoch.
+        assertEquals(-1, streamsGroup.currentActiveTaskEpoch(fooSubtopology, 1));
+        assertEquals(-1, streamsGroup.currentActiveTaskEpoch(fooSubtopology, 2));
+        assertEquals(-1, streamsGroup.currentActiveTaskEpoch(fooSubtopology, 3));
+        assertEquals(-1, streamsGroup.currentActiveTaskEpoch(barSubtopology, 4));
+        assertEquals(-1, streamsGroup.currentActiveTaskEpoch(barSubtopology, 5));
+        assertEquals(-1, streamsGroup.currentActiveTaskEpoch(barSubtopology, 6));
+        // Tasks that were never owned still report -1.
+        assertEquals(-1, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 7));
+        assertEquals(-1, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 8));
+        assertEquals(-1, streamsGroup.currentActiveTaskEpoch(zarSubtopology, 9));
+    }
+
+    @Test
+    public void testWaitingOnUnreleasedTask() {
+        String fooSubtopology = "foo-sub";
+        String barSubtopology = "bar-sub";
+        String zarSubtopology = "zar-sub";
+        String firstMemberId = "m1";
+        String secondMemberId = "m2";
+
+        StreamsGroup group = createStreamsGroup("foo");
+
+        // Target assignment of m1: foo tasks 1-3 and zar tasks 7-9 as active tasks.
+        Assignment targetAssignment = new Assignment(
+            mkStreamsAssignment(
+                mkTaskAssignment(fooSubtopology, 1, 2, 3),
+                mkTaskAssignment(zarSubtopology, 7, 8, 9)
+            ),
+            emptyMap(),
+            emptyMap()
+        );
+        group.updateTargetAssignment(firstMemberId, targetAssignment);
+
+        // m1 currently owns only the foo tasks and is still revoking bar tasks 4-6.
+        StreamsGroupMember firstMember = new StreamsGroupMember.Builder(firstMemberId)
+            .setMemberEpoch(10)
+            .setState(MemberState.UNRELEASED_TASKS)
+            .setAssignedActiveTasks(
+                mkStreamsAssignment(mkTaskAssignment(fooSubtopology, 1, 2, 3)))
+            .setActiveTasksPendingRevocation(
+                mkStreamsAssignment(mkTaskAssignment(barSubtopology, 4, 5, 6)))
+            .build();
+        group.updateMember(firstMember);
+
+        // No other member holds any of m1's target tasks yet.
+        assertFalse(group.waitingOnUnreleasedActiveTasks(firstMember));
+
+        // m2 holds zar task 7, which is part of m1's target assignment.
+        StreamsGroupMember secondMember = new StreamsGroupMember.Builder(secondMemberId)
+            .setMemberEpoch(10)
+            .setActiveTasksPendingRevocation(
+                mkStreamsAssignment(mkTaskAssignment(zarSubtopology, 7)))
+            .build();
+        group.updateMember(secondMember);
+
+        assertTrue(group.waitingOnUnreleasedActiveTasks(firstMember));
+    }
+
+    @Test
+    public void testGroupState() {
+        StreamsGroup group = createStreamsGroup("foo");
+
+        // A group without members is empty.
+        assertEquals(StreamsGroup.StreamsGroupState.EMPTY, group.state());
+
+        // First member joins and the group epoch is bumped to 1.
+        StreamsGroupMember firstAtEpoch1 = new StreamsGroupMember.Builder("member1")
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .build();
+        group.updateMember(firstAtEpoch1);
+        group.setGroupEpoch(1);
+
+        assertEquals(MemberState.STABLE, firstAtEpoch1.state());
+        assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, group.state());
+
+        // Second member joins and the group epoch is bumped to 2.
+        StreamsGroupMember secondAtEpoch1 = new StreamsGroupMember.Builder("member2")
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .build();
+        group.updateMember(secondAtEpoch1);
+        group.setGroupEpoch(2);
+
+        assertEquals(MemberState.STABLE, secondAtEpoch1.state());
+        assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, group.state());
+
+        // The target assignment epoch catches up with the group epoch.
+        group.setTargetAssignmentEpoch(2);
+
+        assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, group.state());
+
+        // Member 1 reaches epoch 2 while member 2 is still at epoch 1.
+        StreamsGroupMember firstAtEpoch2 = new StreamsGroupMember.Builder(firstAtEpoch1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(2)
+            .setPreviousMemberEpoch(1)
+            .build();
+        group.updateMember(firstAtEpoch2);
+
+        assertEquals(MemberState.STABLE, firstAtEpoch2.state());
+        assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, group.state());
+
+        // Member 2 reaches epoch 2 but is not stable, so the group keeps reconciling.
+        StreamsGroupMember secondUnrevoked = new StreamsGroupMember.Builder(secondAtEpoch1)
+            .setState(MemberState.UNREVOKED_TASKS)
+            .setMemberEpoch(2)
+            .setPreviousMemberEpoch(1)
+            .build();
+        group.updateMember(secondUnrevoked);
+
+        assertEquals(MemberState.UNREVOKED_TASKS, secondUnrevoked.state());
+        assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, group.state());
+
+        // Once member 2 is stable as well, the group becomes stable.
+        StreamsGroupMember secondStable = new StreamsGroupMember.Builder(secondUnrevoked)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(2)
+            .setPreviousMemberEpoch(1)
+            .build();
+        group.updateMember(secondStable);
+
+        assertEquals(MemberState.STABLE, secondStable.state());
+        assertEquals(StreamsGroup.StreamsGroupState.STABLE, group.state());
+
+        // Removing all members brings the group back to empty.
+        group.removeMember("member1");
+        group.removeMember("member2");
+
+        assertEquals(StreamsGroup.StreamsGroupState.EMPTY, group.state());
+    }
+
+    @Test
+    public void testUpdateInvertedAssignment() {
+        StreamsGroup streamsGroup = createStreamsGroup("test-group");
+        String subtopologyId = "foo-sub";
+        String memberId1 = "member1";
+        String memberId2 = "member2";
+
+        // Initial assignment for member1
+        Assignment initialAssignment = new Assignment(
+            mkStreamsAssignment(mkTaskAssignment(subtopologyId, 0)),
+            emptyMap(),
+            emptyMap()
+        );
+        streamsGroup.updateTargetAssignment(memberId1, initialAssignment);
+
+        // Verify that task 0 is assigned to member1.
+        assertEquals(
+            mkMap(
+                mkEntry(subtopologyId, mkMap(mkEntry(0, memberId1)))
+            ),
+            streamsGroup.invertedTargetActiveTasksAssignment()
+        );
+
+        // New assignment for member1
+        Assignment newAssignment = new Assignment(
+            mkStreamsAssignment(mkTaskAssignment(subtopologyId, 1)),
+            emptyMap(),
+            emptyMap()
+        );
+        streamsGroup.updateTargetAssignment(memberId1, newAssignment);
+
+        // Verify that task 0 is no longer assigned and task 1 is assigned to member1
+        assertEquals(
+            mkMap(
+                mkEntry(subtopologyId, mkMap(mkEntry(1, memberId1)))
+            ),
+            streamsGroup.invertedTargetActiveTasksAssignment()
+        );
+
+        // New assignment for member2 to add task 1. Use the assignment object built
+        // for member2 rather than reusing the one that belongs to member1.
+        Assignment newAssignment2 = new Assignment(
+            mkStreamsAssignment(mkTaskAssignment(subtopologyId, 1)),
+            emptyMap(),
+            emptyMap()
+        );
+        streamsGroup.updateTargetAssignment(memberId2, newAssignment2);
+
+        // Verify that task 1 is assigned to member2
+        assertEquals(
+            mkMap(
+                mkEntry(subtopologyId, mkMap(mkEntry(1, memberId2)))
+            ),
+            streamsGroup.invertedTargetActiveTasksAssignment()
+        );
+
+        // New assignment for member1 to revoke task 1 and assign task 0
+        Assignment newAssignment1 = new Assignment(
+            mkStreamsAssignment(mkTaskAssignment(subtopologyId, 0)),
+            emptyMap(),
+            emptyMap()
+        );
+        streamsGroup.updateTargetAssignment(memberId1, newAssignment1);
+
+        // Verify that task 1 is still assigned to member2 and task 0 is assigned to member1
+        assertEquals(
+            mkMap(
+                mkEntry(subtopologyId, mkMap(
+                    mkEntry(0, memberId1),
+                    mkEntry(1, memberId2)
+                ))
+            ),
+            streamsGroup.invertedTargetActiveTasksAssignment()
+        );
+
+        // Test remove target assignment for member1
+        streamsGroup.removeTargetAssignment(memberId1);
+
+        // Verify that task 0 is no longer assigned and task 1 is still assigned to member2
+        assertEquals(
+            mkMap(
+                mkEntry(subtopologyId, mkMap(mkEntry(1, memberId2)))
+            ),
+            streamsGroup.invertedTargetActiveTasksAssignment()
+        );
+    }
+
+    @Test
+    public void testMetadataRefreshDeadline() {
+        MockTime clock = new MockTime();
+        StreamsGroup streamsGroup = createStreamsGroup("group-foo");
+
+        // A new group starts at epoch 0.
+        assertEquals(0, streamsGroup.groupEpoch());
+
+        // No deadline has been set yet, so the metadata counts as expired.
+        assertTrue(streamsGroup.hasMetadataExpired(clock.milliseconds()));
+        assertEquals(0L, streamsGroup.metadataRefreshDeadline().deadlineMs);
+        assertEquals(0, streamsGroup.metadataRefreshDeadline().epoch);
+
+        // With a deadline in the future and the current group epoch, the metadata is valid.
+        streamsGroup.setMetadataRefreshDeadline(
+            clock.milliseconds() + 1000,
+            streamsGroup.groupEpoch()
+        );
+        assertFalse(streamsGroup.hasMetadataExpired(clock.milliseconds()));
+        assertEquals(
+            clock.milliseconds() + 1000,
+            streamsGroup.metadataRefreshDeadline().deadlineMs
+        );
+        assertEquals(streamsGroup.groupEpoch(), streamsGroup.metadataRefreshDeadline().epoch);
+
+        // Once the clock has passed the deadline, the metadata is expired.
+        clock.sleep(1001L);
+        assertTrue(streamsGroup.hasMetadataExpired(clock.milliseconds()));
+
+        // A deadline in the future does not help when the epoch attached to it is
+        // higher than the current group epoch: the metadata still counts as expired.
+        streamsGroup.setMetadataRefreshDeadline(
+            clock.milliseconds() + 1000,
+            streamsGroup.groupEpoch() + 1
+        );
+        assertTrue(streamsGroup.hasMetadataExpired(clock.milliseconds()));
+        assertEquals(
+            clock.milliseconds() + 1000,
+            streamsGroup.metadataRefreshDeadline().deadlineMs
+        );
+        assertEquals(
+            streamsGroup.groupEpoch() + 1,
+            streamsGroup.metadataRefreshDeadline().epoch
+        );
+
+        // Bump the group epoch.
+        streamsGroup.setGroupEpoch(streamsGroup.groupEpoch() + 1);
+
+        // The deadline has not passed and the epoch matches again: the metadata is valid.
+        streamsGroup.setMetadataRefreshDeadline(
+            clock.milliseconds() + 1000,
+            streamsGroup.groupEpoch()
+        );
+        assertFalse(streamsGroup.hasMetadataExpired(clock.milliseconds()));
+        assertEquals(
+            clock.milliseconds() + 1000,
+            streamsGroup.metadataRefreshDeadline().deadlineMs
+        );
+        assertEquals(streamsGroup.groupEpoch(), streamsGroup.metadataRefreshDeadline().epoch);
+
+        // An explicit refresh request resets the deadline and expires the metadata at once.
+        streamsGroup.requestMetadataRefresh();
+        assertTrue(streamsGroup.hasMetadataExpired(clock.milliseconds()));
+        assertEquals(0L, streamsGroup.metadataRefreshDeadline().deadlineMs);
+        assertEquals(0, streamsGroup.metadataRefreshDeadline().epoch);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testValidateOffsetCommit(boolean isTransactional) {
+        StreamsGroup streamsGroup = createStreamsGroup("group-foo");
+
+        // An admin-client style commit carries neither member id nor member epoch.
+        // It is accepted here because the group has no members.
+        streamsGroup.validateOffsetCommit("", "", -1, isTransactional);
+
+        // A commit from a member the group does not know is rejected.
+        assertThrows(
+            UnknownMemberIdException.class,
+            () -> streamsGroup.validateOffsetCommit("member-id", null, 0, isTransactional)
+        );
+
+        // Add the member to the group.
+        streamsGroup.updateMember(new StreamsGroupMember.Builder("member-id").build());
+
+        // Now that the group has a member, the admin-client style commit is rejected.
+        assertThrows(
+            UnknownMemberIdException.class,
+            () -> streamsGroup.validateOffsetCommit("", "", -1, isTransactional)
+        );
+
+        // A member epoch that does not match the member's epoch is stale.
+        assertThrows(
+            StaleMemberEpochException.class,
+            () -> streamsGroup.validateOffsetCommit("member-id", "", 10, isTransactional)
+        );
+
+        // The matching member epoch is accepted.
+        streamsGroup.validateOffsetCommit("member-id", "", 0, isTransactional);
+    }
+
+    @Test
+    public void testValidateOffsetFetch() {
+        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
+        StreamsGroup streamsGroup = new StreamsGroup(
+            registry,
+            "group-foo",
+            mock(GroupCoordinatorMetricsShard.class)
+        );
+
+        // An admin-client style fetch carries neither member id nor member epoch.
+        streamsGroup.validateOffsetFetch(null, -1, Long.MAX_VALUE);
+
+        // A fetch from a member the group does not know is rejected.
+        assertThrows(
+            UnknownMemberIdException.class,
+            () -> streamsGroup.validateOffsetFetch("member-id", 0, Long.MAX_VALUE)
+        );
+
+        // Take a snapshot at offset 0 first, then add the member.
+        registry.getOrCreateSnapshot(0);
+        streamsGroup.updateMember(new StreamsGroupMember.Builder("member-id").build());
+
+        // As of last committed offset 0 the member is still unknown.
+        assertThrows(
+            UnknownMemberIdException.class,
+            () -> streamsGroup.validateOffsetFetch("member-id", 0, 0)
+        );
+
+        // Without restricting to a committed offset the member is found, but epoch 10 is stale.
+        assertThrows(
+            StaleMemberEpochException.class,
+            () -> streamsGroup.validateOffsetFetch("member-id", 10, Long.MAX_VALUE)
+        );
+
+        // The matching member epoch is accepted.
+        streamsGroup.validateOffsetFetch("member-id", 0, Long.MAX_VALUE);
+    }
+
+    @Test
+    public void testValidateDeleteGroup() {
+        StreamsGroup group = createStreamsGroup("foo");
+
+        // An empty group may be deleted.
+        assertEquals(StreamsGroup.StreamsGroupState.EMPTY, group.state());
+        assertDoesNotThrow(group::validateDeleteGroup);
+
+        // With a member present, deletion is rejected in every state the group goes through.
+        StreamsGroupMember onlyMember = new StreamsGroupMember.Builder("member1")
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .build();
+        group.updateMember(onlyMember);
+
+        assertEquals(StreamsGroup.StreamsGroupState.RECONCILING, group.state());
+        assertThrows(GroupNotEmptyException.class, group::validateDeleteGroup);
+
+        group.setGroupEpoch(1);
+
+        assertEquals(StreamsGroup.StreamsGroupState.ASSIGNING, group.state());
+        assertThrows(GroupNotEmptyException.class, group::validateDeleteGroup);
+
+        group.setTargetAssignmentEpoch(1);
+
+        assertEquals(StreamsGroup.StreamsGroupState.STABLE, group.state());
+        assertThrows(GroupNotEmptyException.class, group::validateDeleteGroup);
+    }
+
+    @Test
+    public void testOffsetExpirationCondition() {
+        long nowMs = 30000L;
+        long committedAtMs = 20000L;
+        long retentionMs = 10000L;
+        OffsetAndMetadata committedOffset = new OffsetAndMetadata(
+            15000L,
+            OptionalInt.empty(),
+            "",
+            committedAtMs,
+            OptionalLong.empty()
+        );
+        StreamsGroup streamsGroup = new StreamsGroup(
+            new SnapshotRegistry(new LogContext()),
+            "group-id",
+            mock(GroupCoordinatorMetricsShard.class)
+        );
+
+        // The group must provide an expiration condition.
+        Optional<OffsetExpirationCondition> maybeCondition = streamsGroup.offsetExpirationCondition();
+        assertTrue(maybeCondition.isPresent());
+
+        // The condition is based on the commit timestamp; with the values above
+        // (committed at 20000, retention 10000, now 30000) the offset is expired.
+        OffsetExpirationConditionImpl expirationCondition =
+            (OffsetExpirationConditionImpl) maybeCondition.get();
+        assertEquals(committedAtMs, expirationCondition.baseTimestamp().apply(committedOffset));
+        assertTrue(expirationCondition.isOffsetExpired(committedOffset, nowMs, retentionMs));
+    }
+
+    @Test
+    public void testIsInStatesCaseInsensitive() {
+        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
+        GroupCoordinatorMetricsShard shard = new GroupCoordinatorMetricsShard(
+            registry,
+            emptyMap(),
+            new TopicPartition("__consumer_offsets", 0)
+        );
+        StreamsGroup streamsGroup = new StreamsGroup(registry, "group-foo", shard);
+
+        // Snapshot 0 is taken while the group has no members.
+        registry.getOrCreateSnapshot(0);
+        assertTrue(streamsGroup.isInStates(singleton("empty"), 0));
+        // NOTE(review): despite the test name, a capitalized state name is expected
+        // NOT to match - presumably callers must lower-case the filter; confirm.
+        assertFalse(streamsGroup.isInStates(singleton("Empty"), 0));
+
+        // Snapshot 1 is taken after a member has been added.
+        streamsGroup.updateMember(new StreamsGroupMember.Builder("member1").build());
+        registry.getOrCreateSnapshot(1);
+
+        // The answer depends on the snapshot the state is read at.
+        assertTrue(streamsGroup.isInStates(singleton("empty"), 0));
+        assertTrue(streamsGroup.isInStates(singleton("stable"), 1));
+        assertFalse(streamsGroup.isInStates(singleton("empty"), 1));
+    }
+}
\ No newline at end of file

Reply via email to