This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aea699bdef0 KAFKA-18324: Add CurrentAssignmentBuilder (#18476)
aea699bdef0 is described below

commit aea699bdef01b169b73f14c6f1d58df456056e16
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Jan 23 17:35:03 2025 +0100

    KAFKA-18324: Add CurrentAssignmentBuilder (#18476)
    
    Implements the current assignment builder, analogous to the current 
assignment builder of consumer groups. The main difference is the underlying 
assigned resource, and slightly different logic around process IDs: We make 
sure to move a task only to a new client, once the task is not owned anymore by 
any client with the same process ID (sharing the same state directory) - in any 
role (active, standby or warm-up).
    
    Compared to the feature branch, the main difference is that I refactored 
the separate treatment of active, standby and warm-up tasks into a compound 
datatype called TaskTuple (which is used in place of the more specific 
Assignment class). This also has effects on StreamsGroupMember.
    
    Reviewers: Bruno Cadonna <[email protected]>, Bill Bejeck 
<[email protected]>
---
 .../group/streams/CurrentAssignmentBuilder.java    | 451 +++++++++++
 .../group/streams/StreamsGroupMember.java          | 163 +---
 .../streams/{Assignment.java => TasksTuple.java}   |  73 +-
 .../group/streams/assignor/GroupAssignment.java    |   2 +-
 .../streams/CurrentAssignmentBuilderTest.java      | 825 +++++++++++++++++++++
 .../group/streams/StreamsGroupMemberTest.java      | 132 ++--
 .../group/streams/TaskAssignmentTestUtil.java      |  26 +-
 .../{AssignmentTest.java => TasksTupleTest.java}   |  95 ++-
 8 files changed, 1530 insertions(+), 237 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
new file mode 100644
index 00000000000..3c9ba064a40
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the streams group protocol. Given the current state of a
+ * member and a desired or target assignment state, the state machine takes 
the necessary steps to converge them.
+ */
+public class CurrentAssignmentBuilder {
+
+    /**
+     * The streams group member which is reconciled.
+     */
+    private final StreamsGroupMember member;
+
+    /**
+     * The target assignment epoch.
+     */
+    private int targetAssignmentEpoch;
+
+    /**
+     * The target assignment.
+     */
+    private TasksTuple targetAssignment;
+
+    /**
+     * A function which returns the current process ID of an active task or 
null if the active task
+     * is not assigned. The current process ID is the process ID of the 
current owner.
+     */
+    private BiFunction<String, Integer, String> currentActiveTaskProcessId;
+
+    /**
+     * A function which returns the current process IDs of a standby task or 
null if the standby
+     * task is not assigned. The current process IDs are the process IDs of 
all current owners.
+     */
+    private BiFunction<String, Integer, Set<String>> 
currentStandbyTaskProcessIds;
+
+    /**
+     * A function which returns the current process IDs of a warmup task or 
null if the warmup task
+     * is not assigned. The current process IDs are the process IDs of all 
current owners.
+     */
+    private BiFunction<String, Integer, Set<String>> 
currentWarmupTaskProcessIds;
+
+    /**
+     * The tasks owned by the member. This may be provided by the member in 
the StreamsGroupHeartbeat request.
+     */
+    private Optional<TasksTuple> ownedTasks = Optional.empty();
+
+    /**
+     * Constructs the CurrentAssignmentBuilder based on the current state of 
the provided streams group member.
+     *
+     * @param member The streams group member that must be reconciled.
+     */
+    public CurrentAssignmentBuilder(StreamsGroupMember member) {
+        this.member = Objects.requireNonNull(member);
+    }
+
+    /**
+     * Sets the target assignment epoch and the target assignment that the 
streams group member must be reconciled to.
+     *
+     * @param targetAssignmentEpoch The target assignment epoch.
+     * @param targetAssignment      The target assignment.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withTargetAssignment(int 
targetAssignmentEpoch,
+                                                         TasksTuple 
targetAssignment) {
+        this.targetAssignmentEpoch = targetAssignmentEpoch;
+        this.targetAssignment = Objects.requireNonNull(targetAssignment);
+        return this;
+    }
+
+    /**
+     * Sets a BiFunction which allows to retrieve the current process ID of an 
active task. This is
+     * used by the state machine to determine if an active task is free or 
still used by another
+     * member, and if there is still a task on a specific process that is not 
yet revoked.
+     *
+     * @param currentActiveTaskProcessId A BiFunction which gets the process 
ID of a subtopology ID /
+     *                                   partition ID pair.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder 
withCurrentActiveTaskProcessId(BiFunction<String, Integer, String> 
currentActiveTaskProcessId) {
+        this.currentActiveTaskProcessId = 
Objects.requireNonNull(currentActiveTaskProcessId);
+        return this;
+    }
+
+    /**
+     * Sets a BiFunction which allows to retrieve the current process IDs of a 
standby task. This is
+     * used by the state machine to determine if there is still a task on a 
specific process that is
+     * not yet revoked.
+     *
+     * @param currentStandbyTaskProcessIds A BiFunction which gets the process 
IDs of a subtopology
+     *                                     ID / partition ID pair.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder withCurrentStandbyTaskProcessIds(
+        BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds
+    ) {
+        this.currentStandbyTaskProcessIds = 
Objects.requireNonNull(currentStandbyTaskProcessIds);
+        return this;
+    }
+
+    /**
+     * Sets a BiFunction which allows to retrieve the current process IDs of a 
warmup task. This is
+     * used by the state machine to determine if there is still a task on a 
specific process that is
+     * not yet revoked.
+     *
+     * @param currentWarmupTaskProcessIds A BiFunction which gets the process 
IDs of a subtopology ID
+     *                                    / partition ID pair.
+     * @return This object.
+     */
+    public CurrentAssignmentBuilder 
withCurrentWarmupTaskProcessIds(BiFunction<String, Integer, Set<String>> 
currentWarmupTaskProcessIds) {
+        this.currentWarmupTaskProcessIds = 
Objects.requireNonNull(currentWarmupTaskProcessIds);
+        return this;
+    }
+
+    /**
+     * Sets the tasks currently owned by the member. This comes directly from 
the last StreamsGroupHeartbeat request. This is used to
+     * determine if the member has revoked the necessary tasks. Passing null 
into this function means that the member did not provide
+     * its owned tasks in this heartbeat.
+     *
+     * @param ownedAssignment A collection of active, standby and warm-up tasks
+     * @return This object.
+     */
+    protected CurrentAssignmentBuilder withOwnedAssignment(TasksTuple 
ownedAssignment) {
+        this.ownedTasks = Optional.ofNullable(ownedAssignment);
+        return this;
+    }
+
+    /**
+     * Builds the next state for the member or keep the current one if it is 
not possible to move forward with the current state.
+     *
+     * @return A new StreamsGroupMember or the current one.
+     */
+    public StreamsGroupMember build() {
+        switch (member.state()) {
+            case STABLE:
+                // When the member is in the STABLE state, we verify if a newer
+                // epoch (or target assignment) is available. If it is, we can
+                // reconcile the member towards it. Otherwise, we return.
+                if (member.memberEpoch() != targetAssignmentEpoch) {
+                    return computeNextAssignment(
+                        member.memberEpoch(),
+                        member.assignedTasks()
+                    );
+                } else {
+                    return member;
+                }
+
+            case UNREVOKED_TASKS:
+                // When the member is in the UNREVOKED_TASKS state, we wait
+                // until the member has revoked the necessary tasks. They are
+                // considered revoked when they are not anymore reported in the
+                // owned tasks set in the StreamsGroupHeartbeat API.
+
+                // If the member provides its owned tasks, we verify if it 
still
+                // owns any of the revoked tasks. If it did not provide it's
+                // owned tasks, or we still own some of the revoked tasks, we
+                // cannot progress.
+                if (
+                    ownedTasks.isEmpty() || 
ownedTasks.get().containsAny(member.tasksPendingRevocation())
+                ) {
+                    return member;
+                }
+
+                // When the member has revoked all the pending tasks, it can
+                // transition to the next epoch (current + 1) and we can 
reconcile
+                // its state towards the latest target assignment.
+                return computeNextAssignment(
+                    member.memberEpoch() + 1,
+                    member.assignedTasks()
+                );
+
+            case UNRELEASED_TASKS:
+                // When the member is in the UNRELEASED_TASKS, we reconcile the
+                // member towards the latest target assignment. This will 
assign any
+                // of the unreleased tasks when they become available.
+                return computeNextAssignment(
+                    member.memberEpoch(),
+                    member.assignedTasks()
+                );
+
+            case UNKNOWN:
+                // We could only end up in this state if a new state is added 
in the
+                // future and the group coordinator is downgraded. In this 
case, the
+                // best option is to fence the member to force it to rejoin 
the group
+                // without any tasks and to reconcile it again from scratch.
+                if ((ownedTasks.isEmpty() || !ownedTasks.get().isEmpty())) {
+                    throw new FencedMemberEpochException(
+                        "The streams group member is in a unknown state. "
+                            + "The member must abandon all its tasks and 
rejoin.");
+                }
+
+                return computeNextAssignment(
+                    targetAssignmentEpoch,
+                    member.assignedTasks()
+                );
+        }
+
+        return member;
+    }
+
+    /**
+     * Takes the current currentAssignment and the targetAssignment, and 
generates three
+     * collections:
+     *
+     * - the resultAssignedTasks: the tasks that are assigned in both the 
current and target
+     * assignments.
+     * - the resultTasksPendingRevocation: the tasks that are assigned in the 
current
+     * assignment but not in the target assignment.
+     * - the resultTasksPendingAssignment: the tasks that are assigned in the 
target assignment but
+     * not in the current assignment, and can be assigned currently (i.e., 
they are not owned by
+     * another member, as defined by the `isUnreleasedTask` predicate).
+     */
+    private boolean computeAssignmentDifference(Map<String, Set<Integer>> 
currentAssignment,
+                                                Map<String, Set<Integer>> 
targetAssignment,
+                                                Map<String, Set<Integer>> 
resultAssignedTasks,
+                                                Map<String, Set<Integer>> 
resultTasksPendingRevocation,
+                                                Map<String, Set<Integer>> 
resultTasksPendingAssignment,
+                                                BiPredicate<String, Integer> 
isUnreleasedTask) {
+        boolean hasUnreleasedTasks = false;
+
+        Set<String> allSubtopologyIds = new 
HashSet<>(targetAssignment.keySet());
+        allSubtopologyIds.addAll(currentAssignment.keySet());
+
+        for (String subtopologyId : allSubtopologyIds) {
+            hasUnreleasedTasks |= computeAssignmentDifferenceForOneSubtopology(
+                subtopologyId,
+                currentAssignment.getOrDefault(subtopologyId, 
Collections.emptySet()),
+                targetAssignment.getOrDefault(subtopologyId, 
Collections.emptySet()),
+                resultAssignedTasks,
+                resultTasksPendingRevocation,
+                resultTasksPendingAssignment,
+                isUnreleasedTask
+            );
+        }
+        return hasUnreleasedTasks;
+    }
+
+    private static boolean computeAssignmentDifferenceForOneSubtopology(final 
String subtopologyId,
+                                                                        final 
Set<Integer> currentTasksForThisSubtopology,
+                                                                        final 
Set<Integer> targetTasksForThisSubtopology,
+                                                                        final 
Map<String, Set<Integer>> resultAssignedTasks,
+                                                                        final 
Map<String, Set<Integer>> resultTasksPendingRevocation,
+                                                                        final 
Map<String, Set<Integer>> resultTasksPendingAssignment,
+                                                                        final 
BiPredicate<String, Integer> isUnreleasedTask) {
+        // Result Assigned Tasks = Current Tasks ∩ Target Tasks
+        // i.e. we remove all tasks from the current assignment that are not 
in the target
+        //         assignment
+        Set<Integer> resultAssignedTasksForThisSubtopology = new 
HashSet<>(currentTasksForThisSubtopology);
+        
resultAssignedTasksForThisSubtopology.retainAll(targetTasksForThisSubtopology);
+
+        // Result Tasks Pending Revocation = Current Tasks - Result Assigned 
Tasks
+        // i.e. we will ask the member to revoke all tasks in its current 
assignment that
+        //      are not in the target assignment
+        Set<Integer> resultTasksPendingRevocationForThisSubtopology = new 
HashSet<>(currentTasksForThisSubtopology);
+        
resultTasksPendingRevocationForThisSubtopology.removeAll(resultAssignedTasksForThisSubtopology);
+
+        // Result Tasks Pending Assignment = Target Tasks - Result Assigned 
Tasks - Unreleased Tasks
+        // i.e. we will ask the member to assign all tasks in its target 
assignment,
+        //      except those that are already assigned, and those that are 
unreleased
+        Set<Integer> resultTasksPendingAssignmentForThisSubtopology = new 
HashSet<>(targetTasksForThisSubtopology);
+        
resultTasksPendingAssignmentForThisSubtopology.removeAll(resultAssignedTasksForThisSubtopology);
+        boolean hasUnreleasedTasks = 
resultTasksPendingAssignmentForThisSubtopology.removeIf(taskId ->
+            isUnreleasedTask.test(subtopologyId, taskId)
+        );
+
+        if (!resultAssignedTasksForThisSubtopology.isEmpty()) {
+            resultAssignedTasks.put(subtopologyId, 
resultAssignedTasksForThisSubtopology);
+        }
+
+        if (!resultTasksPendingRevocationForThisSubtopology.isEmpty()) {
+            resultTasksPendingRevocation.put(subtopologyId, 
resultTasksPendingRevocationForThisSubtopology);
+        }
+
+        if (!resultTasksPendingAssignmentForThisSubtopology.isEmpty()) {
+            resultTasksPendingAssignment.put(subtopologyId, 
resultTasksPendingAssignmentForThisSubtopology);
+        }
+
+        return hasUnreleasedTasks;
+    }
+
+    /**
+     * Computes the next assignment.
+     *
+     * @param memberEpoch         The epoch of the member to use. This may be 
different from
+     *                            the epoch in {@link 
CurrentAssignmentBuilder#member}.
+     * @param memberAssignedTasks The assigned tasks of the member to use.
+     * @return A new StreamsGroupMember.
+     */
+    private StreamsGroupMember computeNextAssignment(int memberEpoch,
+                                                     TasksTuple 
memberAssignedTasks) {
+        Map<String, Set<Integer>> newActiveAssignedTasks = new HashMap<>();
+        Map<String, Set<Integer>> newActiveTasksPendingRevocation = new 
HashMap<>();
+        Map<String, Set<Integer>> newActiveTasksPendingAssignment = new 
HashMap<>();
+        Map<String, Set<Integer>> newStandbyAssignedTasks = new HashMap<>();
+        Map<String, Set<Integer>> newStandbyTasksPendingRevocation = new 
HashMap<>();
+        Map<String, Set<Integer>> newStandbyTasksPendingAssignment = new 
HashMap<>();
+        Map<String, Set<Integer>> newWarmupAssignedTasks = new HashMap<>();
+        Map<String, Set<Integer>> newWarmupTasksPendingRevocation = new 
HashMap<>();
+        Map<String, Set<Integer>> newWarmupTasksPendingAssignment = new 
HashMap<>();
+
+        boolean hasUnreleasedActiveTasks = computeAssignmentDifference(
+            memberAssignedTasks.activeTasks(),
+            targetAssignment.activeTasks(),
+            newActiveAssignedTasks,
+            newActiveTasksPendingRevocation,
+            newActiveTasksPendingAssignment,
+            (subtopologyId, partitionId) ->
+                currentActiveTaskProcessId.apply(subtopologyId, partitionId) 
!= null ||
+                    currentStandbyTaskProcessIds.apply(subtopologyId, 
partitionId)
+                        .contains(member.processId()) ||
+                    currentWarmupTaskProcessIds.apply(subtopologyId, 
partitionId)
+                        .contains(member.processId())
+        );
+
+        boolean hasUnreleasedStandbyTasks = computeAssignmentDifference(
+            memberAssignedTasks.standbyTasks(),
+            targetAssignment.standbyTasks(),
+            newStandbyAssignedTasks,
+            newStandbyTasksPendingRevocation,
+            newStandbyTasksPendingAssignment,
+            (subtopologyId, partitionId) ->
+                Objects.equals(currentActiveTaskProcessId.apply(subtopologyId, 
partitionId),
+                    member.processId()) ||
+                    currentStandbyTaskProcessIds.apply(subtopologyId, 
partitionId)
+                        .contains(member.processId()) ||
+                    currentWarmupTaskProcessIds.apply(subtopologyId, 
partitionId)
+                        .contains(member.processId())
+        );
+
+        boolean hasUnreleasedWarmupTasks = computeAssignmentDifference(
+            memberAssignedTasks.warmupTasks(),
+            targetAssignment.warmupTasks(),
+            newWarmupAssignedTasks,
+            newWarmupTasksPendingRevocation,
+            newWarmupTasksPendingAssignment,
+            (subtopologyId, partitionId) ->
+                Objects.equals(currentActiveTaskProcessId.apply(subtopologyId, 
partitionId),
+                    member.processId()) ||
+                    currentStandbyTaskProcessIds.apply(subtopologyId, 
partitionId)
+                        .contains(member.processId()) ||
+                    currentWarmupTaskProcessIds.apply(subtopologyId, 
partitionId)
+                        .contains(member.processId())
+        );
+
+        return buildNewMember(
+            memberEpoch,
+            new TasksTuple(
+                newActiveTasksPendingRevocation,
+                newStandbyTasksPendingRevocation,
+                newWarmupTasksPendingRevocation
+            ),
+            new TasksTuple(
+                newActiveAssignedTasks,
+                newStandbyAssignedTasks,
+                newWarmupAssignedTasks
+            ),
+            new TasksTuple(
+                newActiveTasksPendingAssignment,
+                newStandbyTasksPendingAssignment,
+                newWarmupTasksPendingAssignment
+            ),
+            hasUnreleasedActiveTasks || hasUnreleasedStandbyTasks || 
hasUnreleasedWarmupTasks
+        );
+    }
+
+    private StreamsGroupMember buildNewMember(final int memberEpoch,
+                                              final TasksTuple 
newTasksPendingRevocation,
+                                              final TasksTuple 
newAssignedTasks,
+                                              final TasksTuple 
newTasksPendingAssignment,
+                                              final boolean 
hasUnreleasedTasks) {
+
+        final boolean hasTasksToBeRevoked =
+            (!newTasksPendingRevocation.isEmpty())
+                && (ownedTasks.isEmpty() || 
ownedTasks.get().containsAny(newTasksPendingRevocation));
+
+        if (hasTasksToBeRevoked) {
+            // If there are tasks to be revoked, the member remains in its 
current
+            // epoch and requests the revocation of those tasks. It 
transitions to
+            // the UNREVOKED_TASKS state to wait until the client acknowledges 
the
+            // revocation of the tasks.
+            return new StreamsGroupMember.Builder(member)
+                .setState(MemberState.UNREVOKED_TASKS)
+                .updateMemberEpoch(memberEpoch)
+                .setAssignedTasks(newAssignedTasks)
+                .setTasksPendingRevocation(newTasksPendingRevocation)
+                .build();
+        } else if (!newTasksPendingAssignment.isEmpty()) {
+            // If there are tasks to be assigned, the member transitions to the
+            // target epoch and requests the assignment of those tasks. Note 
that
+            // the tasks are directly added to the assigned tasks set. The
+            // member transitions to the STABLE state or to the 
UNRELEASED_TASKS
+            // state depending on whether there are unreleased tasks or not.
+            MemberState newState =
+                hasUnreleasedTasks
+                    ? MemberState.UNRELEASED_TASKS
+                    : MemberState.STABLE;
+            return new StreamsGroupMember.Builder(member)
+                .setState(newState)
+                .updateMemberEpoch(targetAssignmentEpoch)
+                
.setAssignedTasks(newAssignedTasks.merge(newTasksPendingAssignment))
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build();
+        } else if (hasUnreleasedTasks) {
+            // If there are no tasks to be revoked nor to be assigned but some
+            // tasks are not available yet, the member transitions to the 
target
+            // epoch, to the UNRELEASED_TASKS state and waits.
+            return new StreamsGroupMember.Builder(member)
+                .setState(MemberState.UNRELEASED_TASKS)
+                .updateMemberEpoch(targetAssignmentEpoch)
+                .setAssignedTasks(newAssignedTasks)
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build();
+        } else {
+            // Otherwise, the member transitions to the target epoch and to the
+            // STABLE state.
+            return new StreamsGroupMember.Builder(member)
+                .setState(MemberState.STABLE)
+                .updateMemberEpoch(targetAssignmentEpoch)
+                .setAssignedTasks(newAssignedTasks)
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build();
+        }
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
index e23df3f5701..612e72fabdd 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -49,18 +49,8 @@ import java.util.stream.Collectors;
  * @param userEndpoint                  The user endpoint exposed for 
Interactive Queries by the Streams client that
  *                                      contains the member.
  * @param clientTags                    Tags of the client of the member used 
for rack-aware assignment.
- * @param assignedActiveTasks           Active tasks assigned to the member.
- *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
- * @param assignedStandbyTasks          Standby tasks assigned to the member.
- *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
- * @param assignedWarmupTasks           Warm-up tasks assigned to the member.
- *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
- * @param activeTasksPendingRevocation  Active tasks assigned to the member 
pending revocation.
- *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
- * @param standbyTasksPendingRevocation Standby tasks assigned to the member 
pending revocation.
- *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
- * @param warmupTasksPendingRevocation  Warm-up tasks assigned to the member 
pending revocation.
- *                                      The key of the map is the subtopology 
ID and the value is the set of partition IDs.
+ * @param assignedTasks                 Tasks assigned to the member.
+ * @param tasksPendingRevocation        Tasks owned by the member pending 
revocation.
  */
 @SuppressWarnings("checkstyle:JavaNCSS")
 public record StreamsGroupMember(String memberId,
@@ -76,22 +66,12 @@ public record StreamsGroupMember(String memberId,
                                  String processId,
                                  
Optional<StreamsGroupMemberMetadataValue.Endpoint> userEndpoint,
                                  Map<String, String> clientTags,
-                                 Map<String, Set<Integer>> assignedActiveTasks,
-                                 Map<String, Set<Integer>> 
assignedStandbyTasks,
-                                 Map<String, Set<Integer>> assignedWarmupTasks,
-                                 Map<String, Set<Integer>> 
activeTasksPendingRevocation,
-                                 Map<String, Set<Integer>> 
standbyTasksPendingRevocation,
-                                 Map<String, Set<Integer>> 
warmupTasksPendingRevocation) {
+                                 TasksTuple assignedTasks,
+                                 TasksTuple tasksPendingRevocation) {
 
     public StreamsGroupMember {
         Objects.requireNonNull(memberId, "memberId cannot be null");
         clientTags = clientTags != null ? 
Collections.unmodifiableMap(clientTags) : null;
-        assignedActiveTasks = assignedActiveTasks != null ? 
Collections.unmodifiableMap(assignedActiveTasks) : null;
-        assignedStandbyTasks = assignedStandbyTasks != null ? 
Collections.unmodifiableMap(assignedStandbyTasks) : null;
-        assignedWarmupTasks = assignedWarmupTasks != null ? 
Collections.unmodifiableMap(assignedWarmupTasks) : null;
-        activeTasksPendingRevocation = activeTasksPendingRevocation != null ? 
Collections.unmodifiableMap(activeTasksPendingRevocation) : null;
-        standbyTasksPendingRevocation = standbyTasksPendingRevocation != null 
? Collections.unmodifiableMap(standbyTasksPendingRevocation) : null;
-        warmupTasksPendingRevocation = warmupTasksPendingRevocation != null ? 
Collections.unmodifiableMap(warmupTasksPendingRevocation) : null;
     }
 
     /**
@@ -114,12 +94,8 @@ public record StreamsGroupMember(String memberId,
         private String processId = null;
         private Optional<StreamsGroupMemberMetadataValue.Endpoint> 
userEndpoint = null;
         private Map<String, String> clientTags = null;
-        private Map<String, Set<Integer>> assignedActiveTasks = null;
-        private Map<String, Set<Integer>> assignedStandbyTasks = null;
-        private Map<String, Set<Integer>> assignedWarmupTasks = null;
-        private Map<String, Set<Integer>> activeTasksPendingRevocation = null;
-        private Map<String, Set<Integer>> standbyTasksPendingRevocation = null;
-        private Map<String, Set<Integer>> warmupTasksPendingRevocation = null;
+        private TasksTuple assignedTasks = null;
+        private TasksTuple tasksPendingRevocation = null;
 
         public Builder(String memberId) {
             this.memberId = Objects.requireNonNull(memberId, "memberId cannot 
be null");
@@ -141,12 +117,8 @@ public record StreamsGroupMember(String memberId,
             this.userEndpoint = member.userEndpoint;
             this.clientTags = member.clientTags;
             this.state = member.state;
-            this.assignedActiveTasks = member.assignedActiveTasks;
-            this.assignedStandbyTasks = member.assignedStandbyTasks;
-            this.assignedWarmupTasks = member.assignedWarmupTasks;
-            this.activeTasksPendingRevocation = 
member.activeTasksPendingRevocation;
-            this.standbyTasksPendingRevocation = 
member.standbyTasksPendingRevocation;
-            this.warmupTasksPendingRevocation = 
member.warmupTasksPendingRevocation;
+            this.assignedTasks = member.assignedTasks;
+            this.tasksPendingRevocation = member.tasksPendingRevocation;
         }
 
         public Builder updateMemberEpoch(int memberEpoch) {
@@ -251,50 +223,13 @@ public record StreamsGroupMember(String memberId,
             return this;
         }
 
-        public Builder setAssignment(Assignment assignment) {
-            this.assignedActiveTasks = assignment.activeTasks();
-            this.assignedStandbyTasks = assignment.standbyTasks();
-            this.assignedWarmupTasks = assignment.warmupTasks();
+        public Builder setAssignedTasks(TasksTuple assignedTasks) {
+            this.assignedTasks = assignedTasks;
             return this;
         }
 
-        public Builder setAssignedActiveTasks(Map<String, Set<Integer>> 
assignedActiveTasks) {
-            this.assignedActiveTasks = assignedActiveTasks;
-            return this;
-        }
-
-        public Builder setAssignedStandbyTasks(Map<String, Set<Integer>> 
assignedStandbyTasks) {
-            this.assignedStandbyTasks = assignedStandbyTasks;
-            return this;
-        }
-
-        public Builder setAssignedWarmupTasks(Map<String, Set<Integer>> 
assignedWarmupTasks) {
-            this.assignedWarmupTasks = assignedWarmupTasks;
-            return this;
-        }
-
-        public Builder setAssignmentPendingRevocation(Assignment assignment) {
-            this.activeTasksPendingRevocation = assignment.activeTasks();
-            this.standbyTasksPendingRevocation = assignment.standbyTasks();
-            this.warmupTasksPendingRevocation = assignment.warmupTasks();
-            return this;
-        }
-
-        public Builder setActiveTasksPendingRevocation(
-            Map<String, Set<Integer>> activeTasksPendingRevocation) {
-            this.activeTasksPendingRevocation = activeTasksPendingRevocation;
-            return this;
-        }
-
-        public Builder setStandbyTasksPendingRevocation(
-            Map<String, Set<Integer>> standbyTasksPendingRevocation) {
-            this.standbyTasksPendingRevocation = standbyTasksPendingRevocation;
-            return this;
-        }
-
-        public Builder setWarmupTasksPendingRevocation(
-            Map<String, Set<Integer>> warmupTasksPendingRevocation) {
-            this.warmupTasksPendingRevocation = warmupTasksPendingRevocation;
+        public Builder setTasksPendingRevocation(TasksTuple 
tasksPendingRevocation) {
+            this.tasksPendingRevocation = tasksPendingRevocation;
             return this;
         }
 
@@ -318,15 +253,20 @@ public record StreamsGroupMember(String memberId,
             setMemberEpoch(record.memberEpoch());
             setPreviousMemberEpoch(record.previousMemberEpoch());
             setState(MemberState.fromValue(record.state()));
-            
setAssignedActiveTasks(assignmentFromTaskIds(record.activeTasks()));
-            
setAssignedStandbyTasks(assignmentFromTaskIds(record.standbyTasks()));
-            
setAssignedWarmupTasks(assignmentFromTaskIds(record.warmupTasks()));
-            setActiveTasksPendingRevocation(
-                assignmentFromTaskIds(record.activeTasksPendingRevocation()));
-            setStandbyTasksPendingRevocation(
-                assignmentFromTaskIds(record.standbyTasksPendingRevocation()));
-            setWarmupTasksPendingRevocation(
-                assignmentFromTaskIds(record.warmupTasksPendingRevocation()));
+            setAssignedTasks(
+                new TasksTuple(
+                    assignmentFromTaskIds(record.activeTasks()),
+                    assignmentFromTaskIds(record.standbyTasks()),
+                    assignmentFromTaskIds(record.warmupTasks())
+                )
+            );
+            setTasksPendingRevocation(
+                new TasksTuple(
+                    
assignmentFromTaskIds(record.activeTasksPendingRevocation()),
+                    
assignmentFromTaskIds(record.standbyTasksPendingRevocation()),
+                    
assignmentFromTaskIds(record.warmupTasksPendingRevocation())
+                )
+            );
             return this;
         }
 
@@ -353,12 +293,8 @@ public record StreamsGroupMember(String memberId,
                 processId,
                 userEndpoint,
                 clientTags,
-                assignedActiveTasks,
-                assignedStandbyTasks,
-                assignedWarmupTasks,
-                activeTasksPendingRevocation,
-                standbyTasksPendingRevocation,
-                warmupTasksPendingRevocation
+                assignedTasks,
+                tasksPendingRevocation
             );
         }
     }
@@ -377,9 +313,7 @@ public record StreamsGroupMember(String memberId,
      *
      * @return The StreamsGroupMember mapped as 
StreamsGroupDescribeResponseData.Member.
      */
-    public StreamsGroupDescribeResponseData.Member 
asStreamsGroupDescribeMember(
-        Assignment targetAssignment
-    ) {
+    public StreamsGroupDescribeResponseData.Member 
asStreamsGroupDescribeMember(TasksTuple targetAssignment) {
         final StreamsGroupDescribeResponseData.Assignment 
describedTargetAssignment =
             new StreamsGroupDescribeResponseData.Assignment();
 
@@ -395,9 +329,9 @@ public record StreamsGroupMember(String memberId,
             .setMemberId(memberId)
             .setAssignment(
                 new StreamsGroupDescribeResponseData.Assignment()
-                    .setActiveTasks(taskIdsFromMap(assignedActiveTasks))
-                    .setStandbyTasks(taskIdsFromMap(assignedStandbyTasks))
-                    .setWarmupTasks(taskIdsFromMap(assignedWarmupTasks)))
+                    
.setActiveTasks(taskIdsFromMap(assignedTasks.activeTasks()))
+                    
.setStandbyTasks(taskIdsFromMap(assignedTasks.standbyTasks()))
+                    
.setWarmupTasks(taskIdsFromMap(assignedTasks.warmupTasks())))
             .setTargetAssignment(describedTargetAssignment)
             .setClientHost(clientHost)
             .setClientId(clientId)
@@ -419,9 +353,7 @@ public record StreamsGroupMember(String memberId,
             );
     }
 
-    private static List<StreamsGroupDescribeResponseData.TaskIds> 
taskIdsFromMap(
-        Map<String, Set<Integer>> tasks
-    ) {
+    private static List<StreamsGroupDescribeResponseData.TaskIds> 
taskIdsFromMap(Map<String, Set<Integer>> tasks) {
         List<StreamsGroupDescribeResponseData.TaskIds> taskIds = new 
ArrayList<>();
         tasks.forEach((subtopologyId, partitionSet) -> {
             taskIds.add(new StreamsGroupDescribeResponseData.TaskIds()
@@ -432,32 +364,9 @@ public record StreamsGroupMember(String memberId,
     }
 
     /**
-     * @return True if the two provided members have different assigned active 
tasks.
-     */
-    public static boolean hasAssignedActiveTasksChanged(
-        StreamsGroupMember member1,
-        StreamsGroupMember member2
-    ) {
-        return 
!member1.assignedActiveTasks().equals(member2.assignedActiveTasks());
-    }
-
-    /**
-     * @return True if the two provided members have different assigned active 
tasks.
-     */
-    public static boolean hasAssignedStandbyTasksChanged(
-        StreamsGroupMember member1,
-        StreamsGroupMember member2
-    ) {
-        return 
!member1.assignedStandbyTasks().equals(member2.assignedStandbyTasks());
-    }
-
-    /**
-     * @return True if the two provided members have different assigned active 
tasks.
+     * @return True if the two provided members have different assigned tasks.
      */
-    public static boolean hasAssignedWarmupTasksChanged(
-        StreamsGroupMember member1,
-        StreamsGroupMember member2
-    ) {
-        return 
!member1.assignedWarmupTasks().equals(member2.assignedWarmupTasks());
+    public static boolean hasAssignedTasksChanged(StreamsGroupMember member1, 
StreamsGroupMember member2) {
+        return !member1.assignedTasks().equals(member2.assignedTasks());
     }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTuple.java
similarity index 50%
rename from 
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
rename to 
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTuple.java
index da377d19ccd..ea4c6f81a52 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/Assignment.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TasksTuple.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.group.streams;
 import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
@@ -26,45 +27,89 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * An immutable assignment for a member.
+ * An immutable tuple containing active, standby and warm-up tasks.
  *
- * @param activeTasks           Active tasks assigned to the member.
+ * @param activeTasks           Active tasks.
  *                              The key of the map is the subtopology ID and 
the value is the set of partition IDs.
- * @param standbyTasks          Standby tasks assigned to the member.
+ * @param standbyTasks          Standby tasks.
  *                              The key of the map is the subtopology ID and 
the value is the set of partition IDs.
- * @param warmupTasks           Warm-up tasks assigned to the member.
+ * @param warmupTasks           Warm-up tasks.
  *                              The key of the map is the subtopology ID and 
the value is the set of partition IDs.
  */
-public record Assignment(Map<String, Set<Integer>> activeTasks,
+public record TasksTuple(Map<String, Set<Integer>> activeTasks,
                          Map<String, Set<Integer>> standbyTasks,
                          Map<String, Set<Integer>> warmupTasks) {
 
-    public Assignment {
+    public TasksTuple {
         activeTasks = 
Collections.unmodifiableMap(Objects.requireNonNull(activeTasks));
         standbyTasks = 
Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks));
         warmupTasks = 
Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks));
     }
 
     /**
-     * An empty assignment.
+     * An empty task tuple.
      */
-    public static final Assignment EMPTY = new Assignment(
+    public static final TasksTuple EMPTY = new TasksTuple(
         Collections.emptyMap(),
         Collections.emptyMap(),
         Collections.emptyMap()
     );
 
     /**
-     * Creates a {{@link 
org.apache.kafka.coordinator.group.streams.Assignment}} from a
+     * @return true if all collections in the tuple are empty.
+     */
+    public boolean isEmpty() {
+        return activeTasks.isEmpty() && standbyTasks.isEmpty() && 
warmupTasks.isEmpty();
+    }
+
+    /**
+     * Merges this task tuple with another task tuple.
+     *
+     * @param other The other task tuple.
+     * @return A new task tuple, containing all active tasks, standby tasks 
and warm-up tasks from both tuples.
+     */
+    public TasksTuple merge(TasksTuple other) {
+        Map<String, Set<Integer>> mergedActiveTasks = merge(activeTasks, 
other.activeTasks);
+        Map<String, Set<Integer>> mergedStandbyTasks = merge(standbyTasks, 
other.standbyTasks);
+        Map<String, Set<Integer>> mergedWarmupTasks = merge(warmupTasks, 
other.warmupTasks);
+        return new TasksTuple(mergedActiveTasks, mergedStandbyTasks, 
mergedWarmupTasks);
+    }
+
+    private static Map<String, Set<Integer>> merge(final Map<String, 
Set<Integer>> tasks1, final Map<String, Set<Integer>> tasks2) {
+        HashMap<String, Set<Integer>> result = new HashMap<>();
+        tasks1.forEach((subtopologyId, tasks) ->
+            result.put(subtopologyId, new HashSet<>(tasks)));
+        tasks2.forEach((subtopologyId, tasks) -> result
+            .computeIfAbsent(subtopologyId, __ -> new HashSet<>())
+            .addAll(tasks));
+        return result;
+    }
+
+    /**
+     * Checks if this task tuple contains any of the tasks in another task 
tuple.
+     *
+     * @param other The other task tuple.
+     * @return true if there is at least one active, standby or warm-up task 
that is present in both tuples.
+     */
+    public boolean containsAny(TasksTuple other) {
+        return activeTasks.entrySet().stream().anyMatch(
+            entry -> other.activeTasks.containsKey(entry.getKey()) && 
!Collections.disjoint(entry.getValue(), other.activeTasks.get(entry.getKey()))
+        ) || standbyTasks.entrySet().stream().anyMatch(
+            entry -> other.standbyTasks.containsKey(entry.getKey()) && 
!Collections.disjoint(entry.getValue(), other.standbyTasks.get(entry.getKey()))
+        ) || warmupTasks.entrySet().stream().anyMatch(
+            entry -> other.warmupTasks.containsKey(entry.getKey()) && 
!Collections.disjoint(entry.getValue(), other.warmupTasks.get(entry.getKey()))
+        );
+    }
+
+    /**
+     * Creates a {{@link TasksTuple}} from a
      * {{@link 
org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue}}.
      *
      * @param record The record.
-     * @return A {{@link 
org.apache.kafka.coordinator.group.streams.Assignment}}.
+     * @return A {{@link TasksTuple}}.
      */
-    public static Assignment fromRecord(
-        StreamsGroupTargetAssignmentMemberValue record
-    ) {
-        return new Assignment(
+    public static TasksTuple 
fromTargetAssignmentRecord(StreamsGroupTargetAssignmentMemberValue record) {
+        return new TasksTuple(
             record.activeTasks().stream()
                 .collect(Collectors.toMap(
                         
StreamsGroupTargetAssignmentMemberValue.TaskIds::subtopologyId,
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupAssignment.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupAssignment.java
index a97cdc33b79..c4ac5803b35 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupAssignment.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/GroupAssignment.java
@@ -22,7 +22,7 @@ import java.util.Objects;
 /**
  * The task assignment for a streams group.
  *
- * @param members The member assignments keyed by member id.
+ * @param members The member assignments keyed by member ID.
  */
 public record GroupAssignment(Map<String, MemberAssignment> members) {
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java
new file mode 100644
index 00000000000..4d55a8419fb
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java
@@ -0,0 +1,825 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Collections;
+
+import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
+import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksTuple;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class CurrentAssignmentBuilderTest {
+
+    private static final String SUBTOPOLOGY_ID1 = Uuid.randomUuid().toString();
+    private static final String SUBTOPOLOGY_ID2 = Uuid.randomUuid().toString();
+    private static final String PROCESS_ID = "process_id";
+    private static final String MEMBER_NAME = "member";
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testStableToStable(TaskRole taskRole) {
+        final int memberEpoch = 10;
+
+        StreamsGroupMember member =
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.STABLE)
+                .setProcessId(PROCESS_ID)
+                .setMemberEpoch(memberEpoch)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(
+                    mkTasksTuple(
+                        taskRole,
+                        mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+                        mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+                mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
PROCESS_ID)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> 
Collections.emptySet())
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.STABLE)
+                .setProcessId(PROCESS_ID)
+                .setMemberEpoch(memberEpoch + 1)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(mkTasksTuple(
+                    taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+                    mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testStableToStableAtTargetEpoch(TaskRole taskRole) {
+        final int memberEpoch = 10;
+
+        StreamsGroupMember member =
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.STABLE)
+                .setProcessId(PROCESS_ID)
+                .setMemberEpoch(memberEpoch)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(
+                    mkTasksTuple(
+                        taskRole,
+                        mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+                        mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+                mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
PROCESS_ID)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> 
Collections.emptySet())
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.STABLE)
+                .setProcessId(PROCESS_ID)
+                .setMemberEpoch(memberEpoch)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(mkTasksTuple(
+                    taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+                    mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testStableToStableWithNewTasks(TaskRole taskRole) {
+        final int memberEpoch = 10;
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+            .setState(MemberState.STABLE)
+            .setProcessId(PROCESS_ID)
+            .setMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch)
+            .setAssignedTasks(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+                mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+            .setTasksPendingRevocation(TasksTuple.EMPTY)
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 1, 2, 4),
+                mkTasks(SUBTOPOLOGY_ID2, 3, 4, 7)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
null)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> 
Collections.emptySet())
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.STABLE)
+                .setProcessId(PROCESS_ID)
+                .setMemberEpoch(memberEpoch + 1)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(mkTasksTuple(taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 1, 2, 4),
+                    mkTasks(SUBTOPOLOGY_ID2, 3, 4, 7)))
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testStableToUnrevokedTasks(TaskRole taskRole) {
+        final int memberEpoch = 10;
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+            .setState(MemberState.STABLE)
+            .setProcessId(PROCESS_ID)
+            .setMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch)
+            .setAssignedTasks(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+                mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+            .setTasksPendingRevocation(TasksTuple.EMPTY)
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 4, 5)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
null)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> 
Collections.emptySet())
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.UNREVOKED_TASKS)
+                .setProcessId(PROCESS_ID)
+                .setMemberEpoch(memberEpoch)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(mkTasksTuple(taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 2),
+                    mkTasks(SUBTOPOLOGY_ID2, 4)))
+                .setTasksPendingRevocation(mkTasksTuple(taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 1),
+                    mkTasks(SUBTOPOLOGY_ID2, 3)))
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testStableToUnrevokedWithEmptyAssignment(TaskRole taskRole) {
+        final int memberEpoch = 10;
+
+        StreamsGroupMember member =
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.STABLE)
+                .setProcessId(PROCESS_ID)
+                .setMemberEpoch(memberEpoch)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(
+                    mkTasksTuple(
+                        taskRole,
+                        mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+                        mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch + 1, TasksTuple.EMPTY)
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
PROCESS_ID)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> 
Collections.emptySet())
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.UNREVOKED_TASKS)
+                .setProcessId(PROCESS_ID)
+                .setMemberEpoch(memberEpoch)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(TasksTuple.EMPTY)
+                .setTasksPendingRevocation(
+                    mkTasksTuple(
+                        taskRole,
+                        mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+                        mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testStableToUnreleasedTasks(TaskRole taskRole) {
+        final int memberEpoch = 10;
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+            .setState(MemberState.STABLE)
+            .setProcessId(PROCESS_ID)
+            .setMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch)
+            .setAssignedTasks(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+                mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+            .setTasksPendingRevocation(TasksTuple.EMPTY)
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 1, 2, 4),
+                mkTasks(SUBTOPOLOGY_ID2, 3, 4, 7)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
PROCESS_ID)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> 
Collections.emptySet())
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.UNRELEASED_TASKS)
+                .setProcessId(PROCESS_ID)
+                .setMemberEpoch(memberEpoch + 1)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(mkTasksTuple(taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+                    mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void 
testStableToUnreleasedTasksWithOwnedTasksNotHavingRevokedTasks(TaskRole 
taskRole) {
+        final int memberEpoch = 10;
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+            .setState(MemberState.STABLE)
+            .setProcessId(PROCESS_ID)
+            .setMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch)
+            .setAssignedTasks(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+                mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
+            .setTasksPendingRevocation(TasksTuple.EMPTY)
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+                mkTasks(SUBTOPOLOGY_ID2, 3, 5)))
+            .withCurrentActiveTaskProcessId((subtopologyId, __) ->
+                SUBTOPOLOGY_ID2.equals(subtopologyId) ? PROCESS_ID : null
+            )
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> 
Collections.emptySet())
+            .withOwnedAssignment(mkTasksTuple(taskRole))
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.UNRELEASED_TASKS)
+                .setProcessId(PROCESS_ID)
+                .setMemberEpoch(memberEpoch + 1)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(mkTasksTuple(taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 1, 2),
+                    mkTasks(SUBTOPOLOGY_ID2, 3)))
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testUnrevokedTasksToStable(TaskRole taskRole) {
+        final int memberEpoch = 10;
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+            .setState(MemberState.UNREVOKED_TASKS)
+            .setProcessId(PROCESS_ID)
+            .setMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch)
+            .setAssignedTasks(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+            .setTasksPendingRevocation(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 1),
+                mkTasks(SUBTOPOLOGY_ID2, 4)))
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
null)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> 
Collections.emptySet())
+            .withOwnedAssignment(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.STABLE)
+                .setProcessId(PROCESS_ID)
+                .setMemberEpoch(memberEpoch + 1)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(mkTasksTuple(taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                    mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testRemainsInUnrevokedTasks(TaskRole taskRole) {
+        final int memberEpoch = 10;
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+            .setState(MemberState.UNREVOKED_TASKS)
+            .setProcessId(PROCESS_ID)
+            .setMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch)
+            .setAssignedTasks(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+            .setTasksPendingRevocation(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 1),
+                mkTasks(SUBTOPOLOGY_ID2, 4)))
+            .build();
+
+        CurrentAssignmentBuilder currentAssignmentBuilder = new 
CurrentAssignmentBuilder(
+            member)
+            .withTargetAssignment(memberEpoch + 2, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 6)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
null)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet());
+
+        assertEquals(
+            member,
+            currentAssignmentBuilder
+                .withOwnedAssignment(null)
+                .build()
+        );
+
+        assertEquals(
+            member,
+            currentAssignmentBuilder
+                .withOwnedAssignment(mkTasksTuple(taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 1, 2, 3),
+                    mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+                .build()
+        );
+
+        assertEquals(
+            member,
+            currentAssignmentBuilder
+                .withOwnedAssignment(mkTasksTuple(taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                    mkTasks(SUBTOPOLOGY_ID2, 4, 5, 6)))
+                .build()
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testUnrevokedTasksToUnrevokedTasks(TaskRole taskRole) {
+        final int memberEpoch = 10;
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+            .setState(MemberState.UNREVOKED_TASKS)
+            .setProcessId(PROCESS_ID)
+            .setMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch)
+            .setAssignedTasks(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+            .setTasksPendingRevocation(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 1),
+                mkTasks(SUBTOPOLOGY_ID2, 4)))
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch + 2, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 6)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
null)
+            .withOwnedAssignment(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.UNREVOKED_TASKS)
+                .setProcessId(PROCESS_ID)
+                .setMemberEpoch(memberEpoch + 1)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(mkTasksTuple(taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 3),
+                    mkTasks(SUBTOPOLOGY_ID2, 6)))
+                .setTasksPendingRevocation(mkTasksTuple(taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 2),
+                    mkTasks(SUBTOPOLOGY_ID2, 5)))
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testUnrevokedTasksToUnreleasedTasks(TaskRole taskRole) {
+        final int memberEpoch = 11;
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+            .setState(MemberState.UNREVOKED_TASKS)
+            .setProcessId(PROCESS_ID)
+            .setMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch - 1)
+            .setAssignedTasks(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+            .setTasksPendingRevocation(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 1),
+                mkTasks(SUBTOPOLOGY_ID2, 4)))
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
PROCESS_ID)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> 
Collections.emptySet())
+            .withOwnedAssignment(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6))
+            )
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.UNRELEASED_TASKS)
+                .setProcessId(PROCESS_ID)
+                .setMemberEpoch(memberEpoch)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(mkTasksTuple(taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                    mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testUnreleasedTasksToStable(TaskRole taskRole) {
+        final int memberEpoch = 11;
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+            .setState(MemberState.UNRELEASED_TASKS)
+            .setProcessId("process1")
+            .setMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch)
+            .setAssignedTasks(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+            .setTasksPendingRevocation(TasksTuple.EMPTY)
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
PROCESS_ID)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> 
Collections.singleton(PROCESS_ID))
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) ->
+                Collections.singleton(PROCESS_ID))
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.STABLE)
+                .setProcessId("process1")
+                .setMemberEpoch(memberEpoch + 1)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(mkTasksTuple(taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                    mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testUnreleasedTasksToStableWithNewTasks(TaskRole taskRole) {
+        int memberEpoch = 11;
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+            .setState(MemberState.UNRELEASED_TASKS)
+            .setProcessId("process1")
+            .setMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch)
+            .setAssignedTasks(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+            .setTasksPendingRevocation(TasksTuple.EMPTY)
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
null)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> 
Collections.emptySet())
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.STABLE)
+                .setProcessId("process1")
+                .setMemberEpoch(memberEpoch)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(mkTasksTuple(taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
+                    mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testUnreleasedTasksToUnreleasedTasks(TaskRole taskRole) {
+        int memberEpoch = 11;
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+            .setState(MemberState.UNRELEASED_TASKS)
+            .setProcessId(PROCESS_ID)
+            .setMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch)
+            .setAssignedTasks(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+            .setTasksPendingRevocation(TasksTuple.EMPTY)
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
PROCESS_ID)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> 
Collections.singleton(PROCESS_ID))
+            .withCurrentWarmupTaskProcessIds(
+                (subtopologyId, partitionId) -> 
Collections.singleton(PROCESS_ID))
+            .build();
+
+        assertEquals(member, updatedMember);
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void 
testUnreleasedTasksToUnreleasedTasksOtherUnreleasedTaskRole(TaskRole taskRole) {
+        int memberEpoch = 11;
+
+        // The unreleased task is owned by a task of a different role on the 
same process.
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+            .setState(MemberState.UNRELEASED_TASKS)
+            .setProcessId(PROCESS_ID)
+            .setMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch)
+            .setAssignedTasks(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+            .setTasksPendingRevocation(TasksTuple.EMPTY)
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
null)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> (taskRole == TaskRole.STANDBY)
+                    ? Collections.emptySet() : 
Collections.singleton(PROCESS_ID))
+            .withCurrentWarmupTaskProcessIds(
+                (subtopologyId, partitionId) -> (taskRole == TaskRole.STANDBY)
+                    ? Collections.singleton(PROCESS_ID) : 
Collections.emptySet())
+            .build();
+
+        assertEquals(member, updatedMember);
+    }
+
+    @Test
+    public void testUnreleasedTasksToUnreleasedTasksAnyActiveOwner() {
+        int memberEpoch = 11;
+
+        // The unreleased task remains unreleased, because it is owned by any 
other instance in
+        // an active role, no matter the process.
+        // The task that is not unreleased can be assigned.
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+            .setState(MemberState.UNRELEASED_TASKS)
+            .setProcessId(PROCESS_ID)
+            .setMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch)
+            .setAssignedTasks(mkTasksTuple(TaskRole.ACTIVE,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+            .build();
+
+        StreamsGroupMember expectedMember = new 
StreamsGroupMember.Builder(MEMBER_NAME)
+            .setState(MemberState.UNRELEASED_TASKS)
+            .setProcessId(PROCESS_ID)
+            .setMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch)
+            .setAssignedTasks(mkTasksTuple(TaskRole.ACTIVE,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
+            .setTasksPendingRevocation(TasksTuple.EMPTY)
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch, mkTasksTuple(TaskRole.ACTIVE,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3, 4),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6, 7)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) ->
+                (subtopologyId.equals(SUBTOPOLOGY_ID1) && partitionId == 4) ? 
"anyOtherProcess"
+                    : null)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .build();
+
+        assertEquals(expectedMember, updatedMember);
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testUnreleasedTasksToUnrevokedTasks(TaskRole taskRole) {
+        int memberEpoch = 11;
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+            .setState(MemberState.UNRELEASED_TASKS)
+            .setProcessId("process1")
+            .setMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch)
+            .setAssignedTasks(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 5, 6)))
+            .setTasksPendingRevocation(mkTasksTuple(TaskRole.ACTIVE,
+                mkTasks(SUBTOPOLOGY_ID1, 4),
+                mkTasks(SUBTOPOLOGY_ID2, 7)))
+            .build();
+
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 6)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
PROCESS_ID)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> 
Collections.emptySet())
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.UNREVOKED_TASKS)
+                .setProcessId("process1")
+                .setMemberEpoch(memberEpoch)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(mkTasksTuple(taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 3),
+                    mkTasks(SUBTOPOLOGY_ID2, 6)))
+                .setTasksPendingRevocation(mkTasksTuple(taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 2),
+                    mkTasks(SUBTOPOLOGY_ID2, 5)))
+                .build(),
+            updatedMember
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testUnknownState(TaskRole taskRole) {
+        int memberEpoch = 11;
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
+            .setState(MemberState.UNKNOWN)
+            .setMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch)
+            .setProcessId(PROCESS_ID)
+            .setAssignedTasks(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 6)))
+            .setTasksPendingRevocation(mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 2),
+                mkTasks(SUBTOPOLOGY_ID2, 5)))
+            .build();
+
+        // When the member is in an unknown state, the member is first to force
+        // a reset of the client side member state.
+        assertThrows(FencedMemberEpochException.class, () -> new 
CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 6)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
PROCESS_ID)
+            .build());
+
+        // Then the member rejoins with no owned tasks.
+        StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(memberEpoch + 1, mkTasksTuple(taskRole,
+                mkTasks(SUBTOPOLOGY_ID1, 3),
+                mkTasks(SUBTOPOLOGY_ID2, 6)))
+            .withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> 
PROCESS_ID)
+            .withCurrentStandbyTaskProcessIds(
+                (subtopologyId, partitionId) -> Collections.emptySet())
+            .withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> 
Collections.emptySet())
+            .withOwnedAssignment(mkTasksTuple(taskRole))
+            .build();
+
+        assertEquals(
+            new StreamsGroupMember.Builder(MEMBER_NAME)
+                .setState(MemberState.STABLE)
+                .setProcessId(PROCESS_ID)
+                .setMemberEpoch(memberEpoch + 1)
+                .setPreviousMemberEpoch(memberEpoch)
+                .setAssignedTasks(mkTasksTuple(taskRole,
+                    mkTasks(SUBTOPOLOGY_ID1, 3),
+                    mkTasks(SUBTOPOLOGY_ID2, 6)))
+                .setTasksPendingRevocation(TasksTuple.EMPTY)
+                .build(),
+            updatedMember
+        );
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
index 8c6d3d9088a..f6c33df6f13 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
@@ -25,14 +25,12 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataVa
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -40,8 +38,10 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
 import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class StreamsGroupMemberTest {
 
@@ -70,12 +70,18 @@ public class StreamsGroupMemberTest {
     private static final List<Integer> TASKS4 = List.of(3, 2, 1);
     private static final List<Integer> TASKS5 = List.of(6, 5, 4);
     private static final List<Integer> TASKS6 = List.of(9, 7);
-    private static final Map<String, Set<Integer>> ASSIGNED_ACTIVE_TASKS = 
mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS1.toArray(Integer[]::new)));
-    private static final Map<String, Set<Integer>> ASSIGNED_STANDBY_TASKS = 
mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, TASKS2.toArray(Integer[]::new)));
-    private static final Map<String, Set<Integer>> ASSIGNED_WARMUP_TASKS = 
mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, TASKS3.toArray(Integer[]::new)));
-    private static final Map<String, Set<Integer>> 
ACTIVE_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, 
TASKS4.toArray(Integer[]::new)));
-    private static final Map<String, Set<Integer>> 
STANDBY_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, 
TASKS5.toArray(Integer[]::new)));
-    private static final Map<String, Set<Integer>> 
WARMUP_TASKS_PENDING_REVOCATION = mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, 
TASKS6.toArray(Integer[]::new)));
+    private static final TasksTuple ASSIGNED_TASKS =
+        new TasksTuple(
+            mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, 
TASKS1.toArray(Integer[]::new))),
+            mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, 
TASKS2.toArray(Integer[]::new))),
+            mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, 
TASKS3.toArray(Integer[]::new)))
+        );
+    private static final TasksTuple TASKS_PENDING_REVOCATION =
+        new TasksTuple(
+            mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, 
TASKS4.toArray(Integer[]::new))),
+            mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY1, 
TASKS5.toArray(Integer[]::new))),
+            mkTasksPerSubtopology(mkTasks(SUBTOPOLOGY2, 
TASKS6.toArray(Integer[]::new)))
+        );
 
     @Test
     public void testBuilderWithMemberIdIsNull() {
@@ -112,12 +118,8 @@ public class StreamsGroupMemberTest {
         assertNull(member.processId());
         assertNull(member.userEndpoint());
         assertNull(member.clientTags());
-        assertNull(member.assignedActiveTasks());
-        assertNull(member.assignedStandbyTasks());
-        assertNull(member.assignedWarmupTasks());
-        assertNull(member.activeTasksPendingRevocation());
-        assertNull(member.standbyTasksPendingRevocation());
-        assertNull(member.warmupTasksPendingRevocation());
+        assertNull(member.assignedTasks());
+        assertNull(member.tasksPendingRevocation());
     }
 
     @Test
@@ -136,12 +138,8 @@ public class StreamsGroupMemberTest {
         assertEquals(PROCESS_ID, member.processId());
         assertEquals(Optional.of(USER_ENDPOINT), member.userEndpoint());
         assertEquals(CLIENT_TAGS, member.clientTags());
-        assertEquals(ASSIGNED_ACTIVE_TASKS, member.assignedActiveTasks());
-        assertEquals(ASSIGNED_STANDBY_TASKS, member.assignedStandbyTasks());
-        assertEquals(ASSIGNED_WARMUP_TASKS, member.assignedWarmupTasks());
-        assertEquals(ACTIVE_TASKS_PENDING_REVOCATION, 
member.activeTasksPendingRevocation());
-        assertEquals(STANDBY_TASKS_PENDING_REVOCATION, 
member.standbyTasksPendingRevocation());
-        assertEquals(WARMUP_TASKS_PENDING_REVOCATION, 
member.warmupTasksPendingRevocation());
+        assertEquals(ASSIGNED_TASKS, member.assignedTasks());
+        assertEquals(TASKS_PENDING_REVOCATION, 
member.tasksPendingRevocation());
     }
 
     @Test
@@ -179,12 +177,8 @@ public class StreamsGroupMemberTest {
         assertNull(member.memberEpoch());
         assertNull(member.previousMemberEpoch());
         assertNull(member.state());
-        assertNull(member.assignedActiveTasks());
-        assertNull(member.assignedStandbyTasks());
-        assertNull(member.assignedWarmupTasks());
-        assertNull(member.activeTasksPendingRevocation());
-        assertNull(member.standbyTasksPendingRevocation());
-        assertNull(member.warmupTasksPendingRevocation());
+        assertNull(member.assignedTasks());
+        assertNull(member.tasksPendingRevocation());
     }
 
     @Test
@@ -208,12 +202,8 @@ public class StreamsGroupMemberTest {
         assertEquals(record.memberEpoch(), member.memberEpoch());
         assertEquals(record.previousMemberEpoch(), 
member.previousMemberEpoch());
         assertEquals(MemberState.fromValue(record.state()), member.state());
-        assertEquals(ASSIGNED_ACTIVE_TASKS, member.assignedActiveTasks());
-        assertEquals(ASSIGNED_STANDBY_TASKS, member.assignedStandbyTasks());
-        assertEquals(ASSIGNED_WARMUP_TASKS, member.assignedWarmupTasks());
-        assertEquals(ACTIVE_TASKS_PENDING_REVOCATION, 
member.activeTasksPendingRevocation());
-        assertEquals(STANDBY_TASKS_PENDING_REVOCATION, 
member.standbyTasksPendingRevocation());
-        assertEquals(WARMUP_TASKS_PENDING_REVOCATION, 
member.warmupTasksPendingRevocation());
+        assertEquals(ASSIGNED_TASKS, member.assignedTasks());
+        assertEquals(TASKS_PENDING_REVOCATION, 
member.tasksPendingRevocation());
         assertNull(member.instanceId());
         assertNull(member.rackId());
         assertNull(member.rebalanceTimeoutMs());
@@ -275,12 +265,8 @@ public class StreamsGroupMemberTest {
         assertEquals(member.state(), updatedMember.state());
         assertEquals(member.clientId(), updatedMember.clientId());
         assertEquals(member.clientHost(), updatedMember.clientHost());
-        assertEquals(member.assignedActiveTasks(), 
updatedMember.assignedActiveTasks());
-        assertEquals(member.assignedStandbyTasks(), 
updatedMember.assignedStandbyTasks());
-        assertEquals(member.assignedWarmupTasks(), 
updatedMember.assignedWarmupTasks());
-        assertEquals(member.activeTasksPendingRevocation(), 
updatedMember.activeTasksPendingRevocation());
-        assertEquals(member.standbyTasksPendingRevocation(), 
updatedMember.standbyTasksPendingRevocation());
-        assertEquals(member.warmupTasksPendingRevocation(), 
updatedMember.warmupTasksPendingRevocation());
+        assertEquals(member.assignedTasks(), updatedMember.assignedTasks());
+        assertEquals(member.tasksPendingRevocation(), 
updatedMember.tasksPendingRevocation());
     }
 
     @Test
@@ -306,25 +292,8 @@ public class StreamsGroupMemberTest {
         assertEquals(member.processId(), updatedMember.processId());
         assertEquals(member.userEndpoint(), updatedMember.userEndpoint());
         assertEquals(member.clientTags(), updatedMember.clientTags());
-        assertEquals(member.assignedActiveTasks(), 
updatedMember.assignedActiveTasks());
-        assertEquals(member.assignedStandbyTasks(), 
updatedMember.assignedStandbyTasks());
-        assertEquals(member.assignedWarmupTasks(), 
updatedMember.assignedWarmupTasks());
-        assertEquals(member.activeTasksPendingRevocation(), 
updatedMember.activeTasksPendingRevocation());
-        assertEquals(member.standbyTasksPendingRevocation(), 
updatedMember.standbyTasksPendingRevocation());
-        assertEquals(member.warmupTasksPendingRevocation(), 
updatedMember.warmupTasksPendingRevocation());
-    }
-
-    @Test
-    public void testReturnUnmodifiableFields() {
-        final StreamsGroupMember member = createStreamsGroupMember();
-
-        assertThrows(UnsupportedOperationException.class, () -> 
member.clientTags().put("not allowed", ""));
-        assertThrows(UnsupportedOperationException.class, () -> 
member.assignedActiveTasks().put("not allowed", Collections.emptySet()));
-        assertThrows(UnsupportedOperationException.class, () -> 
member.assignedStandbyTasks().put("not allowed", Collections.emptySet()));
-        assertThrows(UnsupportedOperationException.class, () -> 
member.assignedWarmupTasks().put("not allowed", Collections.emptySet()));
-        assertThrows(UnsupportedOperationException.class, () -> 
member.activeTasksPendingRevocation().put("not allowed", 
Collections.emptySet()));
-        assertThrows(UnsupportedOperationException.class, () -> 
member.standbyTasksPendingRevocation().put("not allowed", 
Collections.emptySet()));
-        assertThrows(UnsupportedOperationException.class, () -> 
member.warmupTasksPendingRevocation().put("not allowed", 
Collections.emptySet()));
+        assertEquals(member.assignedTasks(), updatedMember.assignedTasks());
+        assertEquals(member.tasksPendingRevocation(), 
updatedMember.tasksPendingRevocation());
     }
 
     @Test
@@ -333,7 +302,7 @@ public class StreamsGroupMemberTest {
         List<Integer> assignedTasks1 = Arrays.asList(10, 11, 12);
         List<Integer> assignedTasks2 = Arrays.asList(13, 14, 15);
         List<Integer> assignedTasks3 = Arrays.asList(16, 17, 18);
-        Assignment targetAssignment = new Assignment(
+        TasksTuple targetAssignment = new TasksTuple(
             mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(assignedTasks3))),
             mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(assignedTasks2))),
             mkMap(mkEntry(SUBTOPOLOGY3, new HashSet<>(assignedTasks1)))
@@ -404,6 +373,45 @@ public class StreamsGroupMemberTest {
         assertEquals(new StreamsGroupDescribeResponseData.Assignment(), 
streamsGroupDescribeMember.targetAssignment());
     }
 
+    @Test
+    public void testHasAssignedTasksChanged() {
+        StreamsGroupMember member1 = new StreamsGroupMember.Builder(MEMBER_ID)
+            .setAssignedTasks(new TasksTuple(
+                mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS1))),
+                mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(TASKS2))),
+                mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS3)))
+            ))
+            .build();
+
+        StreamsGroupMember member2 = new StreamsGroupMember.Builder(MEMBER_ID)
+            .setAssignedTasks(new TasksTuple(
+                mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS4))),
+                mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(TASKS5))),
+                mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS6)))
+            ))
+            .build();
+
+        assertTrue(StreamsGroupMember.hasAssignedTasksChanged(member1, 
member2));
+
+        StreamsGroupMember member3 = new StreamsGroupMember.Builder(MEMBER_ID)
+            .setAssignedTasks(new TasksTuple(
+                mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS1))),
+                mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(TASKS2))),
+                mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS3)))
+            ))
+            .build();
+
+        StreamsGroupMember member4 = new StreamsGroupMember.Builder(MEMBER_ID)
+            .setAssignedTasks(new TasksTuple(
+                mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS1))),
+                mkMap(mkEntry(SUBTOPOLOGY2, new HashSet<>(TASKS2))),
+                mkMap(mkEntry(SUBTOPOLOGY1, new HashSet<>(TASKS3)))
+            ))
+            .build();
+
+        assertFalse(StreamsGroupMember.hasAssignedTasksChanged(member3, 
member4));
+    }
+
     private StreamsGroupMember createStreamsGroupMember() {
         return new StreamsGroupMember.Builder(MEMBER_ID)
             .setMemberEpoch(MEMBER_EPOCH)
@@ -418,12 +426,8 @@ public class StreamsGroupMemberTest {
             .setProcessId(PROCESS_ID)
             .setUserEndpoint(USER_ENDPOINT)
             .setClientTags(CLIENT_TAGS)
-            .setAssignedActiveTasks(ASSIGNED_ACTIVE_TASKS)
-            .setAssignedStandbyTasks(ASSIGNED_STANDBY_TASKS)
-            .setAssignedWarmupTasks(ASSIGNED_WARMUP_TASKS)
-            .setActiveTasksPendingRevocation(ACTIVE_TASKS_PENDING_REVOCATION)
-            .setStandbyTasksPendingRevocation(STANDBY_TASKS_PENDING_REVOCATION)
-            .setWarmupTasksPendingRevocation(WARMUP_TASKS_PENDING_REVOCATION)
+            .setAssignedTasks(ASSIGNED_TASKS)
+            .setTasksPendingRevocation(TASKS_PENDING_REVOCATION)
             .build();
     }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
index 47668ec84c0..f633fec80f7 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TaskAssignmentTestUtil.java
@@ -17,24 +17,27 @@
 package org.apache.kafka.coordinator.group.streams;
 
 import java.util.AbstractMap;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 
 public class TaskAssignmentTestUtil {
 
-    public static Assignment mkAssignment(final Map<String, Set<Integer>> 
activeTasks,
-                                          final Map<String, Set<Integer>> 
standbyTasks,
-                                          final Map<String, Set<Integer>> 
warmupTasks) {
-        return new Assignment(
-            Collections.unmodifiableMap(Objects.requireNonNull(activeTasks)),
-            Collections.unmodifiableMap(Objects.requireNonNull(standbyTasks)),
-            Collections.unmodifiableMap(Objects.requireNonNull(warmupTasks))
-        );
+    public enum TaskRole {
+        ACTIVE,
+        STANDBY,
+        WARMUP
+    }
+
+    @SafeVarargs
+    public static TasksTuple mkTasksTuple(TaskRole taskRole, Map.Entry<String, 
Set<Integer>>... entries) {
+        return switch (taskRole) {
+            case ACTIVE -> new TasksTuple(mkTasksPerSubtopology(entries), new 
HashMap<>(), new HashMap<>());
+            case STANDBY -> new TasksTuple(new HashMap<>(), 
mkTasksPerSubtopology(entries), new HashMap<>());
+            case WARMUP -> new TasksTuple(new HashMap<>(), new HashMap<>(), 
mkTasksPerSubtopology(entries));
+        };
     }
 
     public static Map.Entry<String, Set<Integer>> mkTasks(String subtopologyId,
@@ -46,8 +49,7 @@ public class TaskAssignmentTestUtil {
     }
 
     @SafeVarargs
-    public static Map<String, Set<Integer>> 
mkTasksPerSubtopology(Map.Entry<String,
-                                                                  
Set<Integer>>... entries) {
+    public static Map<String, Set<Integer>> 
mkTasksPerSubtopology(Map.Entry<String, Set<Integer>>... entries) {
         Map<String, Set<Integer>> assignment = new HashMap<>();
         for (Map.Entry<String, Set<Integer>> entry : entries) {
             assignment.put(entry.getKey(), entry.getValue());
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TasksTupleTest.java
similarity index 57%
rename from 
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
rename to 
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TasksTupleTest.java
index 7c0baf27364..73c43a6d088 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/AssignmentTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TasksTupleTest.java
@@ -30,19 +30,21 @@ import java.util.Set;
 import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
 import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksPerSubtopology;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class AssignmentTest {
+public class TasksTupleTest {
 
-    static final String SUBTOPOLOGY_1 = "subtopology1";
-    static final String SUBTOPOLOGY_2 = "subtopology2";
-    static final String SUBTOPOLOGY_3 = "subtopology3";
+    private static final String SUBTOPOLOGY_1 = "subtopology1";
+    private static final String SUBTOPOLOGY_2 = "subtopology2";
+    private static final String SUBTOPOLOGY_3 = "subtopology3";
 
     @Test
     public void testTasksCannotBeNull() {
-        assertThrows(NullPointerException.class, () -> new Assignment(null, 
Collections.emptyMap(), Collections.emptyMap()));
-        assertThrows(NullPointerException.class, () -> new 
Assignment(Collections.emptyMap(), null, Collections.emptyMap()));
-        assertThrows(NullPointerException.class, () -> new 
Assignment(Collections.emptyMap(), Collections.emptyMap(), null));
+        assertThrows(NullPointerException.class, () -> new TasksTuple(null, 
Collections.emptyMap(), Collections.emptyMap()));
+        assertThrows(NullPointerException.class, () -> new 
TasksTuple(Collections.emptyMap(), null, Collections.emptyMap()));
+        assertThrows(NullPointerException.class, () -> new 
TasksTuple(Collections.emptyMap(), Collections.emptyMap(), null));
     }
 
     @Test
@@ -56,14 +58,14 @@ public class AssignmentTest {
         Map<String, Set<Integer>> warmupTasks = mkTasksPerSubtopology(
             mkTasks(SUBTOPOLOGY_3, 4, 5, 6)
         );
-        Assignment assignment = new Assignment(activeTasks, standbyTasks, 
warmupTasks);
-
-        assertEquals(activeTasks, assignment.activeTasks());
-        assertThrows(UnsupportedOperationException.class, () -> 
assignment.activeTasks().put("not allowed", Collections.emptySet()));
-        assertEquals(standbyTasks, assignment.standbyTasks());
-        assertThrows(UnsupportedOperationException.class, () -> 
assignment.standbyTasks().put("not allowed", Collections.emptySet()));
-        assertEquals(warmupTasks, assignment.warmupTasks());
-        assertThrows(UnsupportedOperationException.class, () -> 
assignment.warmupTasks().put("not allowed", Collections.emptySet()));
+        TasksTuple tuple = new TasksTuple(activeTasks, standbyTasks, 
warmupTasks);
+
+        assertEquals(activeTasks, tuple.activeTasks());
+        assertThrows(UnsupportedOperationException.class, () -> 
tuple.activeTasks().put("not allowed", Collections.emptySet()));
+        assertEquals(standbyTasks, tuple.standbyTasks());
+        assertThrows(UnsupportedOperationException.class, () -> 
tuple.standbyTasks().put("not allowed", Collections.emptySet()));
+        assertEquals(warmupTasks, tuple.warmupTasks());
+        assertThrows(UnsupportedOperationException.class, () -> 
tuple.warmupTasks().put("not allowed", Collections.emptySet()));
     }
 
     @Test
@@ -95,28 +97,83 @@ public class AssignmentTest {
             .setStandbyTasks(standbyTasks)
             .setWarmupTasks(warmupTasks);
 
-        Assignment assignment = Assignment.fromRecord(record);
+        TasksTuple tuple = TasksTuple.fromTargetAssignmentRecord(record);
 
         assertEquals(
             mkTasksPerSubtopology(
                 mkTasks(SUBTOPOLOGY_1, 1, 2, 3),
                 mkTasks(SUBTOPOLOGY_2, 4, 5, 6)
             ),
-            assignment.activeTasks()
+            tuple.activeTasks()
         );
         assertEquals(
             mkTasksPerSubtopology(
                 mkTasks(SUBTOPOLOGY_1, 7, 8, 9),
                 mkTasks(SUBTOPOLOGY_2, 1, 2, 3)
             ),
-            assignment.standbyTasks()
+            tuple.standbyTasks()
         );
         assertEquals(
             mkTasksPerSubtopology(
                 mkTasks(SUBTOPOLOGY_1, 4, 5, 6),
                 mkTasks(SUBTOPOLOGY_2, 7, 8, 9)
             ),
-            assignment.warmupTasks()
+            tuple.warmupTasks()
+        );
+    }
+
+    @Test
+    public void testMerge() {
+        TasksTuple tuple1 = new TasksTuple(
+            Map.of(SUBTOPOLOGY_1, Set.of(1, 2, 3)),
+            Map.of(SUBTOPOLOGY_2, Set.of(4, 5, 6)),
+            Map.of(SUBTOPOLOGY_3, Set.of(7, 8, 9))
+        );
+
+        TasksTuple tuple2 = new TasksTuple(
+            Map.of(SUBTOPOLOGY_1, Set.of(10, 11)),
+            Map.of(SUBTOPOLOGY_2, Set.of(12, 13)),
+            Map.of(SUBTOPOLOGY_3, Set.of(14, 15))
         );
+
+        TasksTuple mergedTuple = tuple1.merge(tuple2);
+
+        assertEquals(Map.of(SUBTOPOLOGY_1, Set.of(1, 2, 3, 10, 11)), 
mergedTuple.activeTasks());
+        assertEquals(Map.of(SUBTOPOLOGY_2, Set.of(4, 5, 6, 12, 13)), 
mergedTuple.standbyTasks());
+        assertEquals(Map.of(SUBTOPOLOGY_3, Set.of(7, 8, 9, 14, 15)), 
mergedTuple.warmupTasks());
+    }
+
+    @Test
+    public void testContainsAny() {
+        TasksTuple tuple1 = new TasksTuple(
+            Map.of(SUBTOPOLOGY_1, Set.of(1, 2, 3)),
+            Map.of(SUBTOPOLOGY_2, Set.of(4, 5, 6)),
+            Map.of(SUBTOPOLOGY_3, Set.of(7, 8, 9))
+        );
+
+        TasksTuple tuple2 = new TasksTuple(
+            Map.of(SUBTOPOLOGY_1, Set.of(3, 10, 11)),
+            Map.of(SUBTOPOLOGY_2, Set.of(12, 13)),
+            Map.of(SUBTOPOLOGY_3, Set.of(14, 15))
+        );
+
+        assertTrue(tuple1.containsAny(tuple2));
+
+        TasksTuple tuple3 = new TasksTuple(
+            Map.of(SUBTOPOLOGY_1, Set.of(10, 11)),
+            Map.of(SUBTOPOLOGY_2, Set.of(12, 13)),
+            Map.of(SUBTOPOLOGY_3, Set.of(14, 15))
+        );
+
+        assertFalse(tuple1.containsAny(tuple3));
+    }
+
+    @Test
+    public void testIsEmpty() {
+        TasksTuple emptyTuple = new TasksTuple(Map.of(), Map.of(), Map.of());
+        assertTrue(emptyTuple.isEmpty());
+
+        TasksTuple nonEmptyTuple = new TasksTuple(Map.of(SUBTOPOLOGY_1, 
Set.of(1)), Map.of(), Map.of());
+        assertFalse(nonEmptyTuple.isEmpty());
     }
 }

Reply via email to