This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4ca24a7dbf8 KAFKA-18325: Add TargetAssignmentBuilder (#18676)
4ca24a7dbf8 is described below

commit 4ca24a7dbf84b83b6693442d416a111ba8e53caf
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Feb 3 17:35:28 2025 +0100

    KAFKA-18325: Add TargetAssignmentBuilder (#18676)
    
    A class to build a new target assignment based on the provided parameters. 
As a result, it yields the records that must be persisted to the log and the 
new member assignments as a map.
    
    Compared to the feature branch, I extended the unit tests (testing also 
standby and warm-up task logic) and adopted simplifications due to the 
TasksTuple class.
    
    Reviewers: Bruno Cadonna <[email protected]>, Bill Bejeck 
<[email protected]>
---
 .../group/streams/TargetAssignmentBuilder.java     | 351 +++++++++
 .../group/streams/TopologyMetadata.java            | 105 +++
 .../group/streams/assignor/MockAssignor.java       |   2 +-
 .../group/streams/assignor/StickyTaskAssignor.java |   4 +-
 .../group/streams/assignor/TopologyDescriber.java  |  18 +-
 .../group/streams/topics/ConfiguredTopology.java   |   3 +-
 .../group/streams/topics/InternalTopicManager.java |  12 +-
 .../group/streams/TargetAssignmentBuilderTest.java | 852 +++++++++++++++++++++
 .../group/streams/TopologyMetadataTest.java        | 126 +++
 .../group/streams/assignor/MockAssignorTest.java   |   2 +-
 .../streams/assignor/StickyTaskAssignorTest.java   |   4 +-
 .../streams/topics/ConfiguredTopologyTest.java     |  10 +-
 12 files changed, 1468 insertions(+), 21 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
new file mode 100644
index 00000000000..4c1adeec839
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import 
org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.streams.assignor.GroupSpecImpl;
+import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
+import 
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Build the new target member assignments based on the provided parameters by 
calling the task assignor.
+ * As a result,
+ * it yields the records that must be persisted to the log and the new member 
assignments as a map from member ID to tasks tuple.
+ * <p>
+ * Records are only created for members which have a new target assignment. If 
their assignment did not change, no new record is needed.
+ * <p>
+ * When a member is deleted, it is assumed that its target assignment record 
is deleted as part of the member deletion process. In other
+ * words, this class does not yield a tombstone for removed members.
+ */
+public class TargetAssignmentBuilder {
+
+    /**
+     * The group ID.
+     */
+    private final String groupId;
+    /**
+     * The group epoch.
+     */
+    private final int groupEpoch;
+
+    /**
+     * The partition assignor used to compute the assignment.
+     */
+    private final TaskAssignor assignor;
+
+    /**
+     * The assignment configs.
+     */
+    private final Map<String, String> assignmentConfigs;
+
+    /**
+     * The members which have been updated or deleted. A null value signals 
deleted members.
+     */
+    private final Map<String, StreamsGroupMember> updatedMembers = new 
HashMap<>();
+
+    /**
+     * The members in the group.
+     */
+    private Map<String, StreamsGroupMember> members = Map.of();
+
+    /**
+     * The partition metadata.
+     */
+    private Map<String, 
org.apache.kafka.coordinator.group.streams.TopicMetadata> partitionMetadata = 
Map.of();
+
+    /**
+     * The existing target assignment.
+     */
+    private Map<String, org.apache.kafka.coordinator.group.streams.TasksTuple> 
targetAssignment = Map.of();
+
+    /**
+     * The topology.
+     */
+    private ConfiguredTopology topology;
+
+    /**
+     * The static members in the group.
+     */
+    private Map<String, String> staticMembers = Map.of();
+
+    /**
+     * Constructs the object.
+     *
+     * @param groupId    The group ID.
+     * @param groupEpoch The group epoch to compute a target assignment for.
+     * @param assignor   The assignor to use to compute the target assignment.
+     */
+    public TargetAssignmentBuilder(
+        String groupId,
+        int groupEpoch,
+        TaskAssignor assignor,
+        Map<String, String> assignmentConfigs
+    ) {
+        this.groupId = Objects.requireNonNull(groupId);
+        this.groupEpoch = groupEpoch;
+        this.assignor = Objects.requireNonNull(assignor);
+        this.assignmentConfigs = Objects.requireNonNull(assignmentConfigs);
+    }
+
+    static AssignmentMemberSpec createAssignmentMemberSpec(
+        StreamsGroupMember member,
+        TasksTuple targetAssignment
+    ) {
+        return new AssignmentMemberSpec(
+            member.instanceId(),
+            member.rackId(),
+            targetAssignment.activeTasks(),
+            targetAssignment.standbyTasks(),
+            targetAssignment.warmupTasks(),
+            member.processId(),
+            member.clientTags(),
+            Map.of(),
+            Map.of()
+        );
+    }
+
+    /**
+     * Adds all the existing members.
+     *
+     * @param members The existing members in the streams group.
+     * @return This object.
+     */
+    public TargetAssignmentBuilder withMembers(
+        Map<String, StreamsGroupMember> members
+    ) {
+        this.members = members;
+        return this;
+    }
+
+    /**
+     * Adds all the existing static members.
+     *
+     * @param staticMembers The existing static members in the streams group.
+     * @return This object.
+     */
+    public TargetAssignmentBuilder withStaticMembers(
+        Map<String, String> staticMembers
+    ) {
+        this.staticMembers = staticMembers;
+        return this;
+    }
+
+    /**
+     * Adds the partition metadata to use.
+     *
+     * @param partitionMetadata The partition metadata.
+     * @return This object.
+     */
+    public TargetAssignmentBuilder withPartitionMetadata(
+        Map<String, org.apache.kafka.coordinator.group.streams.TopicMetadata> 
partitionMetadata
+    ) {
+        this.partitionMetadata = partitionMetadata;
+        return this;
+    }
+
+    /**
+     * Adds the existing target assignment.
+     *
+     * @param targetAssignment The existing target assignment.
+     * @return This object.
+     */
+    public TargetAssignmentBuilder withTargetAssignment(
+        Map<String, org.apache.kafka.coordinator.group.streams.TasksTuple> 
targetAssignment
+    ) {
+        this.targetAssignment = targetAssignment;
+        return this;
+    }
+
+    /**
+     * Adds the topology image.
+     *
+     * @param topology The topology.
+     * @return This object.
+     */
+    public TargetAssignmentBuilder withTopology(
+        ConfiguredTopology topology
+    ) {
+        this.topology = topology;
+        return this;
+    }
+
+
+    /**
+     * Adds or updates a member. This is useful when the updated member is not 
yet materialized in memory.
+     *
+     * @param memberId The member ID.
+     * @param member   The member to add or update.
+     * @return This object.
+     */
+    public TargetAssignmentBuilder addOrUpdateMember(
+        String memberId,
+        StreamsGroupMember member
+    ) {
+        this.updatedMembers.put(memberId, member);
+        return this;
+    }
+
+    /**
+     * Removes a member. This is useful when the removed member is not yet 
materialized in memory.
+     *
+     * @param memberId The member ID.
+     * @return This object.
+     */
+    public TargetAssignmentBuilder removeMember(
+        String memberId
+    ) {
+        return addOrUpdateMember(memberId, null);
+    }
+
+    /**
+     * Builds the new target assignment.
+     *
+     * @return A TargetAssignmentResult which contains the records to update 
the existing target assignment.
+     * @throws TaskAssignorException if the target assignment cannot be 
computed.
+     */
+    public TargetAssignmentResult build() throws TaskAssignorException {
+        Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
+
+        // Prepare the member spec for all members.
+        members.forEach((memberId, member) -> memberSpecs.put(memberId, 
createAssignmentMemberSpec(
+            member,
+            targetAssignment.getOrDefault(memberId, 
org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY)
+        )));
+
+        // Update the member spec if updated or deleted members.
+        updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
+            if (updatedMemberOrNull == null) {
+                memberSpecs.remove(memberId);
+            } else {
+                org.apache.kafka.coordinator.group.streams.TasksTuple 
assignment = targetAssignment.getOrDefault(memberId,
+                    
org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY);
+
+                // A new static member joins and needs to replace an existing 
departed one.
+                if (updatedMemberOrNull.instanceId().isPresent()) {
+                    String previousMemberId = 
staticMembers.get(updatedMemberOrNull.instanceId().get());
+                    if (previousMemberId != null && 
!previousMemberId.equals(memberId)) {
+                        assignment = 
targetAssignment.getOrDefault(previousMemberId,
+                            
org.apache.kafka.coordinator.group.streams.TasksTuple.EMPTY);
+                    }
+                }
+
+                memberSpecs.put(memberId, createAssignmentMemberSpec(
+                    updatedMemberOrNull,
+                    assignment
+                ));
+            }
+        });
+
+        // Compute the assignment.
+        GroupAssignment newGroupAssignment;
+        if (topology.isReady()) {
+            if (topology.subtopologies().isEmpty()) {
+                throw new IllegalStateException("Subtopologies must be present 
if topology is ready.");
+            }
+            newGroupAssignment = assignor.assign(
+                new GroupSpecImpl(
+                    Collections.unmodifiableMap(memberSpecs),
+                    assignmentConfigs
+                ),
+                new TopologyMetadata(partitionMetadata, 
topology.subtopologies().get())
+            );
+        } else {
+            newGroupAssignment = new GroupAssignment(
+                memberSpecs.keySet().stream().collect(Collectors.toMap(x -> x, 
x -> MemberAssignment.empty())));
+        }
+
+        // Compute delta from previous to new target assignment and create the
+        // relevant records.
+        List<CoordinatorRecord> records = new ArrayList<>();
+        Map<String, org.apache.kafka.coordinator.group.streams.TasksTuple> 
newTargetAssignment = new HashMap<>();
+
+        memberSpecs.keySet().forEach(memberId -> {
+            org.apache.kafka.coordinator.group.streams.TasksTuple 
oldMemberAssignment = targetAssignment.get(memberId);
+            org.apache.kafka.coordinator.group.streams.TasksTuple 
newMemberAssignment = newMemberAssignment(newGroupAssignment, memberId);
+
+            newTargetAssignment.put(memberId, newMemberAssignment);
+
+            if (oldMemberAssignment == null) {
+                // If the member had no assignment, we always create a record 
for it.
+                
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(
+                    groupId,
+                    memberId,
+                    newMemberAssignment
+                ));
+            } else {
+                // If the member had an assignment, we only create a record if 
the
+                // new assignment is different.
+                if (!newMemberAssignment.equals(oldMemberAssignment)) {
+                    
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(
+                        groupId,
+                        memberId,
+                        newMemberAssignment
+                    ));
+                }
+            }
+        });
+
+        // Bump the target assignment epoch.
+        
records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 groupEpoch));
+
+        return new TargetAssignmentResult(records, newTargetAssignment);
+    }
+
+    private TasksTuple newMemberAssignment(
+        GroupAssignment newGroupAssignment,
+        String memberId
+    ) {
+        MemberAssignment newMemberAssignment = 
newGroupAssignment.members().get(memberId);
+        if (newMemberAssignment != null) {
+            return new TasksTuple(
+                newMemberAssignment.activeTasks(),
+                newMemberAssignment.standbyTasks(),
+                newMemberAssignment.warmupTasks()
+            );
+        } else {
+            return TasksTuple.EMPTY;
+        }
+    }
+
+    /**
+     * The assignment result returned by {{@link 
TargetAssignmentBuilder#build()}}.
+     *
+     * @param records          The records that must be applied to the 
__consumer_offsets topics to persist the new target assignment.
+     * @param targetAssignment The new target assignment for the group.
+     */
+    public record TargetAssignmentResult(
+        List<CoordinatorRecord> records,
+        Map<String, TasksTuple> targetAssignment
+    ) {
+        public TargetAssignmentResult {
+            Objects.requireNonNull(records);
+            Objects.requireNonNull(targetAssignment);
+        }
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
new file mode 100644
index 00000000000..d1119cfe011
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.stream.Stream;
+
+/**
+ * The topology metadata class is used by the {@link 
org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor} to get topic 
and
+ * partition metadata for the topology that the streams group using.
+ *
+ * @param topicMetadata  The topic Ids mapped to their corresponding {@link 
TopicMetadata} object, which contains topic and partition
+ *                       metadata.
+ * @param subtopologyMap The configured subtopologies
+ */
+public record TopologyMetadata(Map<String, TopicMetadata> topicMetadata, 
SortedMap<String, ConfiguredSubtopology> subtopologyMap) implements 
TopologyDescriber {
+
+    public TopologyMetadata {
+        topicMetadata = 
Objects.requireNonNull(Collections.unmodifiableMap(topicMetadata));
+        subtopologyMap = 
Objects.requireNonNull(Collections.unmodifiableSortedMap(subtopologyMap));
+    }
+
+    /**
+     * Map of topic names to topic metadata.
+     *
+     * @return The map of topic Ids to topic metadata.
+     */
+    @Override
+    public Map<String, TopicMetadata> topicMetadata() {
+        return this.topicMetadata;
+    }
+
+    /**
+     * Checks whether the given subtopology is associated with a changelog 
topic.
+     *
+     * @param subtopologyId String identifying the subtopology.
+     * @throws NoSuchElementException if the subtopology ID does not exist.
+     * @return true if the subtopology is associated with a changelog topic, 
false otherwise.
+     */
+    @Override
+    public boolean isStateful(String subtopologyId) {
+        final ConfiguredSubtopology subtopology = 
getSubtopologyOrFail(subtopologyId);
+        return !subtopology.stateChangelogTopics().isEmpty();
+    }
+
+    /**
+     * The list of subtopologies in the topology.
+     *
+     * @return a list of subtopology IDs.
+     */
+    @Override
+    public List<String> subtopologies() {
+        return subtopologyMap.keySet().stream().toList();
+    }
+
+    /**
+     * The maximal number of input partitions among all source topics for the 
given subtopology.
+     *
+     * @param subtopologyId String identifying the subtopology.
+     *
+     * @throws NoSuchElementException if the subtopology ID does not exist.
+     * @throws IllegalStateException if the subtopology contains no source 
topics.
+     * @return The maximal number of input partitions among all source topics 
for the given subtopology.
+     */
+    @Override
+    public int maxNumInputPartitions(String subtopologyId) {
+        final ConfiguredSubtopology subtopology = 
getSubtopologyOrFail(subtopologyId);
+        return Stream.concat(
+            subtopology.sourceTopics().stream(),
+            subtopology.repartitionSourceTopics().keySet().stream()
+        ).map(topic -> 
this.topicMetadata.get(topic).numPartitions()).max(Integer::compareTo).orElseThrow(
+            () -> new IllegalStateException("Subtopology does not contain any 
source topics")
+        );
+    }
+
+    private ConfiguredSubtopology getSubtopologyOrFail(String subtopologyId) {
+        if (!subtopologyMap.containsKey(subtopologyId)) {
+            throw new NoSuchElementException(String.format("Topology does not 
contain subtopology %s", subtopologyId));
+        }
+        return subtopologyMap.get(subtopologyId);
+    }
+
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java
index ce0bc101101..3e980bad93e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignor.java
@@ -46,7 +46,7 @@ public class MockAssignor implements TaskAssignor {
         Map<String, String[]> subtopologyToActiveMember = new HashMap<>();
 
         for (String subtopology : topologyDescriber.subtopologies()) {
-            int numberOfPartitions = topologyDescriber.numTasks(subtopology);
+            int numberOfPartitions = 
topologyDescriber.maxNumInputPartitions(subtopology);
             subtopologyToActiveMember.put(subtopology, new 
String[numberOfPartitions]);
         }
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
index 00ba701d3e2..c8d3c97d504 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
@@ -72,7 +72,7 @@ public class StickyTaskAssignor implements TaskAssignor {
         Set<TaskId> ret = new HashSet<>();
         for (String subtopology : topologyDescriber.subtopologies()) {
             if (isActive || topologyDescriber.isStateful(subtopology)) {
-                int numberOfPartitions = 
topologyDescriber.numTasks(subtopology);
+                int numberOfPartitions = 
topologyDescriber.maxNumInputPartitions(subtopology);
                 for (int i = 0; i < numberOfPartitions; i++) {
                     ret.add(new TaskId(subtopology, i));
                 }
@@ -85,7 +85,7 @@ public class StickyTaskAssignor implements TaskAssignor {
         localState = new LocalState();
         localState.allTasks = 0;
         for (String subtopology : topologyDescriber.subtopologies()) {
-            int numberOfPartitions = topologyDescriber.numTasks(subtopology);
+            int numberOfPartitions = 
topologyDescriber.maxNumInputPartitions(subtopology);
             localState.allTasks += numberOfPartitions;
         }
         localState.totalCapacity = groupSpec.members().size();
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java
index 2f913bf5514..d6f7a3ab579 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TopologyDescriber.java
@@ -20,30 +20,34 @@ import java.util.List;
 import java.util.NoSuchElementException;
 
 /**
- * The subscribed topic describer is used by the {@link TaskAssignor} to 
obtain topic and task metadata of the groups topology.
+ * The topology describer is used by the {@link TaskAssignor} to get topic and 
task metadata of the group's topology.
  */
 public interface TopologyDescriber {
 
     /**
+     * Map of topic names to topic metadata.
+     *
      * @return The list of subtopologies IDs.
      */
     List<String> subtopologies();
 
     /**
-     * The number of tasks for the given subtopology.
+     * The maximal number of input partitions among all source topics for the 
given subtopology.
      *
      * @param subtopologyId String identifying the subtopology.
      *
-     * @return The number of tasks corresponding to the given subtopology ID.
-     * @throws NoSuchElementException if subtopology does not exist in the 
topology.
+     * @throws NoSuchElementException if the subtopology ID does not exist.
+     * @throws IllegalStateException if the subtopology contains no source 
topics.
+     * @return The maximal number of input partitions among all source topics 
for the given subtopology.
      */
-    int numTasks(String subtopologyId) throws NoSuchElementException;
+    int maxNumInputPartitions(String subtopologyId) throws 
NoSuchElementException;
 
     /**
-     * Whether the given subtopology is stateful.
+     * Checks whether the given subtopology is associated with a changelog 
topic.
      *
      * @param subtopologyId String identifying the subtopology.
-     * @return true if the subtopology is stateful, false otherwise.
+     * @throws NoSuchElementException if the subtopology ID does not exist.
+     * @return true if the subtopology is associated with a changelog topic, 
false otherwise.
      */
     boolean isStateful(String subtopologyId);
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
index b6ccb87f7a2..6c7eede16fc 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.SortedMap;
 import java.util.stream.Collectors;
 
 /**
@@ -41,7 +42,7 @@ import java.util.stream.Collectors;
  *                                    reported back to the client.
  */
 public record ConfiguredTopology(int topologyEpoch,
-                                 Optional<Map<String, ConfiguredSubtopology>> 
subtopologies,
+                                 Optional<SortedMap<String, 
ConfiguredSubtopology>> subtopologies,
                                  Map<String, CreatableTopic> 
internalTopicsToBeCreated,
                                  Optional<TopicConfigurationException> 
topicConfigurationException) {
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
index 31029d9fd9e..33d9f2c4874 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java
@@ -34,6 +34,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -74,12 +76,16 @@ public class InternalTopicManager {
             Map<String, Integer> decidedPartitionCountsForInternalTopics =
                 decidePartitionCounts(logContext, topology, topicMetadata, 
copartitionGroupsBySubtopology, log);
 
-            final Map<String, ConfiguredSubtopology> configuredSubtopologies =
+            final SortedMap<String, ConfiguredSubtopology> 
configuredSubtopologies =
                 subtopologies.stream()
                     .collect(Collectors.toMap(
                         StreamsGroupTopologyValue.Subtopology::subtopologyId,
-                        x -> fromPersistedSubtopology(x, 
decidedPartitionCountsForInternalTopics))
-                    );
+                        x -> fromPersistedSubtopology(x, 
decidedPartitionCountsForInternalTopics),
+                        (v1, v2) -> {
+                            throw new 
RuntimeException(String.format("Duplicate key for values %s and %s", v1, v2));
+                        },
+                        TreeMap::new
+                    ));
 
             Map<String, CreatableTopic> internalTopicsToCreate = 
missingInternalTopics(configuredSubtopologies, topicMetadata);
             if (!internalTopicsToCreate.isEmpty()) {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
new file mode 100644
index 00000000000..114974558b8
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java
@@ -0,0 +1,852 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.group.MetadataImageBuilder;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
+import 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.TaskRole;
+import 
org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.streams.assignor.GroupSpecImpl;
+import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord;
+import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.createAssignmentMemberSpec;
+import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasks;
+import static 
org.apache.kafka.coordinator.group.streams.TaskAssignmentTestUtil.mkTasksTuple;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TargetAssignmentBuilderTest {
+
+    @Test
+    public void testBuildEmptyAssignmentWhenTopologyNotReady() {
+        String groupId = "test-group";
+        int groupEpoch = 1;
+        TaskAssignor assignor = mock(TaskAssignor.class);
+        ConfiguredTopology topology = mock(ConfiguredTopology.class);
+        Map<String, String> assignmentConfigs = new HashMap<>();
+
+        when(topology.isReady()).thenReturn(false);
+
+        TargetAssignmentBuilder builder = new TargetAssignmentBuilder(groupId, 
groupEpoch, assignor, assignmentConfigs)
+            .withTopology(topology);
+
+        TargetAssignmentBuilder.TargetAssignmentResult result = 
builder.build();
+
+        List<CoordinatorRecord> expectedRecords = Collections.singletonList(
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 groupEpoch)
+        );
+
+        assertEquals(expectedRecords, result.records());
+        assertEquals(Collections.emptyMap(), result.targetAssignment());
+    }
+
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testCreateAssignmentMemberSpec(TaskRole taskRole) {
+        String fooSubtopologyId = Uuid.randomUuid().toString();
+        String barSubtopologyId = Uuid.randomUuid().toString();
+
+        final Map<String, String> clientTags = mkMap(mkEntry("tag1", 
"value1"), mkEntry("tag2", "value2"));
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member-id")
+            .setRackId("rackId")
+            .setInstanceId("instanceId")
+            .setProcessId("processId")
+            .setClientTags(clientTags)
+            .build();
+
+        TasksTuple assignment = mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 1, 2, 3),
+            mkTasks(barSubtopologyId, 1, 2, 3)
+        );
+
+        AssignmentMemberSpec assignmentMemberSpec = createAssignmentMemberSpec(
+            member,
+            assignment
+        );
+
+        assertEquals(new AssignmentMemberSpec(
+            Optional.of("instanceId"),
+            Optional.of("rackId"),
+            assignment.activeTasks(),
+            assignment.standbyTasks(),
+            assignment.warmupTasks(),
+            "processId",
+            clientTags,
+            Map.of(),
+            Map.of()
+        ), assignmentMemberSpec);
+    }
+
+    @Test
+    public void testEmpty() {
+        TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
+            "my-group",
+            20
+        );
+
+        
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 result = context.build();
+        assertEquals(List.of(newStreamsGroupTargetAssignmentEpochRecord(
+            "my-group",
+            20
+        )), result.records());
+        assertEquals(Map.of(), result.targetAssignment());
+    }
+
+    
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testAssignmentHasNotChanged(TaskRole taskRole) {
+        TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
+            "my-group",
+            20
+        );
+
+        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
+        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+
+        context.addGroupMember("member-1", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 1, 2, 3),
+            mkTasks(barSubtopologyId, 1, 2, 3)
+        ));
+
+        context.addGroupMember("member-2", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 4, 5, 6),
+            mkTasks(barSubtopologyId, 4, 5, 6)
+        ));
+
+        context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 1, 2, 3),
+            mkTasks(barSubtopologyId, 1, 2, 3)
+        ));
+
+        context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 4, 5, 6),
+            mkTasks(barSubtopologyId, 4, 5, 6)
+        ));
+
+        
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 result = context.build();
+
+        assertEquals(List.of(newStreamsGroupTargetAssignmentEpochRecord(
+            "my-group",
+            20
+        )), result.records());
+
+        Map<String, TasksTuple> expectedAssignment = new HashMap<>();
+        expectedAssignment.put("member-1", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 1, 2, 3),
+            mkTasks(barSubtopologyId, 1, 2, 3)
+        ));
+        expectedAssignment.put("member-2", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 4, 5, 6),
+            mkTasks(barSubtopologyId, 4, 5, 6)
+        ));
+
+        assertEquals(expectedAssignment, result.targetAssignment());
+    }
+
+    
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testAssignmentSwapped(TaskRole taskRole) {
+        TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
+            "my-group",
+            20
+        );
+
+        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
+        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+
+        context.addGroupMember("member-1", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 1, 2, 3),
+            mkTasks(barSubtopologyId, 1, 2, 3)
+        ));
+
+        context.addGroupMember("member-2", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 4, 5, 6),
+            mkTasks(barSubtopologyId, 4, 5, 6)
+        ));
+
+        context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 1, 2, 3),
+            mkTasks(barSubtopologyId, 1, 2, 3)
+        ));
+
+        context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 4, 5, 6),
+            mkTasks(barSubtopologyId, 4, 5, 6)
+        ));
+
+        
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 result = context.build();
+
+        assertEquals(3, result.records().size());
+
+        assertUnorderedRecordsEquals(List.of(List.of(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-1", 
mkTasksTuple(taskRole,
+                mkTasks(fooSubtopologyId, 4, 5, 6),
+                mkTasks(barSubtopologyId, 4, 5, 6)
+            )),
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-2", 
mkTasksTuple(taskRole,
+                mkTasks(fooSubtopologyId, 1, 2, 3),
+                mkTasks(barSubtopologyId, 1, 2, 3)
+            ))
+        )), result.records().subList(0, 2));
+
+        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+            "my-group",
+            20
+        ), result.records().get(2));
+
+        Map<String, TasksTuple> expectedAssignment = new HashMap<>();
+        expectedAssignment.put("member-2", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 1, 2, 3),
+            mkTasks(barSubtopologyId, 1, 2, 3)
+        ));
+        expectedAssignment.put("member-1", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 4, 5, 6),
+            mkTasks(barSubtopologyId, 4, 5, 6)
+        ));
+
+        assertEquals(expectedAssignment, result.targetAssignment());
+    }
+
+    
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testNewMember(TaskRole taskRole) {
+        TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
+            "my-group",
+            20
+        );
+
+        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
+        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+
+        context.addGroupMember("member-1", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 1, 2, 3),
+            mkTasks(barSubtopologyId, 1, 2, 3)
+        ));
+
+        context.addGroupMember("member-2", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 4, 5, 6),
+            mkTasks(barSubtopologyId, 4, 5, 6)
+        ));
+
+        context.updateMemberMetadata("member-3");
+
+        context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 1, 2),
+            mkTasks(barSubtopologyId, 1, 2)
+        ));
+
+        context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 3, 4),
+            mkTasks(barSubtopologyId, 3, 4)
+        ));
+
+        context.prepareMemberAssignment("member-3", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 5, 6),
+            mkTasks(barSubtopologyId, 5, 6)
+        ));
+
+        
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 result = context.build();
+
+        assertEquals(4, result.records().size());
+
+        assertUnorderedRecordsEquals(List.of(List.of(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-1", 
mkTasksTuple(taskRole,
+                mkTasks(fooSubtopologyId, 1, 2),
+                mkTasks(barSubtopologyId, 1, 2)
+            )),
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-2", 
mkTasksTuple(taskRole,
+                mkTasks(fooSubtopologyId, 3, 4),
+                mkTasks(barSubtopologyId, 3, 4)
+            )),
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-3", 
mkTasksTuple(taskRole,
+                mkTasks(fooSubtopologyId, 5, 6),
+                mkTasks(barSubtopologyId, 5, 6)
+            ))
+        )), result.records().subList(0, 3));
+
+        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+            "my-group",
+            20
+        ), result.records().get(3));
+
+        Map<String, TasksTuple> expectedAssignment = new HashMap<>();
+        expectedAssignment.put("member-1", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 1, 2),
+            mkTasks(barSubtopologyId, 1, 2)
+        ));
+        expectedAssignment.put("member-2", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 3, 4),
+            mkTasks(barSubtopologyId, 3, 4)
+        ));
+        expectedAssignment.put("member-3", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 5, 6),
+            mkTasks(barSubtopologyId, 5, 6)
+        ));
+
+        assertEquals(expectedAssignment, result.targetAssignment());
+    }
+
+    
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testUpdateMember(TaskRole taskRole) {
+        TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
+            "my-group",
+            20
+        );
+
+        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
+        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+
+        context.addGroupMember("member-1", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 1, 2, 3),
+            mkTasks(barSubtopologyId, 1, 2)
+        ));
+
+        context.addGroupMember("member-2", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 4, 5, 6),
+            mkTasks(barSubtopologyId, 3, 4)
+        ));
+
+        context.addGroupMember("member-3", mkTasksTuple(taskRole,
+            mkTasks(barSubtopologyId, 5, 6)
+        ));
+
+        context.updateMemberMetadata(
+            "member-3",
+            Optional.of("instance-id-3"),
+            Optional.of("rack-0")
+        );
+
+        context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 1, 2),
+            mkTasks(barSubtopologyId, 1, 2)
+        ));
+
+        context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 3, 4),
+            mkTasks(barSubtopologyId, 3, 4)
+        ));
+
+        context.prepareMemberAssignment("member-3", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 5, 6),
+            mkTasks(barSubtopologyId, 5, 6)
+        ));
+
+        
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 result = context.build();
+
+        assertEquals(4, result.records().size());
+
+        assertUnorderedRecordsEquals(List.of(List.of(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-1", 
mkTasksTuple(taskRole,
+                mkTasks(fooSubtopologyId, 1, 2),
+                mkTasks(barSubtopologyId, 1, 2)
+            )),
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-2", 
mkTasksTuple(taskRole,
+                mkTasks(fooSubtopologyId, 3, 4),
+                mkTasks(barSubtopologyId, 3, 4)
+            )),
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-3", 
mkTasksTuple(taskRole,
+                mkTasks(fooSubtopologyId, 5, 6),
+                mkTasks(barSubtopologyId, 5, 6)
+            ))
+        )), result.records().subList(0, 3));
+
+        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+            "my-group",
+            20
+        ), result.records().get(3));
+
+        Map<String, TasksTuple> expectedAssignment = new HashMap<>();
+        expectedAssignment.put("member-1", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 1, 2),
+            mkTasks(barSubtopologyId, 1, 2)
+        ));
+        expectedAssignment.put("member-2", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 3, 4),
+            mkTasks(barSubtopologyId, 3, 4)
+        ));
+        expectedAssignment.put("member-3", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 5, 6),
+            mkTasks(barSubtopologyId, 5, 6)
+        ));
+
+        assertEquals(expectedAssignment, result.targetAssignment());
+    }
+
+    
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testPartialAssignmentUpdate(TaskRole taskRole) {
+        TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
+            "my-group",
+            20
+        );
+
+        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6, mkMapOfPartitionRacks(6));
+        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6, mkMapOfPartitionRacks(6));
+
+        context.addGroupMember("member-1", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 1, 2),
+            mkTasks(barSubtopologyId, 1, 2)
+        ));
+
+        context.addGroupMember("member-2", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 3, 4),
+            mkTasks(barSubtopologyId, 3, 4)
+        ));
+
+        context.addGroupMember("member-3", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 5, 6),
+            mkTasks(barSubtopologyId, 5, 6)
+        ));
+
+        context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 1, 2),
+            mkTasks(barSubtopologyId, 1, 2)
+        ));
+
+        context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 3, 4, 5),
+            mkTasks(barSubtopologyId, 3, 4, 5)
+        ));
+
+        context.prepareMemberAssignment("member-3", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 6),
+            mkTasks(barSubtopologyId, 6)
+        ));
+
+        
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 result = context.build();
+
+        assertEquals(3, result.records().size());
+
+        // Member 1 has no record because its assignment did not change.
+        assertUnorderedRecordsEquals(List.of(List.of(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-2", 
mkTasksTuple(taskRole,
+                mkTasks(fooSubtopologyId, 3, 4, 5),
+                mkTasks(barSubtopologyId, 3, 4, 5)
+            )),
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-3", 
mkTasksTuple(taskRole,
+                mkTasks(fooSubtopologyId, 6),
+                mkTasks(barSubtopologyId, 6)
+            ))
+        )), result.records().subList(0, 2));
+
+        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+            "my-group",
+            20
+        ), result.records().get(2));
+
+        Map<String, TasksTuple> expectedAssignment = new HashMap<>();
+        expectedAssignment.put("member-1", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 1, 2),
+            mkTasks(barSubtopologyId, 1, 2)
+        ));
+        expectedAssignment.put("member-2", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 3, 4, 5),
+            mkTasks(barSubtopologyId, 3, 4, 5)
+        ));
+        expectedAssignment.put("member-3", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 6),
+            mkTasks(barSubtopologyId, 6)
+        ));
+
+        assertEquals(expectedAssignment, result.targetAssignment());
+    }
+
+    
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testDeleteMember(TaskRole taskRole) {
+        TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
+            "my-group",
+            20
+        );
+
+        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
+        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+
+        context.addGroupMember("member-1", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 1, 2),
+            mkTasks(barSubtopologyId, 1, 2)
+        ));
+
+        context.addGroupMember("member-2", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 3, 4),
+            mkTasks(barSubtopologyId, 3, 4)
+        ));
+
+        context.addGroupMember("member-3", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 5, 6),
+            mkTasks(barSubtopologyId, 5, 6)
+        ));
+
+        context.removeMember("member-3");
+
+        context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 1, 2, 3),
+            mkTasks(barSubtopologyId, 1, 2, 3)
+        ));
+
+        context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 4, 5, 6),
+            mkTasks(barSubtopologyId, 4, 5, 6)
+        ));
+
+        
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 result = context.build();
+
+        assertEquals(3, result.records().size());
+
+        assertUnorderedRecordsEquals(List.of(List.of(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-1", 
mkTasksTuple(taskRole,
+                mkTasks(fooSubtopologyId, 1, 2, 3),
+                mkTasks(barSubtopologyId, 1, 2, 3)
+            )),
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-2", 
mkTasksTuple(taskRole,
+                mkTasks(fooSubtopologyId, 4, 5, 6),
+                mkTasks(barSubtopologyId, 4, 5, 6)
+            ))
+        )), result.records().subList(0, 2));
+
+        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+            "my-group",
+            20
+        ), result.records().get(2));
+
+        Map<String, TasksTuple> expectedAssignment = new HashMap<>();
+        expectedAssignment.put("member-1", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 1, 2, 3),
+            mkTasks(barSubtopologyId, 1, 2, 3)
+        ));
+        expectedAssignment.put("member-2", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 4, 5, 6),
+            mkTasks(barSubtopologyId, 4, 5, 6)
+        ));
+
+        assertEquals(expectedAssignment, result.targetAssignment());
+    }
+
+    
+    @ParameterizedTest
+    @EnumSource(TaskRole.class)
+    public void testReplaceStaticMember(TaskRole taskRole) {
+        TargetAssignmentBuilderTestContext context = new 
TargetAssignmentBuilderTestContext(
+            "my-group",
+            20
+        );
+
+        String fooSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("foo", 6, Map.of());
+        String barSubtopologyId = 
context.addSubtopologyWithSingleSourceTopic("bar", 6, Map.of());
+
+        context.addGroupMember("member-1", "instance-member-1", 
mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 1, 2),
+            mkTasks(barSubtopologyId, 1, 2)
+        ));
+
+        context.addGroupMember("member-2", "instance-member-2", 
mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 3, 4),
+            mkTasks(barSubtopologyId, 3, 4)
+        ));
+
+        context.addGroupMember("member-3", "instance-member-3", 
mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 5, 6),
+            mkTasks(barSubtopologyId, 5, 6)
+        ));
+
+        // Static member 3 leaves
+        context.removeMember("member-3");
+
+        // Another static member joins with the same instance id as the 
departed one
+        context.updateMemberMetadata("member-3-a", 
Optional.of("instance-member-3"),
+            Optional.empty());
+
+        context.prepareMemberAssignment("member-1", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 1, 2),
+            mkTasks(barSubtopologyId, 1, 2)
+        ));
+
+        context.prepareMemberAssignment("member-2", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 3, 4),
+            mkTasks(barSubtopologyId, 3, 4)
+        ));
+
+        context.prepareMemberAssignment("member-3-a", mkTasksTuple(taskRole,
+            mkTasks(fooSubtopologyId, 5, 6),
+            mkTasks(barSubtopologyId, 5, 6)
+        ));
+
+        TargetAssignmentBuilder.TargetAssignmentResult result = 
context.build();
+
+        assertEquals(2, result.records().size());
+
+        assertUnorderedRecordsEquals(List.of(List.of(
+            newStreamsGroupTargetAssignmentRecord("my-group", "member-3-a", 
mkTasksTuple(taskRole,
+                mkTasks(fooSubtopologyId, 5, 6),
+                mkTasks(barSubtopologyId, 5, 6)
+            ))
+        )), result.records().subList(0, 1));
+
+        assertEquals(newStreamsGroupTargetAssignmentEpochRecord(
+            "my-group",
+            20
+        ), result.records().get(1));
+
+        Map<String, TasksTuple> expectedAssignment = new HashMap<>();
+        expectedAssignment.put("member-1", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 1, 2),
+            mkTasks(barSubtopologyId, 1, 2)
+        ));
+        expectedAssignment.put("member-2", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 3, 4),
+            mkTasks(barSubtopologyId, 3, 4)
+        ));
+
+        expectedAssignment.put("member-3-a", mkTasksTuple(taskRole, 
+            mkTasks(fooSubtopologyId, 5, 6),
+            mkTasks(barSubtopologyId, 5, 6)
+        ));
+
+        assertEquals(expectedAssignment, result.targetAssignment());
+    }
+
+    public static class TargetAssignmentBuilderTestContext {
+
+        private final String groupId;
+        private final int groupEpoch;
+        private final TaskAssignor assignor = mock(TaskAssignor.class);
+        private final SortedMap<String, ConfiguredSubtopology> subtopologies = 
new TreeMap<>();
+        private final ConfiguredTopology topology = new ConfiguredTopology(0, 
Optional.of(subtopologies), new HashMap<>(),
+            Optional.empty());
+        private final Map<String, StreamsGroupMember> members = new 
HashMap<>();
+        private final Map<String, 
org.apache.kafka.coordinator.group.streams.TopicMetadata> subscriptionMetadata 
= new HashMap<>();
+        private final Map<String, StreamsGroupMember> updatedMembers = new 
HashMap<>();
+        private final Map<String, TasksTuple> targetAssignment = new 
HashMap<>();
+        private final Map<String, MemberAssignment> memberAssignments = new 
HashMap<>();
+        private final Map<String, String> staticMembers = new HashMap<>();
+        private MetadataImageBuilder topicsImageBuilder = new 
MetadataImageBuilder();
+
+        public TargetAssignmentBuilderTestContext(
+            String groupId,
+            int groupEpoch
+        ) {
+            this.groupId = groupId;
+            this.groupEpoch = groupEpoch;
+        }
+
+        public void addGroupMember(
+            String memberId,
+            TasksTuple targetTasks
+        ) {
+            addGroupMember(memberId, null, targetTasks);
+        }
+
+        private void addGroupMember(
+            String memberId,
+            String instanceId,
+            TasksTuple targetTasks
+        ) {
+            StreamsGroupMember.Builder memberBuilder = new 
StreamsGroupMember.Builder(memberId);
+            memberBuilder.setProcessId("processId");
+            memberBuilder.setClientTags(Map.of());
+            memberBuilder.setUserEndpoint(new 
StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090));
+
+            if (instanceId != null) {
+                memberBuilder.setInstanceId(instanceId);
+                staticMembers.put(instanceId, memberId);
+            } else {
+                memberBuilder.setInstanceId(null);
+            }
+            memberBuilder.setRackId(null);
+            members.put(memberId, memberBuilder.build());
+            targetAssignment.put(memberId, targetTasks);
+        }
+
+        public String addSubtopologyWithSingleSourceTopic(
+            String topicName,
+            int numTasks,
+            Map<Integer, Set<String>> partitionRacks
+        ) {
+            String subtopologyId = Uuid.randomUuid().toString();
+            Uuid topicId = Uuid.randomUuid();
+            subscriptionMetadata.put(topicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(
+                topicId,
+                topicName,
+                numTasks,
+                partitionRacks
+            ));
+            topicsImageBuilder = topicsImageBuilder.addTopic(topicId, 
topicName, numTasks);
+            subtopologies.put(subtopologyId, new 
ConfiguredSubtopology(Set.of(topicId.toString()), Map.of(), Set.of(), 
Map.of()));
+
+            return subtopologyId;
+        }
+
+        public void updateMemberMetadata(
+            String memberId
+        ) {
+            updateMemberMetadata(
+                memberId,
+                Optional.empty(),
+                Optional.empty()
+            );
+        }
+
+        public void updateMemberMetadata(
+            String memberId,
+            Optional<String> instanceId,
+            Optional<String> rackId
+        ) {
+            StreamsGroupMember existingMember = members.get(memberId);
+            StreamsGroupMember.Builder builder;
+            if (existingMember != null) {
+                builder = new StreamsGroupMember.Builder(existingMember);
+            } else {
+                builder = new StreamsGroupMember.Builder(memberId);
+                builder.setProcessId("processId");
+                builder.setRackId(null);
+                builder.setInstanceId(null);
+                builder.setClientTags(Map.of());
+                builder.setUserEndpoint(new 
StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090));
+            }
+            updatedMembers.put(memberId, builder
+                .maybeUpdateInstanceId(instanceId)
+                .maybeUpdateRackId(rackId)
+                .build());
+        }
+
+        public void removeMember(
+            String memberId
+        ) {
+            this.updatedMembers.put(memberId, null);
+        }
+
+        public void prepareMemberAssignment(
+            String memberId,
+            TasksTuple assignment
+        ) {
+            memberAssignments.put(memberId, new 
MemberAssignment(assignment.activeTasks(), assignment.standbyTasks(), 
assignment.warmupTasks()));
+        }
+
+        public 
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 build() {
+            // Prepare expected member specs.
+            Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
+
+            // All the existing members are prepared.
+            members.forEach((memberId, member) ->
+                memberSpecs.put(memberId, createAssignmentMemberSpec(
+                        member,
+                        targetAssignment.getOrDefault(memberId, 
TasksTuple.EMPTY)
+                    )
+                ));
+
+            // All the updated are added and all the deleted
+            // members are removed.
+            updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
+                if (updatedMemberOrNull == null) {
+                    memberSpecs.remove(memberId);
+                } else {
+                    TasksTuple assignment = 
targetAssignment.getOrDefault(memberId,
+                        TasksTuple.EMPTY);
+
+                    // A new static member joins and needs to replace an 
existing departed one.
+                    if (updatedMemberOrNull.instanceId().isPresent()) {
+                        String previousMemberId = 
staticMembers.get(updatedMemberOrNull.instanceId().get());
+                        if (previousMemberId != null && 
!previousMemberId.equals(memberId)) {
+                            assignment = 
targetAssignment.getOrDefault(previousMemberId,
+                                TasksTuple.EMPTY);
+                        }
+                    }
+
+                    memberSpecs.put(memberId, createAssignmentMemberSpec(
+                        updatedMemberOrNull,
+                        assignment
+                    ));
+                }
+            });
+
+            // Prepare the expected topology metadata.
+            TopologyMetadata topologyMetadata = new 
TopologyMetadata(subscriptionMetadata, subtopologies);
+
+            // Prepare the expected assignment spec.
+            GroupSpecImpl groupSpec = new GroupSpecImpl(memberSpecs, new 
HashMap<>());
+
+            // We use `any` here to always return an assignment but use 
`verify` later on
+            // to ensure that the input was correct.
+            when(assignor.assign(any(), any()))
+                .thenReturn(new GroupAssignment(memberAssignments));
+
+            // Create and populate the assignment builder.
+            org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder 
builder = new 
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder(
+                groupId, groupEpoch, assignor, Map.of())
+                .withMembers(members)
+                .withTopology(topology)
+                .withStaticMembers(staticMembers)
+                .withPartitionMetadata(subscriptionMetadata)
+                .withTargetAssignment(targetAssignment);
+
+            // Add the updated members or delete the deleted members.
+            updatedMembers.forEach((memberId, updatedMemberOrNull) -> {
+                if (updatedMemberOrNull != null) {
+                    builder.addOrUpdateMember(memberId, updatedMemberOrNull);
+                } else {
+                    builder.removeMember(memberId);
+                }
+            });
+
+            // Execute the builder.
+            
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult
 result = builder.build();
+
+            // Verify that the assignor was called once with the expected
+            // assignment spec.
+            verify(assignor, times(1))
+                .assign(groupSpec, topologyMetadata);
+
+            return result;
+        }
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
new file mode 100644
index 00000000000..a5c18a6f0f2
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TopologyMetadataTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import 
org.apache.kafka.coordinator.group.streams.topics.ConfiguredInternalTopic;
+import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+class TopologyMetadataTest {
+
+    private Map<String, TopicMetadata> topicMetadata;
+    private SortedMap<String, ConfiguredSubtopology> subtopologyMap;
+    private TopologyMetadata topologyMetadata;
+
+    @BeforeEach
+    void setUp() {
+        topicMetadata = new HashMap<>();
+        subtopologyMap = new TreeMap<>();
+        topologyMetadata = new TopologyMetadata(topicMetadata, subtopologyMap);
+    }
+
+    @Test
+    void testTopicMetadata() {
+        assertEquals(topicMetadata, topologyMetadata.topicMetadata());
+    }
+
+    @Test
+    void testTopology() {
+        assertEquals(subtopologyMap, topologyMetadata.subtopologyMap());
+    }
+
+    @Test
+    void testIsStateful() {
+        ConfiguredInternalTopic internalTopic = 
mock(ConfiguredInternalTopic.class);
+        ConfiguredSubtopology subtopology1 = mock(ConfiguredSubtopology.class);
+        ConfiguredSubtopology subtopology2 = mock(ConfiguredSubtopology.class);
+        subtopologyMap.put("subtopology1", subtopology1);
+        subtopologyMap.put("subtopology2", subtopology2);
+        
when(subtopology1.stateChangelogTopics()).thenReturn(Map.of("state_changelog_topic",
 internalTopic));
+        when(subtopology2.stateChangelogTopics()).thenReturn(Map.of());
+
+        assertTrue(topologyMetadata.isStateful("subtopology1"));
+        assertFalse(topologyMetadata.isStateful("subtopology2"));
+    }
+
+    @Test
+    void testMaxNumInputPartitions() {
+        ConfiguredInternalTopic internalTopic = 
mock(ConfiguredInternalTopic.class);
+        ConfiguredSubtopology subtopology = mock(ConfiguredSubtopology.class);
+        subtopologyMap.put("subtopology1", subtopology);
+        when(subtopology.sourceTopics()).thenReturn(Set.of("source_topic"));
+        
when(subtopology.repartitionSourceTopics()).thenReturn(Map.of("repartition_source_topic",
 internalTopic));
+
+        TopicMetadata topicMeta1 = mock(TopicMetadata.class);
+        TopicMetadata topicMeta2 = mock(TopicMetadata.class);
+        topicMetadata.put("source_topic", topicMeta1);
+        topicMetadata.put("repartition_source_topic", topicMeta2);
+        when(topicMeta1.numPartitions()).thenReturn(3);
+        when(topicMeta2.numPartitions()).thenReturn(4);
+
+        assertEquals(4, 
topologyMetadata.maxNumInputPartitions("subtopology1"));
+    }
+
+    @Test
+    void testSubtopologies() {
+        ConfiguredSubtopology subtopology1 = mock(ConfiguredSubtopology.class);
+        ConfiguredSubtopology subtopology2 = mock(ConfiguredSubtopology.class);
+        subtopologyMap.put("subtopology1", subtopology1);
+        subtopologyMap.put("subtopology2", subtopology2);
+
+        List<String> expectedSubtopologies = List.of("subtopology1", 
"subtopology2");
+        assertEquals(expectedSubtopologies, topologyMetadata.subtopologies());
+    }
+
+    @Test
+    void testIsStatefulThrowsExceptionWhenSubtopologyIdDoesNotExist() {
+        assertThrows(NoSuchElementException.class, () -> 
topologyMetadata.isStateful("non_existent_subtopology"));
+    }
+
+    @Test
+    void 
testMaxNumInputPartitionsThrowsExceptionWhenSubtopologyIdDoesNotExist() {
+        assertThrows(NoSuchElementException.class, () -> 
topologyMetadata.maxNumInputPartitions("non_existent_subtopology"));
+    }
+
+    @Test
+    void 
testMaxNumInputPartitionsThrowsExceptionWhenSubtopologyContainsNoSourceTopics() 
{
+        ConfiguredSubtopology subtopology = mock(ConfiguredSubtopology.class);
+        when(subtopology.sourceTopics()).thenReturn(Set.of());
+        when(subtopology.repartitionSourceTopics()).thenReturn(Map.of());
+        subtopologyMap.put("subtopology1", subtopology);
+
+        assertThrows(IllegalStateException.class, () -> 
topologyMetadata.maxNumInputPartitions("subtopology1"));
+    }
+}
\ No newline at end of file
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java
index 25dada072df..d44b24549e0 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/MockAssignorTest.java
@@ -253,7 +253,7 @@ public class MockAssignorTest {
         }
 
         @Override
-        public int numTasks(String subtopologyId) {
+        public int maxNumInputPartitions(String subtopologyId) {
             return numPartitions;
         }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
index 112dd2281d8..542aad73dee 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
@@ -1094,7 +1094,7 @@ public class StickyTaskAssignorTest {
         }
 
         @Override
-        public int numTasks(String subtopologyId) throws 
NoSuchElementException {
+        public int maxNumInputPartitions(String subtopologyId) throws 
NoSuchElementException {
             return numTasks;
         }
 
@@ -1112,7 +1112,7 @@ public class StickyTaskAssignorTest {
         }
 
         @Override
-        public int numTasks(String subtopologyId) throws 
NoSuchElementException {
+        public int maxNumInputPartitions(String subtopologyId) throws 
NoSuchElementException {
             if (subtopologyId.equals("test-subtopology1"))
                 return 6;
             return 1;
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
index 2d6d096235a..a909629fa20 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
@@ -26,6 +26,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -53,7 +55,7 @@ public class ConfiguredTopologyTest {
         assertThrows(NullPointerException.class,
             () -> new ConfiguredTopology(
                 0,
-                Optional.of(Map.of()),
+                Optional.of(new TreeMap<>()),
                 null,
                 Optional.empty()
             )
@@ -77,7 +79,7 @@ public class ConfiguredTopologyTest {
         assertThrows(IllegalArgumentException.class,
             () -> new ConfiguredTopology(
                 -1,
-                Optional.of(Map.of()),
+                Optional.of(new TreeMap<>()),
                 Collections.emptyMap(),
                 Optional.empty()
             )
@@ -100,7 +102,7 @@ public class ConfiguredTopologyTest {
     @Test
     public void testIsReady() {
         ConfiguredTopology readyTopology = new ConfiguredTopology(
-            1, Optional.of(Map.of()), new HashMap<>(), Optional.empty());
+            1, Optional.of(new TreeMap<>()), new HashMap<>(), 
Optional.empty());
         assertTrue(readyTopology.isReady());
 
         ConfiguredTopology notReadyTopology = new ConfiguredTopology(
@@ -114,7 +116,7 @@ public class ConfiguredTopologyTest {
         ConfiguredSubtopology subtopologyMock = 
mock(ConfiguredSubtopology.class);
         StreamsGroupDescribeResponseData.Subtopology subtopologyResponse = new 
StreamsGroupDescribeResponseData.Subtopology();
         
when(subtopologyMock.asStreamsGroupDescribeSubtopology(Mockito.anyString())).thenReturn(subtopologyResponse);
-        Map<String, ConfiguredSubtopology> subtopologies = new HashMap<>();
+        SortedMap<String, ConfiguredSubtopology> subtopologies = new 
TreeMap<>();
         subtopologies.put("subtopology1", subtopologyMock);
         Map<String, CreatableTopic> internalTopicsToBeCreated = new 
HashMap<>();
         Optional<TopicConfigurationException> topicConfigurationException = 
Optional.empty();

Reply via email to