This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 624dd458099 KAFKA-18321: Add StreamsGroupMember, MemberState and 
Assignment classes (#18276)
624dd458099 is described below

commit 624dd458099fa93b3fa1e1715b58bbc6d8689857
Author: Bruno Cadonna <[email protected]>
AuthorDate: Wed Jan 8 17:26:41 2025 +0100

    KAFKA-18321: Add StreamsGroupMember, MemberState and Assignment classes 
(#18276)
    
    * KAFKA-18321: Add StreamsGroupMember, MemberState and Assignment classes
    
    This commit adds the classes to represent a Streams group member in the
    consumer coordinator.
    
    Reviewers: Bill Bejeck <[email protected]>, Lucas Brutschy 
<[email protected]>
---
 .../coordinator/group/streams/Assignment.java      |  88 ++++
 .../coordinator/group/streams/MemberState.java     |  74 ++++
 .../group/streams/StreamsGroupMember.java          | 463 +++++++++++++++++++++
 .../coordinator/group/streams/AssignmentTest.java  | 122 ++++++
 .../group/streams/StreamsGroupMemberTest.java      | 429 +++++++++++++++++++
 .../group/streams/TaskAssignmentTestUtil.java      |  57 +++
 6 files changed, 1233 insertions(+)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
new file mode 100644
index 00000000000..da377d19ccd
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable assignment for a member.
+ *
+ * @param activeTasks           Active tasks assigned to the member.
+ *                              The key of the map is the subtopology ID and the value is the set of partition IDs.
+ * @param standbyTasks          Standby tasks assigned to the member.
+ *                              The key of the map is the subtopology ID and the value is the set of partition IDs.
+ * @param warmupTasks           Warm-up tasks assigned to the member.
+ *                              The key of the map is the subtopology ID and the value is the set of partition IDs.
+ */
+public record Assignment(Map<String, Set<Integer>> activeTasks,
+                         Map<String, Set<Integer>> standbyTasks,
+                         Map<String, Set<Integer>> warmupTasks) {
+
+    /**
+     * Rejects null task maps and wraps each map in an unmodifiable view.
+     *
+     * @throws NullPointerException If any of the maps is null.
+     */
+    public Assignment {
+        activeTasks = Collections.unmodifiableMap(Objects.requireNonNull(activeTasks));
+        standbyTasks = Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks));
+        warmupTasks = Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks));
+    }
+
+    /**
+     * An empty assignment.
+     */
+    public static final Assignment EMPTY = new Assignment(
+        Collections.emptyMap(),
+        Collections.emptyMap(),
+        Collections.emptyMap()
+    );
+
+    /**
+     * Creates an {@link Assignment} from a {@link StreamsGroupTargetAssignmentMemberValue}.
+     *
+     * @param record The record.
+     * @return An {@link Assignment}.
+     */
+    public static Assignment fromRecord(
+        StreamsGroupTargetAssignmentMemberValue record
+    ) {
+        return new Assignment(
+            tasksBySubtopology(record.activeTasks()),
+            tasksBySubtopology(record.standbyTasks()),
+            tasksBySubtopology(record.warmupTasks())
+        );
+    }
+
+    /**
+     * Converts the task IDs of a record into a map from subtopology ID to the set of partition IDs.
+     *
+     * @param taskIds The task IDs as stored in the record.
+     * @return The partitions keyed by subtopology ID.
+     */
+    private static Map<String, Set<Integer>> tasksBySubtopology(
+        // Fully qualified so that the import block of this file does not need to change.
+        java.util.Collection<StreamsGroupTargetAssignmentMemberValue.TaskIds> taskIds
+    ) {
+        return taskIds.stream().collect(Collectors.toMap(
+            StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
+            ids -> new HashSet<>(ids.partitions())
+        ));
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberState.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberState.java
new file mode 100644
index 00000000000..71914da48b2
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/MemberState.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The various states that a member can be in. Each state is identified by a byte value, and
+ * {@link #fromValue(byte)} maps a byte value back to its state. For the definition of the states, refer to
+ * the documentation of {@link org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder}.
+ */
+public enum MemberState {
+
+    /**
+     * The member is fully reconciled with the desired target assignment.
+     */
+    STABLE((byte) 1),
+
+    /**
+     * The member must revoke some tasks in order to be able to transition to the next epoch.
+     */
+    UNREVOKED_TASKS((byte) 2),
+
+    /**
+     * The member transitioned to the last epoch but waits on some tasks which have not been revoked by
+     * their previous owners yet.
+     */
+    UNRELEASED_TASKS((byte) 3),
+
+    /**
+     * The member is in an unknown state. This can only happen if a future version of the software
+     * introduces a new state unknown by this version.
+     */
+    UNKNOWN((byte) 127);
+
+    // Lookup table from byte value to state, filled once in the static initializer below.
+    private static final Map<Byte, MemberState> VALUES_TO_ENUMS = new HashMap<>();
+
+    static {
+        for (MemberState memberState : values()) {
+            VALUES_TO_ENUMS.put(memberState.value, memberState);
+        }
+    }
+
+    private final byte value;
+
+    MemberState(byte value) {
+        this.value = value;
+    }
+
+    /**
+     * @return The byte value that identifies this state.
+     */
+    public byte value() {
+        return value;
+    }
+
+    /**
+     * Looks up the state that is identified by the given byte value.
+     *
+     * @param value The byte value of the state.
+     * @return The state with the given value, or {@link #UNKNOWN} if no state has this value.
+     */
+    public static MemberState fromValue(byte value) {
+        return VALUES_TO_ENUMS.getOrDefault(value, UNKNOWN);
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
new file mode 100644
index 00000000000..e23df3f5701
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Contains all information related to a member within a Streams group.
+ * <p>
+ * This class is immutable and is fully backed by records stored in the 
__consumer_offsets topic.
+ *
+ * @param memberId                      The ID of the member.
+ * @param memberEpoch                   The current epoch of the member.
+ * @param previousMemberEpoch           The previous epoch of the member.
+ * @param state                         The current state of the member.
+ * @param instanceId                    The instance ID of the member.
+ * @param rackId                        The rack ID of the member.
+ * @param clientId                      The client ID of the member.
+ * @param clientHost                    The host of the member.
+ * @param rebalanceTimeoutMs            The rebalance timeout in milliseconds.
+ * @param topologyEpoch                 The epoch of the topology the member 
uses.
+ * @param processId                     The ID of the Streams client that 
contains the member.
+ * @param userEndpoint                  The user endpoint exposed for 
Interactive Queries by the Streams client that
+ *                                      contains the member.
+ * @param clientTags                    Tags of the client of the member used 
for rack-aware assignment.
+ * @param assignedActiveTasks           Active tasks assigned to the member.
+ *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
+ * @param assignedStandbyTasks          Standby tasks assigned to the member.
+ *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
+ * @param assignedWarmupTasks           Warm-up tasks assigned to the member.
+ *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
+ * @param activeTasksPendingRevocation  Active tasks assigned to the member 
pending revocation.
+ *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
+ * @param standbyTasksPendingRevocation Standby tasks assigned to the member 
pending revocation.
+ *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
+ * @param warmupTasksPendingRevocation  Warm-up tasks assigned to the member 
pending revocation.
+ *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
+ */
+@SuppressWarnings("checkstyle:JavaNCSS")
+public record StreamsGroupMember(String memberId,
+                                 Integer memberEpoch,
+                                 Integer previousMemberEpoch,
+                                 MemberState state,
+                                 Optional<String> instanceId,
+                                 Optional<String> rackId,
+                                 String clientId,
+                                 String clientHost,
+                                 Integer rebalanceTimeoutMs,
+                                 Integer topologyEpoch,
+                                 String processId,
+                                 
Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint,
+                                 Map<String, String> clientTags,
+                                 Map<String, Set<Integer>> assignedActiveTasks,
+                                 Map<String, Set<Integer>> 
assignedStandbyTasks,
+                                 Map<String, Set<Integer>> assignedWarmupTasks,
+                                 Map<String, Set<Integer>> 
activeTasksPendingRevocation,
+                                 Map<String, Set<Integer>> 
standbyTasksPendingRevocation,
+                                 Map<String, Set<Integer>> 
warmupTasksPendingRevocation) {
+
+    /**
+     * Validates that the member ID is not null and wraps every non-null map in an unmodifiable view.
+     * Maps that are null stay null.
+     */
+    public StreamsGroupMember {
+        Objects.requireNonNull(memberId, "memberId cannot be null");
+        clientTags = unmodifiableOrNull(clientTags);
+        assignedActiveTasks = unmodifiableOrNull(assignedActiveTasks);
+        assignedStandbyTasks = unmodifiableOrNull(assignedStandbyTasks);
+        assignedWarmupTasks = unmodifiableOrNull(assignedWarmupTasks);
+        activeTasksPendingRevocation = unmodifiableOrNull(activeTasksPendingRevocation);
+        standbyTasksPendingRevocation = unmodifiableOrNull(standbyTasksPendingRevocation);
+        warmupTasksPendingRevocation = unmodifiableOrNull(warmupTasksPendingRevocation);
+    }
+
+    /**
+     * @return An unmodifiable view of the given map, or null if the given map is null.
+     */
+    private static <K, V> Map<K, V> unmodifiableOrNull(Map<K, V> map) {
+        return map == null ? null : Collections.unmodifiableMap(map);
+    }
+
+    /**
+     * A builder that facilitates the creation of a new member or the update of an existing one.
+     * <p>
+     * Please refer to the javadoc of {@link StreamsGroupMember} for the definition of the fields.
+     */
+    public static class Builder {
+
+        private final String memberId;
+        private Integer memberEpoch = null;
+        private Integer previousMemberEpoch = null;
+        private MemberState state = null;
+        private Optional<String> instanceId = null;
+        private Optional<String> rackId = null;
+        private Integer rebalanceTimeoutMs = null;
+        private String clientId = null;
+        private String clientHost = null;
+        private Integer topologyEpoch = null;
+        private String processId = null;
+        private Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint = null;
+        private Map<String, String> clientTags = null;
+        private Map<String, Set<Integer>> assignedActiveTasks = null;
+        private Map<String, Set<Integer>> assignedStandbyTasks = null;
+        private Map<String, Set<Integer>> assignedWarmupTasks = null;
+        private Map<String, Set<Integer>> activeTasksPendingRevocation = null;
+        private Map<String, Set<Integer>> standbyTasksPendingRevocation = null;
+        private Map<String, Set<Integer>> warmupTasksPendingRevocation = null;
+
+        /**
+         * Creates a builder for a new member. All fields except the member ID start out as null.
+         *
+         * @param memberId The ID of the member. Must not be null.
+         */
+        public Builder(String memberId) {
+            this.memberId = Objects.requireNonNull(memberId, "memberId cannot be null");
+        }
+
+        /**
+         * Creates a builder that is pre-populated with all fields of the given member.
+         *
+         * @param member The member to copy the fields from. Must not be null.
+         */
+        public Builder(StreamsGroupMember member) {
+            Objects.requireNonNull(member, "member cannot be null");
+
+            this.memberId = member.memberId;
+            this.memberEpoch = member.memberEpoch;
+            this.previousMemberEpoch = member.previousMemberEpoch;
+            this.instanceId = member.instanceId;
+            this.rackId = member.rackId;
+            this.rebalanceTimeoutMs = member.rebalanceTimeoutMs;
+            this.clientId = member.clientId;
+            this.clientHost = member.clientHost;
+            this.topologyEpoch = member.topologyEpoch;
+            this.processId = member.processId;
+            this.userEndpoint = member.userEndpoint;
+            this.clientTags = member.clientTags;
+            this.state = member.state;
+            this.assignedActiveTasks = member.assignedActiveTasks;
+            this.assignedStandbyTasks = member.assignedStandbyTasks;
+            this.assignedWarmupTasks = member.assignedWarmupTasks;
+            this.activeTasksPendingRevocation = member.activeTasksPendingRevocation;
+            this.standbyTasksPendingRevocation = member.standbyTasksPendingRevocation;
+            this.warmupTasksPendingRevocation = member.warmupTasksPendingRevocation;
+        }
+
+        /**
+         * Moves the current member epoch to the previous member epoch and sets the new member epoch.
+         *
+         * @param memberEpoch The new member epoch.
+         * @return This builder.
+         */
+        public Builder updateMemberEpoch(int memberEpoch) {
+            // Copy the boxed value as is. Unboxing it would throw a NullPointerException
+            // if no member epoch has been set on this builder yet.
+            this.previousMemberEpoch = this.memberEpoch;
+            this.memberEpoch = memberEpoch;
+            return this;
+        }
+
+        public Builder setMemberEpoch(int memberEpoch) {
+            this.memberEpoch = memberEpoch;
+            return this;
+        }
+
+        public Builder setPreviousMemberEpoch(int previousMemberEpoch) {
+            this.previousMemberEpoch = previousMemberEpoch;
+            return this;
+        }
+
+        /**
+         * Sets the instance ID. A null instance ID is stored as an empty optional.
+         */
+        public Builder setInstanceId(String instanceId) {
+            this.instanceId = Optional.ofNullable(instanceId);
+            return this;
+        }
+
+        /**
+         * Sets the instance ID only if the given optional is present.
+         */
+        public Builder maybeUpdateInstanceId(Optional<String> instanceId) {
+            instanceId.ifPresent(this::setInstanceId);
+            return this;
+        }
+
+        /**
+         * Sets the rack ID. A null rack ID is stored as an empty optional.
+         */
+        public Builder setRackId(String rackId) {
+            this.rackId = Optional.ofNullable(rackId);
+            return this;
+        }
+
+        /**
+         * Sets the rack ID only if the given optional is present.
+         */
+        public Builder maybeUpdateRackId(Optional<String> rackId) {
+            rackId.ifPresent(this::setRackId);
+            return this;
+        }
+
+        public Builder setRebalanceTimeoutMs(int rebalanceTimeoutMs) {
+            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
+            return this;
+        }
+
+        /**
+         * Sets the rebalance timeout only if the given optional is present.
+         */
+        public Builder maybeUpdateRebalanceTimeoutMs(OptionalInt rebalanceTimeoutMs) {
+            // Do not use orElse(this.rebalanceTimeoutMs) here: the boxed field would be unboxed
+            // to call orElse(int), which throws a NullPointerException while the field is unset.
+            rebalanceTimeoutMs.ifPresent(this::setRebalanceTimeoutMs);
+            return this;
+        }
+
+        public Builder setClientId(String clientId) {
+            this.clientId = clientId;
+            return this;
+        }
+
+        public Builder setClientHost(String clientHost) {
+            this.clientHost = clientHost;
+            return this;
+        }
+
+        public Builder setState(MemberState state) {
+            this.state = state;
+            return this;
+        }
+
+        public Builder setTopologyEpoch(int topologyEpoch) {
+            this.topologyEpoch = topologyEpoch;
+            return this;
+        }
+
+        /**
+         * Sets the topology epoch only if the given optional is present.
+         */
+        public Builder maybeUpdateTopologyEpoch(OptionalInt topologyEpoch) {
+            // Same reasoning as in maybeUpdateRebalanceTimeoutMs: avoid unboxing an unset field.
+            topologyEpoch.ifPresent(this::setTopologyEpoch);
+            return this;
+        }
+
+        public Builder setProcessId(String processId) {
+            this.processId = processId;
+            return this;
+        }
+
+        /**
+         * Sets the process ID only if the given optional is present.
+         */
+        public Builder maybeUpdateProcessId(Optional<String> processId) {
+            this.processId = processId.orElse(this.processId);
+            return this;
+        }
+
+        /**
+         * Sets the user endpoint. A null endpoint is stored as an empty optional.
+         */
+        public Builder setUserEndpoint(StreamsGroupMemberMetadataValue.Endpoint userEndpoint) {
+            this.userEndpoint = Optional.ofNullable(userEndpoint);
+            return this;
+        }
+
+        /**
+         * Sets the user endpoint only if the given optional is present.
+         */
+        public Builder maybeUpdateUserEndpoint(Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint) {
+            userEndpoint.ifPresent(this::setUserEndpoint);
+            return this;
+        }
+
+        public Builder setClientTags(Map<String, String> clientTags) {
+            this.clientTags = clientTags;
+            return this;
+        }
+
+        /**
+         * Sets the client tags only if the given optional is present.
+         */
+        public Builder maybeUpdateClientTags(Optional<Map<String, String>> clientTags) {
+            this.clientTags = clientTags.orElse(this.clientTags);
+            return this;
+        }
+
+        /**
+         * Sets the assigned active, standby and warm-up tasks from the given assignment.
+         */
+        public Builder setAssignment(Assignment assignment) {
+            this.assignedActiveTasks = assignment.activeTasks();
+            this.assignedStandbyTasks = assignment.standbyTasks();
+            this.assignedWarmupTasks = assignment.warmupTasks();
+            return this;
+        }
+
+        public Builder setAssignedActiveTasks(Map<String, Set<Integer>> assignedActiveTasks) {
+            this.assignedActiveTasks = assignedActiveTasks;
+            return this;
+        }
+
+        public Builder setAssignedStandbyTasks(Map<String, Set<Integer>> assignedStandbyTasks) {
+            this.assignedStandbyTasks = assignedStandbyTasks;
+            return this;
+        }
+
+        public Builder setAssignedWarmupTasks(Map<String, Set<Integer>> assignedWarmupTasks) {
+            this.assignedWarmupTasks = assignedWarmupTasks;
+            return this;
+        }
+
+        /**
+         * Sets the active, standby and warm-up tasks pending revocation from the given assignment.
+         */
+        public Builder setAssignmentPendingRevocation(Assignment assignment) {
+            this.activeTasksPendingRevocation = assignment.activeTasks();
+            this.standbyTasksPendingRevocation = assignment.standbyTasks();
+            this.warmupTasksPendingRevocation = assignment.warmupTasks();
+            return this;
+        }
+
+        public Builder setActiveTasksPendingRevocation(
+            Map<String, Set<Integer>> activeTasksPendingRevocation) {
+            this.activeTasksPendingRevocation = activeTasksPendingRevocation;
+            return this;
+        }
+
+        public Builder setStandbyTasksPendingRevocation(
+            Map<String, Set<Integer>> standbyTasksPendingRevocation) {
+            this.standbyTasksPendingRevocation = standbyTasksPendingRevocation;
+            return this;
+        }
+
+        public Builder setWarmupTasksPendingRevocation(
+            Map<String, Set<Integer>> warmupTasksPendingRevocation) {
+            this.warmupTasksPendingRevocation = warmupTasksPendingRevocation;
+            return this;
+        }
+
+        /**
+         * Overwrites the member metadata fields of this builder with the values of the given record.
+         *
+         * @param record The member metadata record.
+         * @return This builder.
+         */
+        public Builder updateWith(StreamsGroupMemberMetadataValue record) {
+            setInstanceId(record.instanceId());
+            setRackId(record.rackId());
+            setClientId(record.clientId());
+            setClientHost(record.clientHost());
+            setRebalanceTimeoutMs(record.rebalanceTimeoutMs());
+            setTopologyEpoch(record.topologyEpoch());
+            setProcessId(record.processId());
+            setUserEndpoint(record.userEndpoint());
+            setClientTags(record.clientTags().stream().collect(Collectors.toMap(
+                StreamsGroupMemberMetadataValue.KeyValue::key,
+                StreamsGroupMemberMetadataValue.KeyValue::value
+            )));
+            return this;
+        }
+
+        /**
+         * Overwrites the epochs, the state and all task maps of this builder with the values of the given record.
+         *
+         * @param record The current member assignment record.
+         * @return This builder.
+         */
+        public Builder updateWith(StreamsGroupCurrentMemberAssignmentValue record) {
+            setMemberEpoch(record.memberEpoch());
+            setPreviousMemberEpoch(record.previousMemberEpoch());
+            setState(MemberState.fromValue(record.state()));
+            setAssignedActiveTasks(assignmentFromTaskIds(record.activeTasks()));
+            setAssignedStandbyTasks(assignmentFromTaskIds(record.standbyTasks()));
+            setAssignedWarmupTasks(assignmentFromTaskIds(record.warmupTasks()));
+            setActiveTasksPendingRevocation(
+                assignmentFromTaskIds(record.activeTasksPendingRevocation()));
+            setStandbyTasksPendingRevocation(
+                assignmentFromTaskIds(record.standbyTasksPendingRevocation()));
+            setWarmupTasksPendingRevocation(
+                assignmentFromTaskIds(record.warmupTasksPendingRevocation()));
+            return this;
+        }
+
+        /**
+         * Converts the task IDs of a record into a map from subtopology ID to the set of partition IDs.
+         */
+        private static Map<String, Set<Integer>> assignmentFromTaskIds(
+            List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> taskIdsList
+        ) {
+            return taskIdsList.stream().collect(Collectors.toMap(
+                StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId,
+                taskIds -> Set.copyOf(taskIds.partitions())));
+        }
+
+        /**
+         * @return A new {@link StreamsGroupMember} with the fields currently set on this builder.
+         */
+        public StreamsGroupMember build() {
+            return new StreamsGroupMember(
+                memberId,
+                memberEpoch,
+                previousMemberEpoch,
+                state,
+                instanceId,
+                rackId,
+                clientId,
+                clientHost,
+                rebalanceTimeoutMs,
+                topologyEpoch,
+                processId,
+                userEndpoint,
+                clientTags,
+                assignedActiveTasks,
+                assignedStandbyTasks,
+                assignedWarmupTasks,
+                activeTasksPendingRevocation,
+                standbyTasksPendingRevocation,
+                warmupTasksPendingRevocation
+            );
+        }
+    }
+
+    /**
+     * Checks whether this member is reconciled to the given target assignment epoch.
+     *
+     * @param targetAssignmentEpoch The epoch of the target assignment to compare the member epoch with.
+     *
+     * @return True if the member is in the Stable state and at the desired epoch.
+     */
+    public boolean isReconciledTo(int targetAssignmentEpoch) {
+        if (state != MemberState.STABLE) {
+            return false;
+        }
+        return memberEpoch == targetAssignmentEpoch;
+    }
+
+    /**
+     * Creates a member description for the Streams group describe response from this member.
+     *
+     * @param targetAssignment The target assignment of this member in the corresponding group.
+     *                         If it is null, the described target assignment is left unpopulated.
+     *
+     * @return The StreamsGroupMember mapped as StreamsGroupDescribeResponseData.Member.
+     */
+    public StreamsGroupDescribeResponseData.Member asStreamsGroupDescribeMember(
+        Assignment targetAssignment
+    ) {
+        final StreamsGroupDescribeResponseData.Assignment describedTargetAssignment =
+            new StreamsGroupDescribeResponseData.Assignment();
+        if (targetAssignment != null) {
+            describedTargetAssignment.setActiveTasks(taskIdsFromMap(targetAssignment.activeTasks()));
+            describedTargetAssignment.setStandbyTasks(taskIdsFromMap(targetAssignment.standbyTasks()));
+            describedTargetAssignment.setWarmupTasks(taskIdsFromMap(targetAssignment.warmupTasks()));
+        }
+
+        // The tasks that are currently assigned to this member.
+        final StreamsGroupDescribeResponseData.Assignment describedAssignment =
+            new StreamsGroupDescribeResponseData.Assignment()
+                .setActiveTasks(taskIdsFromMap(assignedActiveTasks))
+                .setStandbyTasks(taskIdsFromMap(assignedStandbyTasks))
+                .setWarmupTasks(taskIdsFromMap(assignedWarmupTasks));
+
+        final List<StreamsGroupDescribeResponseData.KeyValue> describedClientTags = clientTags.entrySet().stream()
+            .map(tag -> new StreamsGroupDescribeResponseData.KeyValue()
+                .setKey(tag.getKey())
+                .setValue(tag.getValue()))
+            .collect(Collectors.toList());
+
+        // An absent user endpoint is described as null.
+        final StreamsGroupDescribeResponseData.Endpoint describedUserEndpoint = userEndpoint
+            .map(endpoint -> new StreamsGroupDescribeResponseData.Endpoint()
+                .setHost(endpoint.host())
+                .setPort(endpoint.port()))
+            .orElse(null);
+
+        return new StreamsGroupDescribeResponseData.Member()
+            .setMemberEpoch(memberEpoch)
+            .setMemberId(memberId)
+            .setAssignment(describedAssignment)
+            .setTargetAssignment(describedTargetAssignment)
+            .setClientHost(clientHost)
+            .setClientId(clientId)
+            .setInstanceId(instanceId.orElse(null))
+            .setRackId(rackId.orElse(null))
+            .setClientTags(describedClientTags)
+            .setProcessId(processId)
+            .setTopologyEpoch(topologyEpoch)
+            .setUserEndpoint(describedUserEndpoint);
+    }
+
+    private static List<StreamsGroupDescribeResponseData.TaskIds> 
taskIdsFromMap(
+        Map<String, Set<Integer>> tasks
+    ) {
+        List<StreamsGroupDescribeResponseData.TaskIds> taskIds = new 
ArrayList<>();
+        tasks.forEach((subtopologyId, partitionSet) -> {
+            taskIds.add(new StreamsGroupDescribeResponseData.TaskIds()
+                .setSubtopologyId(subtopologyId)
+                .setPartitions(new ArrayList<>(partitionSet)));
+        });
+        return taskIds;
+    }
+
+    /**
+     * Compares the assigned active tasks of two members. The comparison is null-safe because
+     * a member may be created without assigned active tasks: two absent task maps count as unchanged.
+     *
+     * @param member1 The first member.
+     * @param member2 The second member.
+     *
+     * @return True if the two provided members have different assigned active tasks.
+     */
+    public static boolean hasAssignedActiveTasksChanged(
+        StreamsGroupMember member1,
+        StreamsGroupMember member2
+    ) {
+        return !Objects.equals(member1.assignedActiveTasks(), member2.assignedActiveTasks());
+    }
+
+    /**
+     * Compares the assigned standby tasks of two members. The comparison is null-safe because
+     * a member may be created without assigned standby tasks: two absent task maps count as unchanged.
+     *
+     * @param member1 The first member.
+     * @param member2 The second member.
+     *
+     * @return True if the two provided members have different assigned standby tasks.
+     */
+    public static boolean hasAssignedStandbyTasksChanged(
+        StreamsGroupMember member1,
+        StreamsGroupMember member2
+    ) {
+        return !Objects.equals(member1.assignedStandbyTasks(), member2.assignedStandbyTasks());
+    }
+
+    /**
+     * Compares the assigned warm-up tasks of two members. The comparison is null-safe because
+     * a member may be created without assigned warm-up tasks: two absent task maps count as unchanged.
+     *
+     * @param member1 The first member.
+     * @param member2 The second member.
+     *
+     * @return True if the two provided members have different assigned warm-up tasks.
+     */
+    public static boolean hasAssignedWarmupTasksChanged(
+        StreamsGroupMember member1,
+        StreamsGroupMember member2
+    ) {
+        return !Objects.equals(member1.assignedWarmupTasks(), member2.assignedWarmupTasks());
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
new file mode 100644
index 00000000000..7c0baf27364
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
+import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Unit tests for {@link Assignment}: rejection of null task maps, unmodifiable
+ * accessors, and construction from a StreamsGroupTargetAssignmentMemberValue record.
+ */
+public class AssignmentTest {
+
+    static final String SUBTOPOLOGY_1 = "subtopology1";
+    static final String SUBTOPOLOGY_2 = "subtopology2";
+    static final String SUBTOPOLOGY_3 = "subtopology3";
+
+    // Passing null for any one of the three task maps is expected to throw a NullPointerException.
+    @Test
+    public void testTasksCannotBeNull() {
+        assertThrows(NullPointerException.class, () -> new Assignment(null, 
Collections.emptyMap(), Collections.emptyMap()));
+        assertThrows(NullPointerException.class, () -> new 
Assignment(Collections.emptyMap(), null, Collections.emptyMap()));
+        assertThrows(NullPointerException.class, () -> new 
Assignment(Collections.emptyMap(), Collections.emptyMap(), null));
+    }
+
+    // Each accessor must return the tasks passed to the constructor and reject put() on the returned map.
+    @Test
+    public void testReturnUnmodifiableTaskAssignments() {
+        Map<String, Set<Integer>> activeTasks = mkTasksPerSubtopology(
+            mkTasks(SUBTOPOLOGY_1, 1, 2, 3)
+        );
+        Map<String, Set<Integer>> standbyTasks = mkTasksPerSubtopology(
+            mkTasks(SUBTOPOLOGY_2, 9, 8, 7)
+        );
+        Map<String, Set<Integer>> warmupTasks = mkTasksPerSubtopology(
+            mkTasks(SUBTOPOLOGY_3, 4, 5, 6)
+        );
+        Assignment assignment = new Assignment(activeTasks, standbyTasks, 
warmupTasks);
+
+        assertEquals(activeTasks, assignment.activeTasks());
+        assertThrows(UnsupportedOperationException.class, () -> 
assignment.activeTasks().put("not allowed", Collections.emptySet()));
+        assertEquals(standbyTasks, assignment.standbyTasks());
+        assertThrows(UnsupportedOperationException.class, () -> 
assignment.standbyTasks().put("not allowed", Collections.emptySet()));
+        assertEquals(warmupTasks, assignment.warmupTasks());
+        assertThrows(UnsupportedOperationException.class, () -> 
assignment.warmupTasks().put("not allowed", Collections.emptySet()));
+    }
+
+    // Assignment.fromRecord must turn each TaskIds list of the record into a map keyed by subtopology ID.
+    @Test
+    public void testFromTargetAssignmentRecord() {
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> activeTasks = 
new ArrayList<>();
+        activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+            .setSubtopologyId(SUBTOPOLOGY_1)
+            .setPartitions(Arrays.asList(1, 2, 3)));
+        activeTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+            .setSubtopologyId(SUBTOPOLOGY_2)
+            .setPartitions(Arrays.asList(4, 5, 6)));
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> standbyTasks = 
new ArrayList<>();
+        standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+            .setSubtopologyId(SUBTOPOLOGY_1)
+            .setPartitions(Arrays.asList(7, 8, 9)));
+        standbyTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+            .setSubtopologyId(SUBTOPOLOGY_2)
+            .setPartitions(Arrays.asList(1, 2, 3)));
+        List<StreamsGroupTargetAssignmentMemberValue.TaskIds> warmupTasks = 
new ArrayList<>();
+        warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+            .setSubtopologyId(SUBTOPOLOGY_1)
+            .setPartitions(Arrays.asList(4, 5, 6)));
+        warmupTasks.add(new StreamsGroupTargetAssignmentMemberValue.TaskIds()
+            .setSubtopologyId(SUBTOPOLOGY_2)
+            .setPartitions(Arrays.asList(7, 8, 9)));
+
+        StreamsGroupTargetAssignmentMemberValue record = new 
StreamsGroupTargetAssignmentMemberValue()
+            .setActiveTasks(activeTasks)
+            .setStandbyTasks(standbyTasks)
+            .setWarmupTasks(warmupTasks);
+
+        Assignment assignment = Assignment.fromRecord(record);
+
+        assertEquals(
+            mkTasksPerSubtopology(
+                mkTasks(SUBTOPOLOGY_1, 1, 2, 3),
+                mkTasks(SUBTOPOLOGY_2, 4, 5, 6)
+            ),
+            assignment.activeTasks()
+        );
+        assertEquals(
+            mkTasksPerSubtopology(
+                mkTasks(SUBTOPOLOGY_1, 7, 8, 9),
+                mkTasks(SUBTOPOLOGY_2, 1, 2, 3)
+            ),
+            assignment.standbyTasks()
+        );
+        assertEquals(
+            mkTasksPerSubtopology(
+                mkTasks(SUBTOPOLOGY_1, 4, 5, 6),
+                mkTasks(SUBTOPOLOGY_2, 7, 8, 9)
+            ),
+            assignment.warmupTasks()
+        );
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
new file mode 100644
index 00000000000..8c6d3d9088a
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue.TaskIds;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.KeyValue;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
+import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class StreamsGroupMemberTest {
+
+    private static final String MEMBER_ID = "member-id";
+    private static final int MEMBER_EPOCH = 10;
+    private static final int PREVIOUS_MEMBER_EPOCH = 9;
+    private static final MemberState STATE = MemberState.UNRELEASED_TASKS;
+    private static final String INSTANCE_ID = "instance-id";
+    private static final String RACK_ID = "rack-id";
+    private static final int REBALANCE_TIMEOUT = 5000;
+    private static final String CLIENT_ID = "client-id";
+    private static final String HOSTNAME = "hostname";
+    private static final int TOPOLOGY_EPOCH = 3;
+    private static final String PROCESS_ID = "process-id";
+    private static final String SUBTOPOLOGY1 = "subtopology1";
+    private static final String SUBTOPOLOGY2 = "subtopology2";
+    private static final String SUBTOPOLOGY3 = "subtopology3";
+    private static final StreamsGroupMemberMetadataValue.Endpoint 
USER_ENDPOINT =
+        new 
StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090);
+    private static final String CLIENT_TAG_KEY = "client";
+    private static final String CLIENT_TAG_VALUE = "tag";
+    private static final Map<String, String> CLIENT_TAGS = 
mkMap(mkEntry(CLIENT_TAG_KEY, CLIENT_TAG_VALUE));
+    private static final List<Integer> TASKS1 = List.of(1, 2, 3);
+    private static final List<Integer> TASKS2 = List.of(4, 5, 6);
+    private static final List<Integer> TASKS3 = List.of(7, 8);
+    private static final List<Integer> TASKS4 = List.of(3, 2, 1);
+    private static final List<Integer> TASKS5 = List.of(6, 5, 4);
+    private static final List<Integer> TASKS6 = List.of(9, 7);
+    private static final Map<String, Set<Integer>> ASSIGNED_ACTIVE_TASKS = 
mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS1.toArray(Integer[]::new)));
+    private static final Map<String, Set<Integer>> ASSIGNED_STANDBY_TASKS = 
mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS2.toArray(Integer[]::new)));
+    private static final Map<String, Set<Integer>> ASSIGNED_WARMUP_TASKS = 
mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS3.toArray(Integer[]::new)));
+    private static final Map<String, Set<Integer>> 
ACTIVE_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, 
TASKS4.toArray(Integer[]::new)));
+    private static final Map<String, Set<Integer>> 
STANDBY_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, 
TASKS5.toArray(Integer[]::new)));
+    private static final Map<String, Set<Integer>> 
WARMUP_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, 
TASKS6.toArray(Integer[]::new)));
+
+    @Test
+    public void testBuilderWithMemberIdIsNull() {
+        final Exception exception = assertThrows(
+            NullPointerException.class,
+            () -> new StreamsGroupMember.Builder((String) null).build()
+        );
+        assertEquals("memberId cannot be null", exception.getMessage());
+    }
+
+    @Test
+    public void testBuilderWithMemberIsNull() {
+        final Exception exception = assertThrows(
+            NullPointerException.class,
+            () -> new StreamsGroupMember.Builder((StreamsGroupMember) 
null).build()
+        );
+        assertEquals("member cannot be null", exception.getMessage());
+    }
+
+    @Test
+    public void testBuilderWithDefaults() {
+        StreamsGroupMember member = new 
StreamsGroupMember.Builder(MEMBER_ID).build();
+
+        assertEquals(MEMBER_ID, member.memberId());
+        assertNull(member.memberEpoch());
+        assertNull(member.previousMemberEpoch());
+        assertNull(member.state());
+        assertNull(member.instanceId());
+        assertNull(member.rackId());
+        assertNull(member.rebalanceTimeoutMs());
+        assertNull(member.clientId());
+        assertNull(member.clientHost());
+        assertNull(member.topologyEpoch());
+        assertNull(member.processId());
+        assertNull(member.userEndpoint());
+        assertNull(member.clientTags());
+        assertNull(member.assignedActiveTasks());
+        assertNull(member.assignedStandbyTasks());
+        assertNull(member.assignedWarmupTasks());
+        assertNull(member.activeTasksPendingRevocation());
+        assertNull(member.standbyTasksPendingRevocation());
+        assertNull(member.warmupTasksPendingRevocation());
+    }
+
+    @Test
+    public void testBuilderNewMember() {
+        StreamsGroupMember member = createStreamsGroupMember();
+
+        assertEquals(MEMBER_ID, member.memberId());
+        assertEquals(MEMBER_EPOCH, member.memberEpoch());
+        assertEquals(PREVIOUS_MEMBER_EPOCH, member.previousMemberEpoch());
+        assertEquals(STATE, member.state());
+        assertEquals(Optional.of(INSTANCE_ID), member.instanceId());
+        assertEquals(Optional.of(RACK_ID), member.rackId());
+        assertEquals(CLIENT_ID, member.clientId());
+        assertEquals(HOSTNAME, member.clientHost());
+        assertEquals(TOPOLOGY_EPOCH, member.topologyEpoch());
+        assertEquals(PROCESS_ID, member.processId());
+        assertEquals(Optional.of(USER_ENDPOINT), member.userEndpoint());
+        assertEquals(CLIENT_TAGS, member.clientTags());
+        assertEquals(ASSIGNED_ACTIVE_TASKS, member.assignedActiveTasks());
+        assertEquals(ASSIGNED_STANDBY_TASKS, member.assignedStandbyTasks());
+        assertEquals(ASSIGNED_WARMUP_TASKS, member.assignedWarmupTasks());
+        assertEquals(ACTIVE_TASKS_PENDING_REVOCATION, 
member.activeTasksPendingRevocation());
+        assertEquals(STANDBY_TASKS_PENDING_REVOCATION, 
member.standbyTasksPendingRevocation());
+        assertEquals(WARMUP_TASKS_PENDING_REVOCATION, 
member.warmupTasksPendingRevocation());
+    }
+
+    @Test
+    public void testBuilderUpdateWithStreamsGroupMemberMetadataValue() {
+        StreamsGroupMemberMetadataValue record = new 
StreamsGroupMemberMetadataValue()
+            .setClientId(CLIENT_ID)
+            .setClientHost(HOSTNAME)
+            .setInstanceId(INSTANCE_ID)
+            .setRackId(RACK_ID)
+            .setRebalanceTimeoutMs(REBALANCE_TIMEOUT)
+            .setTopologyEpoch(TOPOLOGY_EPOCH)
+            .setProcessId(PROCESS_ID)
+            .setUserEndpoint(USER_ENDPOINT)
+            .setClientTags(CLIENT_TAGS.entrySet().stream()
+                .map(e -> new 
KeyValue().setKey(e.getKey()).setValue(e.getValue()))
+                .collect(Collectors.toList()));
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member-id")
+            .updateWith(record)
+            .build();
+
+        assertEquals(record.clientId(), member.clientId());
+        assertEquals(record.clientHost(), member.clientHost());
+        assertEquals(Optional.of(record.instanceId()), member.instanceId());
+        assertEquals(Optional.of(record.rackId()), member.rackId());
+        assertEquals(record.rebalanceTimeoutMs(), member.rebalanceTimeoutMs());
+        assertEquals(record.topologyEpoch(), member.topologyEpoch());
+        assertEquals(record.processId(), member.processId());
+        assertEquals(Optional.of(record.userEndpoint()), 
member.userEndpoint());
+        assertEquals(
+            
record.clientTags().stream().collect(Collectors.toMap(KeyValue::key, 
KeyValue::value)),
+            member.clientTags()
+        );
+        assertEquals(MEMBER_ID, member.memberId());
+        assertNull(member.memberEpoch());
+        assertNull(member.previousMemberEpoch());
+        assertNull(member.state());
+        assertNull(member.assignedActiveTasks());
+        assertNull(member.assignedStandbyTasks());
+        assertNull(member.assignedWarmupTasks());
+        assertNull(member.activeTasksPendingRevocation());
+        assertNull(member.standbyTasksPendingRevocation());
+        assertNull(member.warmupTasksPendingRevocation());
+    }
+
+    // Asserts that Builder#updateWith(StreamsGroupCurrentMemberAssignmentValue) takes the epochs, the state
+    // and all six task maps from the record, and that the metadata fields remain null.
+    // NOTE(review): the method name says "ConsumerGroup" although the record under test is a
+    // StreamsGroupCurrentMemberAssignmentValue; consider renaming the test.
+    @Test
+    public void 
testBuilderUpdateWithConsumerGroupCurrentMemberAssignmentValue() {
+        StreamsGroupCurrentMemberAssignmentValue record = new 
StreamsGroupCurrentMemberAssignmentValue()
+            .setMemberEpoch(MEMBER_EPOCH)
+            .setPreviousMemberEpoch(PREVIOUS_MEMBER_EPOCH)
+            .setState(STATE.value())
+            .setActiveTasks(List.of(new 
TaskIds().setSubtopologyId(SUBTOPOLOGY1).setPartitions(TASKS1)))
+            .setStandbyTasks(List.of(new 
TaskIds().setSubtopologyId(SUBTOPOLOGY2).setPartitions(TASKS2)))
+            .setWarmupTasks(List.of(new 
TaskIds().setSubtopologyId(SUBTOPOLOGY1).setPartitions(TASKS3)))
+            .setActiveTasksPendingRevocation(List.of(new 
TaskIds().setSubtopologyId(SUBTOPOLOGY2).setPartitions(TASKS4)))
+            .setStandbyTasksPendingRevocation(List.of(new 
TaskIds().setSubtopologyId(SUBTOPOLOGY1).setPartitions(TASKS5)))
+            .setWarmupTasksPendingRevocation(List.of(new 
TaskIds().setSubtopologyId(SUBTOPOLOGY2).setPartitions(TASKS6)));
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_ID)
+            .updateWith(record)
+            .build();
+
+        assertEquals(MEMBER_ID, member.memberId());
+        assertEquals(record.memberEpoch(), member.memberEpoch());
+        assertEquals(record.previousMemberEpoch(), 
member.previousMemberEpoch());
+        assertEquals(MemberState.fromValue(record.state()), member.state());
+        assertEquals(ASSIGNED_ACTIVE_TASKS, member.assignedActiveTasks());
+        assertEquals(ASSIGNED_STANDBY_TASKS, member.assignedStandbyTasks());
+        assertEquals(ASSIGNED_WARMUP_TASKS, member.assignedWarmupTasks());
+        assertEquals(ACTIVE_TASKS_PENDING_REVOCATION, 
member.activeTasksPendingRevocation());
+        assertEquals(STANDBY_TASKS_PENDING_REVOCATION, 
member.standbyTasksPendingRevocation());
+        assertEquals(WARMUP_TASKS_PENDING_REVOCATION, 
member.warmupTasksPendingRevocation());
+        assertNull(member.instanceId());
+        assertNull(member.rackId());
+        assertNull(member.rebalanceTimeoutMs());
+        assertNull(member.clientId());
+        assertNull(member.clientHost());
+        assertNull(member.topologyEpoch());
+        assertNull(member.processId());
+        assertNull(member.userEndpoint());
+        assertNull(member.clientTags());
+    }
+
+    // Verifies the maybeUpdate* builder methods: empty optionals leave the member unchanged, while present
+    // values replace exactly the targeted fields and nothing else.
+    @Test
+    public void testBuilderMaybeUpdateMember() {
+        final StreamsGroupMember member = createStreamsGroupMember();
+
+        // This is a no-op.
+        StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member)
+            .maybeUpdateRackId(Optional.empty())
+            .maybeUpdateInstanceId(Optional.empty())
+            .maybeUpdateRebalanceTimeoutMs(OptionalInt.empty())
+            .maybeUpdateProcessId(Optional.empty())
+            .maybeUpdateTopologyEpoch(OptionalInt.empty())
+            .maybeUpdateUserEndpoint(Optional.empty())
+            .maybeUpdateClientTags(Optional.empty())
+            .build();
+
+        assertEquals(member, updatedMember);
+
+        // Unwrap the optionals before concatenating; otherwise the new IDs would embed the
+        // Optional's toString() (e.g. "newOptional[rack-id]") instead of the actual value.
+        final String newRackId = "new" + member.rackId().get();
+        final String newInstanceId = "new" + member.instanceId().get();
+        final Integer newRebalanceTimeout = member.rebalanceTimeoutMs() + 1000;
+        final String newProcessId = "new" + member.processId();
+        final Integer newTopologyEpoch = member.topologyEpoch() + 1;
+        final StreamsGroupMemberMetadataValue.Endpoint newUserEndpoint =
+            new StreamsGroupMemberMetadataValue.Endpoint()
+                .setHost(member.userEndpoint().get().host() + "2")
+                .setPort(9090);
+        final Map<String, String> newClientTags = new HashMap<>(member.clientTags());
+        newClientTags.put("client2", "tag2");
+
+        updatedMember = new StreamsGroupMember.Builder(member)
+            .maybeUpdateRackId(Optional.of(newRackId))
+            .maybeUpdateInstanceId(Optional.of(newInstanceId))
+            // Pass the derived value rather than a literal so that the input and the expectation
+            // below stay in sync if REBALANCE_TIMEOUT ever changes.
+            .maybeUpdateRebalanceTimeoutMs(OptionalInt.of(newRebalanceTimeout))
+            .maybeUpdateProcessId(Optional.of(newProcessId))
+            .maybeUpdateTopologyEpoch(OptionalInt.of(newTopologyEpoch))
+            .maybeUpdateUserEndpoint(Optional.of(newUserEndpoint))
+            .maybeUpdateClientTags(Optional.of(newClientTags))
+            .build();
+
+        // Updated fields.
+        assertEquals(Optional.of(newRackId), updatedMember.rackId());
+        assertEquals(Optional.of(newInstanceId), updatedMember.instanceId());
+        assertEquals(newRebalanceTimeout, updatedMember.rebalanceTimeoutMs());
+        assertEquals(newProcessId, updatedMember.processId());
+        assertEquals(newTopologyEpoch, updatedMember.topologyEpoch());
+        assertEquals(Optional.of(newUserEndpoint), updatedMember.userEndpoint());
+        assertEquals(newClientTags, updatedMember.clientTags());
+        // Fields that must be carried over unchanged.
+        assertEquals(member.memberId(), updatedMember.memberId());
+        assertEquals(member.memberEpoch(), updatedMember.memberEpoch());
+        assertEquals(member.previousMemberEpoch(), updatedMember.previousMemberEpoch());
+        assertEquals(member.state(), updatedMember.state());
+        assertEquals(member.clientId(), updatedMember.clientId());
+        assertEquals(member.clientHost(), updatedMember.clientHost());
+        assertEquals(member.assignedActiveTasks(), updatedMember.assignedActiveTasks());
+        assertEquals(member.assignedStandbyTasks(), updatedMember.assignedStandbyTasks());
+        assertEquals(member.assignedWarmupTasks(), updatedMember.assignedWarmupTasks());
+        assertEquals(member.activeTasksPendingRevocation(), updatedMember.activeTasksPendingRevocation());
+        assertEquals(member.standbyTasksPendingRevocation(), updatedMember.standbyTasksPendingRevocation());
+        assertEquals(member.warmupTasksPendingRevocation(), updatedMember.warmupTasksPendingRevocation());
+    }
+
+    @Test
+    public void testBuilderUpdateMemberEpoch() {
+        final StreamsGroupMember member = createStreamsGroupMember();
+
+        final int newMemberEpoch = member.memberEpoch() + 1;
+        final StreamsGroupMember updatedMember = new 
StreamsGroupMember.Builder(member)
+            .updateMemberEpoch(newMemberEpoch)
+            .build();
+
+        assertEquals(member.memberId(), updatedMember.memberId());
+        assertEquals(newMemberEpoch, updatedMember.memberEpoch());
+        // The previous member epoch becomes the old current member epoch.
+        assertEquals(member.memberEpoch(), 
updatedMember.previousMemberEpoch());
+        assertEquals(member.state(), updatedMember.state());
+        assertEquals(member.instanceId(), updatedMember.instanceId());
+        assertEquals(member.rackId(), updatedMember.rackId());
+        assertEquals(member.rebalanceTimeoutMs(), 
updatedMember.rebalanceTimeoutMs());
+        assertEquals(member.clientId(), updatedMember.clientId());
+        assertEquals(member.clientHost(), updatedMember.clientHost());
+        assertEquals(member.topologyEpoch(), updatedMember.topologyEpoch());
+        assertEquals(member.processId(), updatedMember.processId());
+        assertEquals(member.userEndpoint(), updatedMember.userEndpoint());
+        assertEquals(member.clientTags(), updatedMember.clientTags());
+        assertEquals(member.assignedActiveTasks(), 
updatedMember.assignedActiveTasks());
+        assertEquals(member.assignedStandbyTasks(), 
updatedMember.assignedStandbyTasks());
+        assertEquals(member.assignedWarmupTasks(), 
updatedMember.assignedWarmupTasks());
+        assertEquals(member.activeTasksPendingRevocation(), 
updatedMember.activeTasksPendingRevocation());
+        assertEquals(member.standbyTasksPendingRevocation(), 
updatedMember.standbyTasksPendingRevocation());
+        assertEquals(member.warmupTasksPendingRevocation(), 
updatedMember.warmupTasksPendingRevocation());
+    }
+
+    @Test
+    public void testReturnUnmodifiableFields() {
+        final StreamsGroupMember member = createStreamsGroupMember();
+
+        assertThrows(UnsupportedOperationException.class, () -> 
member.clientTags().put("not allowed", ""));
+        assertThrows(UnsupportedOperationException.class, () -> 
member.assignedActiveTasks().put("not allowed", Collections.emptySet()));
+        assertThrows(UnsupportedOperationException.class, () -> 
member.assignedStandbyTasks().put("not allowed", Collections.emptySet()));
+        assertThrows(UnsupportedOperationException.class, () -> 
member.assignedWarmupTasks().put("not allowed", Collections.emptySet()));
+        assertThrows(UnsupportedOperationException.class, () -> 
member.activeTasksPendingRevocation().put("not allowed", 
Collections.emptySet()));
+        assertThrows(UnsupportedOperationException.class, () -> 
member.standbyTasksPendingRevocation().put("not allowed", 
Collections.emptySet()));
+        assertThrows(UnsupportedOperationException.class, () -> 
member.warmupTasksPendingRevocation().put("not allowed", 
Collections.emptySet()));
+    }
+
+    @Test
+    public void testAsStreamsGroupDescribeMember() {
+        final StreamsGroupMember member = createStreamsGroupMember();
+        List<Integer> assignedTasks1 = Arrays.asList(10, 11, 12);
+        List<Integer> assignedTasks2 = Arrays.asList(13, 14, 15);
+        List<Integer> assignedTasks3 = Arrays.asList(16, 17, 18);
+        Assignment targetAssignment = new Assignment(
+            mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(assignedTasks3))),
+            mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(assignedTasks2))),
+            mkMap(mkEntry(SUBTOPOLOGY3, new HashSet<>(assignedTasks1)))
+        );
+
+        StreamsGroupDescribeResponseData.Member actual = 
member.asStreamsGroupDescribeMember(targetAssignment);
+        StreamsGroupDescribeResponseData.Member expected = new 
StreamsGroupDescribeResponseData.Member()
+            .setMemberId(MEMBER_ID)
+            .setMemberEpoch(MEMBER_EPOCH)
+            .setClientId(CLIENT_ID)
+            .setInstanceId(INSTANCE_ID)
+            .setRackId(RACK_ID)
+            .setClientHost(HOSTNAME)
+            .setProcessId(PROCESS_ID)
+            .setTopologyEpoch(TOPOLOGY_EPOCH)
+            .setClientTags(List.of(
+                new 
StreamsGroupDescribeResponseData.KeyValue().setKey(CLIENT_TAG_KEY).setValue(CLIENT_TAG_VALUE))
+            )
+            .setAssignment(
+                new StreamsGroupDescribeResponseData.Assignment()
+                    .setActiveTasks(List.of(
+                        new StreamsGroupDescribeResponseData.TaskIds()
+                            .setSubtopologyId(SUBTOPOLOGY1)
+                            .setPartitions(TASKS1))
+                    )
+                    .setStandbyTasks(List.of(
+                        new StreamsGroupDescribeResponseData.TaskIds()
+                            .setSubtopologyId(SUBTOPOLOGY2)
+                            .setPartitions(TASKS2))
+                    )
+                    .setWarmupTasks(List.of(
+                        new StreamsGroupDescribeResponseData.TaskIds()
+                            .setSubtopologyId(SUBTOPOLOGY1)
+                            .setPartitions(TASKS3))
+                    )
+            )
+            .setTargetAssignment(
+                new StreamsGroupDescribeResponseData.Assignment()
+                    .setActiveTasks(List.of(
+                        new StreamsGroupDescribeResponseData.TaskIds()
+                            .setSubtopologyId(SUBTOPOLOGY1)
+                            .setPartitions(assignedTasks3))
+                    )
+                    .setStandbyTasks(List.of(
+                        new StreamsGroupDescribeResponseData.TaskIds()
+                            .setSubtopologyId(SUBTOPOLOGY2)
+                            .setPartitions(assignedTasks2))
+                    )
+                    .setWarmupTasks(List.of(
+                        new StreamsGroupDescribeResponseData.TaskIds()
+                            .setSubtopologyId(SUBTOPOLOGY3)
+                            .setPartitions(assignedTasks1))
+                    )
+            )
+            .setUserEndpoint(new StreamsGroupDescribeResponseData.Endpoint()
+                .setHost(USER_ENDPOINT.host())
+                .setPort(USER_ENDPOINT.port())
+            );
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testAsStreamsGroupDescribeWithTargetAssignmentNull() {
+        final StreamsGroupMember member = createStreamsGroupMember();
+        StreamsGroupDescribeResponseData.Member streamsGroupDescribeMember = 
member.asStreamsGroupDescribeMember(null);
+
+        assertEquals(new StreamsGroupDescribeResponseData.Assignment(), 
streamsGroupDescribeMember.targetAssignment());
+    }
+
+    private StreamsGroupMember createStreamsGroupMember() {
+        return new StreamsGroupMember.Builder(MEMBER_ID)
+            .setMemberEpoch(MEMBER_EPOCH)
+            .setPreviousMemberEpoch(PREVIOUS_MEMBER_EPOCH)
+            .setState(STATE)
+            .setInstanceId(INSTANCE_ID)
+            .setRackId(RACK_ID)
+            .setRebalanceTimeoutMs(REBALANCE_TIMEOUT)
+            .setClientId(CLIENT_ID)
+            .setClientHost(HOSTNAME)
+            .setTopologyEpoch(TOPOLOGY_EPOCH)
+            .setProcessId(PROCESS_ID)
+            .setUserEndpoint(USER_ENDPOINT)
+            .setClientTags(CLIENT_TAGS)
+            .setAssignedActiveTasks(ASSIGNED_ACTIVE_TASKS)
+            .setAssignedStandbyTasks(ASSIGNED_STANDBY_TASKS)
+            .setAssignedWarmupTasks(ASSIGNED_WARMUP_TASKS)
+            .setActiveTasksPendingRevocation(ACTIVE_TASKS_PENDING_REVOCATION)
+            .setStandbyTasksPendingRevocation(STANDBY_TASKS_PENDING_REVOCATION)
+            .setWarmupTasksPendingRevocation(WARMUP_TASKS_PENDING_REVOCATION)
+            .build();
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
new file mode 100644
index 00000000000..47668ec84c0
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Static helpers for building task assignments in tests.
+ */
+public class TaskAssignmentTestUtil {
+
+    /**
+     * Creates an {@link Assignment} from three maps of tasks keyed by subtopology ID.
+     * Each map is wrapped in an unmodifiable view before it is passed to the
+     * {@link Assignment} constructor; the maps themselves are not copied.
+     *
+     * @param activeTasks  the active tasks per subtopology ID; must not be null
+     * @param standbyTasks the standby tasks per subtopology ID; must not be null
+     * @param warmupTasks  the warm-up tasks per subtopology ID; must not be null
+     * @return the assignment built from the three maps
+     * @throws NullPointerException if any of the three maps is null
+     */
+    public static Assignment mkAssignment(final Map<String, Set<Integer>> activeTasks,
+                                          final Map<String, Set<Integer>> standbyTasks,
+                                          final Map<String, Set<Integer>> warmupTasks) {
+        final Map<String, Set<Integer>> active =
+            Collections.unmodifiableMap(Objects.requireNonNull(activeTasks));
+        final Map<String, Set<Integer>> standby =
+            Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks));
+        final Map<String, Set<Integer>> warmup =
+            Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks));
+        return new Assignment(active, standby, warmup);
+    }
+
+    /**
+     * Pairs a subtopology ID with a set of tasks, e.g. as an argument for
+     * {@code mkTasksPerSubtopology}.
+     *
+     * @param subtopologyId the subtopology ID used as the entry key
+     * @param tasks         the tasks of the subtopology; repeated values collapse into one
+     *                      set element
+     * @return a mutable entry mapping the subtopology ID to a mutable set of the tasks
+     * @throws NullPointerException if {@code tasks} is null or contains a null element
+     */
+    public static Map.Entry<String, Set<Integer>> mkTasks(String subtopologyId,
+                                                          Integer... tasks) {
+        // List.of rejects null elements, so a null task fails fast here.
+        final Set<Integer> taskSet = new HashSet<>(List.of(tasks));
+        return new AbstractMap.SimpleEntry<>(subtopologyId, taskSet);
+    }
+
+    /**
+     * Collects the given entries into a map from subtopology ID to tasks.
+     * Entries are inserted in argument order, so a later entry replaces an earlier
+     * one that has the same key.
+     *
+     * @param entries the entries to collect, typically created with {@code mkTasks}
+     * @return a new mutable map containing the entries
+     */
+    @SafeVarargs
+    public static Map<String, Set<Integer>> mkTasksPerSubtopology(Map.Entry<String, Set<Integer>>... entries) {
+        final Map<String, Set<Integer>> tasksPerSubtopology = new HashMap<>();
+        for (final Map.Entry<String, Set<Integer>> tasksOfSubtopology : entries) {
+            tasksPerSubtopology.put(tasksOfSubtopology.getKey(), tasksOfSubtopology.getValue());
+        }
+        return tasksPerSubtopology;
+    }
+}

Reply via email to