dajac commented on code in PR #14182:
URL: https://github.com/apache/kafka/pull/14182#discussion_r1325561675


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * The assignment builder is used to construct the target assignment based on 
the members' subscriptions.
+ *
+ * This class contains common utility methods and a class for obtaining and 
storing rack information.
+ */
+public abstract class AbstractUniformAssignmentBuilder {
+    protected abstract GroupAssignment buildAssignment();
+
+    /**
+     * Determines if rack-aware assignment is appropriate based on the 
provided rack information.
+     *
+     * @param memberRacks           Racks where members are located.
+     * @param partitionRacks        Racks where partitions are located.
+     * @param racksPerPartition     Map of partitions to their associated 
racks.
+     *
+     * @return {@code true} if rack-aware assignment should be applied; {@code 
false} otherwise.
+     */
+    protected static boolean useRackAwareAssignment(
+        Set<String> memberRacks,
+        Set<String> partitionRacks,
+        Map<TopicIdPartition, Set<String>> racksPerPartition
+    ) {
+        if (memberRacks.isEmpty() || Collections.disjoint(memberRacks, 
partitionRacks))
+            return false;
+        else {
+            return 
!racksPerPartition.values().stream().allMatch(partitionRacks::equals);
+        }
+    }
+
+    /**
+     * Adds the topic's partition to the member's target assignment.
+     */
+    protected static void addPartitionToAssignment(
+        int partition,
+        Uuid topicId,
+        String memberId,
+        Map<String, MemberAssignment> targetAssignment

Review Comment:
   small nit: In such cases, I usually prefer to put the mutated object first in 
the list of arguments. Then, I would also put memberId, then topicId and 
finally partition to follow their hierarchy. I would also rename 
targetAssignment to assignment because the method is not tied to a target 
assignment in the end. It could be any assignment map.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * The assignment builder is used to construct the target assignment based on 
the members' subscriptions.
+ *
+ * This class contains common utility methods and a class for obtaining and 
storing rack information.
+ */
+public abstract class AbstractUniformAssignmentBuilder {
+    protected abstract GroupAssignment buildAssignment();
+
+    /**
+     * Determines if rack-aware assignment is appropriate based on the 
provided rack information.
+     *
+     * @param memberRacks           Racks where members are located.
+     * @param partitionRacks        Racks where partitions are located.
+     * @param racksPerPartition     Map of partitions to their associated 
racks.
+     *
+     * @return {@code true} if rack-aware assignment should be applied; {@code 
false} otherwise.
+     */
+    protected static boolean useRackAwareAssignment(
+        Set<String> memberRacks,
+        Set<String> partitionRacks,
+        Map<TopicIdPartition, Set<String>> racksPerPartition
+    ) {
+        if (memberRacks.isEmpty() || Collections.disjoint(memberRacks, 
partitionRacks))
+            return false;
+        else {
+            return 
!racksPerPartition.values().stream().allMatch(partitionRacks::equals);
+        }
+    }
+
+    /**
+     * Adds the topic's partition to the member's target assignment.
+     */
+    protected static void addPartitionToAssignment(
+        int partition,
+        Uuid topicId,
+        String memberId,
+        Map<String, MemberAssignment> targetAssignment
+    ) {
+        targetAssignment.get(memberId)
+            .targetPartitions()
+            .computeIfAbsent(topicId, __ -> new HashSet<>())
+            .add(partition);
+    }
+
+    /**
+     * Constructs a list of {@code TopicIdPartition} for each topic Id based 
on its partition count.
+     *
+     * @param allTopicIds                   The subscribed topic Ids.
+     * @param subscribedTopicDescriber      Utility to fetch the partition 
count for a given topic.
+     *
+     * @return List of sorted {@code TopicIdPartition} for all provided topic 
Ids.
+     */
+    protected static List<TopicIdPartition> allTopicIdPartitions(
+        Collection<Uuid> allTopicIds,
+        SubscribedTopicDescriber subscribedTopicDescriber
+    ) {
+        List<TopicIdPartition> allTopicIdPartitions = new ArrayList<>();
+        // Sorted so that partitions from each topic can be distributed 
amongst its subscribers equally.
+        allTopicIds.stream().sorted().forEach(topic ->
+            IntStream.range(0, subscribedTopicDescriber.numPartitions(topic))
+                .forEach(i -> allTopicIdPartitions.add(new 
TopicIdPartition(topic, i))
+                )

Review Comment:
   nit: This parenthesis seems misplaced, no?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The Uniform Assignor distributes topic partitions among group members for a
+ * balanced and potentially rack aware assignment.
+ * The assignor employs two different strategies based on the nature of topic
+ * subscriptions across the group members:
+ * <ul>
+ *     <li>
+ *         <b> Optimized Uniform Assignment Builder: </b> This strategy is 
used when all members have subscribed
+ *         to the same set of topics.
+ *     </li>
+ *     <li>
+ *         <b> General Uniform Assignment Builder: </b> This strategy is used 
when members have varied topic
+ *         subscriptions.
+ *     </li>
+ * </ul>
+ *
+ * The appropriate strategy is automatically chosen based on the current 
members' topic subscriptions.
+ *
+ * @see OptimizedUniformAssignmentBuilder
+ * @see GeneralUniformAssignmentBuilder
+ */
+public class UniformAssignor implements PartitionAssignor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(UniformAssignor.class);
+    public static final String UNIFORM_ASSIGNOR_NAME = "uniform";
+
+    @Override
+    public String name() {
+        return UNIFORM_ASSIGNOR_NAME;
+    }
+
+    /**
+     * Perform the group assignment given the current members and
+     * topics metadata.
+     *
+     * @param assignmentSpec                The assignment specification that 
included member metadata.
+     * @param subscribedTopicDescriber      The topic and cluster metadata 
describer {@link SubscribedTopicDescriber}.
+     * @return The new target assignment for the group.
+     */
+    @Override
+    public GroupAssignment assign(
+        AssignmentSpec assignmentSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
+    ) throws PartitionAssignorException {
+        AbstractUniformAssignmentBuilder assignmentBuilder;
+
+        if (assignmentSpec.members().isEmpty())
+            return new GroupAssignment(Collections.emptyMap());
+
+        if (allSubscriptionsEqual(assignmentSpec.members())) {
+            LOG.debug("Detected that all members are subscribed to the same 
set of topics, invoking the "
+                + "optimized assignment algorithm");
+            assignmentBuilder = new 
OptimizedUniformAssignmentBuilder(assignmentSpec, subscribedTopicDescriber);
+        } else {
+            assignmentBuilder = new GeneralUniformAssignmentBuilder();
+            LOG.debug("Detected that all members are subscribed to a different 
set of topics, invoking the "

Review Comment:
   nit: Let's log before creating the builder to be consistent with the other 
branch.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static java.lang.Math.min;
+
+/**
+ * The optimized uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * all its members subscribed to the same set of topics.
+ * It is optimized since the assignment can be done in fewer, less complicated 
steps compared to when
+ * the subscriptions are different across the members.
+ *
+ * Assignments are done according to the following principles:
+ *
+ *
+ * <li> Balance:          Ensure partitions are distributed equally among all 
members.
+ *                        The difference in assignments sizes between any two 
members
+ *                        should not exceed one partition. </li>
+ * <li> Rack Matching:    When feasible, aim to assign partitions to members
+ *                        located on the same rack thus avoiding cross-zone 
traffic. </li>
+ * <li> Stickiness:       Minimize partition movements among members by 
retaining
+ *                        as much of the existing assignment as possible. </li>
+ *
+ * The assignment builder prioritizes the properties in the following order:
+ *      Balance > Rack Matching > Stickiness.
+ */
+public class OptimizedUniformAssignmentBuilder extends 
UniformAssignor.AbstractAssignmentBuilder {
+    private static final Logger LOG = 
LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+    /**
+     * The assignment specification which includes member metadata.
+     */
+    private final AssignmentSpec assignmentSpec;
+    /**
+     * The topic and partition metadata describer.
+     */
+    private final SubscribedTopicDescriber subscribedTopicDescriber;
+    /**
+     * The set of topic Ids that the consumer group is subscribed to.
+     */
+    private final Set<Uuid> subscriptionIds;
+    /**
+     * Rack information and helper methods.
+     */
+    private final RackInfo rackInfo;
+    /**
+     * The number of members to receive an extra partition beyond the minimum 
quota.
+     * Minimum Quota = Total Partitions / Total Members
+     * Example: If there are 11 partitions to be distributed among 3 members,
+     *          each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 
3) members get an extra partition.
+     */
+    private int remainingMembersToGetAnExtraPartition;
+    /**
+     * Members mapped to the remaining number of partitions needed to meet the 
minimum quota,
+     * including members that are eligible to receive an extra partition.
+     */
+    private final Map<String, Integer> potentiallyUnfilledMembers;
+    /**
+     * Members mapped to the remaining number of partitions needed to meet the 
full quota.
+     * Full quota = minQuota + one extra partition (if applicable).
+     */
+    private Map<String, Integer> unfilledMembers;
+    /**
+     * The partitions that still need to be assigned.
+     * Initially this contains all the subscribed topics' partitions.
+     */
+    private List<TopicIdPartition> unassignedPartitions;
+    /**
+     * The target assignment.
+     */
+    private final Map<String, MemberAssignment> targetAssignment;
+    /**
+     * Tracks the existing owner of each partition.
+     * Only populated when the rack awareness strategy is used.
+     */
+    private final Map<TopicIdPartition, String> currentPartitionOwners;
+
+    OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+        this.assignmentSpec = assignmentSpec;
+        this.subscribedTopicDescriber = subscribedTopicDescriber;
+        this.subscriptionIds = new 
HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
+        this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, 
subscriptionIds);
+        this.potentiallyUnfilledMembers = new HashMap<>();
+        this.unfilledMembers = new HashMap<>();
+        this.targetAssignment = new HashMap<>();
+        // Without rack-aware strategy, tracking current owners of unassigned 
partitions is unnecessary
+        // as all sticky partitions are retained until a member meets its 
quota.
+        this.currentPartitionOwners = rackInfo.useRackStrategy ? new 
HashMap<>() : Collections.emptyMap();
+    }
+
+    /**
+     * Here's the step-by-step breakdown of the assignment process:
+     *
+     * <li> Compute the quotas of partitions for each member based on the 
total partitions and member count.</li>
+     * <li> Initialize unassigned partitions to all the topic partitions and
+     *      remove partitions from the list as and when they are assigned.</li>
+     * <li> For existing assignments, retain partitions based on the 
determined quota and member's rack compatibility.</li>
+     * <li> If a partition's rack mismatches with its owner, track it for 
future use.</li>
+     * <li> Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.</li>
+     * <li> Depending on members needing extra partitions, select members from 
the potentially unfilled list
+     *      and add them to the unfilled list.</li>
+     * <li> Proceed with a round-robin assignment adhering to rack awareness.
+     *      For each unassigned partition, locate the first compatible member 
from the unfilled list.</li>
+     * <li> If no rack-compatible member is found, revert to the tracked 
current owner.
+     *      If that member can't accommodate the partition due to quota 
limits, resort to a generic round-robin assignment.</li>
+     */
+    @Override
+    protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
+        int totalPartitionsCount = 0;
+
+        for (Uuid topicId : subscriptionIds) {
+            int partitionCount = 
subscribedTopicDescriber.numPartitions(topicId);
+            if (partitionCount == -1) {
+                throw new PartitionAssignorException(
+                    "Members are subscribed to topic " + topicId + " which 
doesn't exist in the topic metadata."
+                );
+            } else {
+                totalPartitionsCount += partitionCount;
+            }
+        }
+
+        if (subscriptionIds.isEmpty()) {
+            LOG.info("The subscription list is empty, returning an empty 
assignment");
+            return new GroupAssignment(Collections.emptyMap());
+        }
+
+        // The minimum required quota that each member needs to meet for a 
balanced assignment.
+        // This is the same for all members.
+        final int numberOfMembers = assignmentSpec.members().size();
+        final int minQuota = totalPartitionsCount / numberOfMembers;
+        remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
+
+        assignmentSpec.members().keySet().forEach(memberId ->
+            targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
+        ));
+
+        // sorted list of all partitions.
+        unassignedPartitions = allTopicIdPartitions(subscriptionIds, 
subscribedTopicDescriber);
+
+        assignStickyPartitions(minQuota);
+
+        unfilledMembers = computeUnfilledMembers();
+
+        if (!unassignedPartitionsCountEqualsRemainingAssignmentsCount()) {
+            throw new PartitionAssignorException("Number of available 
partitions is not equal to the total requirement");
+        }
+
+        if (rackInfo.useRackStrategy) rackAwareRoundRobinAssignment();
+        unassignedPartitionsRoundRobinAssignment();
+
+        return new GroupAssignment(targetAssignment);
+    }
+
+    /**
+     * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
+     * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
+     * In addition, if rack awareness is enabled, it is ensured that a 
partition's rack matches the member's rack.
+     *
+     * <p>For each member:
+     * <ol>
+     *     <li> Find the valid current assignment considering topic 
subscriptions, metadata and rack information.</li>
+     *     <li> If the current assignments exist, retain partitions up to the 
minimum quota.</li>
+     *     <li> If the current assignment size is greater than the minimum 
quota and
+     *          there are members that could get an extra partition, assign 
the next partition as well.</li>
+     *     <li> Finally, if the member's current assignment size is less than 
the minimum quota,
+     *          add them to the potentially unfilled members map and track the 
number of remaining
+     *          partitions required to meet the quota.</li>
+     * </ol>
+     * </p>
+     */
+    private void assignStickyPartitions(int minQuota) {
+        assignmentSpec.members().forEach((memberId, assignmentMemberSpec) -> {
+            List<TopicIdPartition> validCurrentAssignment = 
validCurrentAssignment(
+                memberId,
+                assignmentMemberSpec.assignedPartitions()
+            );
+
+            int currentAssignmentSize = validCurrentAssignment.size();
+            // Number of partitions required to meet the minimum quota.
+            int remaining = minQuota - currentAssignmentSize;
+
+            if (currentAssignmentSize > 0) {
+                int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
+                IntStream.range(0, retainedPartitionsCount).forEach(i -> {
+                    TopicIdPartition topicIdPartition = 
validCurrentAssignment.get(i);
+                    addPartitionToAssignment(
+                        topicIdPartition.partition(),
+                        topicIdPartition.topicId(),
+                        memberId,
+                        targetAssignment
+                    );
+                    unassignedPartitions.remove(topicIdPartition);
+                });
+
+                // The extra partition is located at the last index from the 
previous step.
+                if (remaining < 0 && remainingMembersToGetAnExtraPartition > 
0) {
+                    TopicIdPartition topicIdPartition = 
validCurrentAssignment.get(retainedPartitionsCount);
+                    addPartitionToAssignment(
+                        topicIdPartition.partition(),
+                        topicIdPartition.topicId(),
+                        memberId,
+                        targetAssignment
+                    );
+                    unassignedPartitions.remove(topicIdPartition);
+                    remainingMembersToGetAnExtraPartition--;
+                }
+            }
+
+            if (remaining >= 0) {
+                potentiallyUnfilledMembers.put(memberId, remaining);
+            }
+
+        });
+    }
+
+    /**
+     * Filters the current assignment of partitions for a given member.
+     *
+     * Any partition that still belongs to the member's subscribed topics list 
is considered valid.
+     * If rack aware strategy can be used: Only partitions with matching rack 
are valid and non-matching partitions are
+     * tracked with their current owner for future use.
+     *
+     * @param memberId              The Id of the member whose assignment is 
being validated.
+     * @param currentAssignment    The partitions currently assigned to the 
member.
+     *
+     * @return List of valid partitions after applying the filters.
+     */
+    private List<TopicIdPartition> validCurrentAssignment(
+        String memberId,
+        Map<Uuid, Set<Integer>> currentAssignment
+    ) {
+        List<TopicIdPartition> validCurrentAssignmentList = new ArrayList<>();
+        currentAssignment.forEach((topicId, partitions) -> {
+            if (subscriptionIds.contains(topicId)) {
+                partitions.forEach(partition -> {
+                    TopicIdPartition topicIdPartition = new 
TopicIdPartition(topicId, partition);
+                    if (rackInfo.useRackStrategy && 
rackInfo.racksMismatch(memberId, topicIdPartition)) {
+                        currentPartitionOwners.put(topicIdPartition, memberId);
+                    } else {
+                        validCurrentAssignmentList.add(topicIdPartition);
+                    }
+                });
+            } else {
+                LOG.debug("The topic " + topicId + " is no longer present in 
the subscribed topics list");
+            }
+        });
+
+        return validCurrentAssignmentList;
+    }
+
+    /**
+     * Allocates the unassigned partitions to unfilled members present in the 
same rack in a round-robin fashion.
+     */
+    private void rackAwareRoundRobinAssignment() {
+        Queue<String> roundRobinMembers = new 
LinkedList<>(unfilledMembers.keySet());
+
+        // Sort partitions in ascending order by number of potential members 
with matching racks.
+        // Partitions with no potential members in the same rack aren't 
included in this list.
+        List<TopicIdPartition> sortedPartitions = 
rackInfo.sortPartitionsByRackMembers(unassignedPartitions);
+
+        sortedPartitions.forEach(partition -> {
+            boolean assigned = false;
+            for (int i = 0; i < roundRobinMembers.size() && !assigned; i++) {
+                String memberId = roundRobinMembers.poll();
+                Integer remainingPartitionCount = 
unfilledMembers.getOrDefault(memberId, 0);
+
+                if (remainingPartitionCount > 0 && 
!rackInfo.racksMismatch(memberId, partition)) {
+                    assignPartitionToMember(memberId, partition);
+                    assigned = true;
+                    unassignedPartitions.remove(partition);
+                }
+
+                // Only re-add the member to the end of the queue if it's 
still available for assignment.
+                if (unfilledMembers.containsKey(memberId)) {
+                    roundRobinMembers.add(memberId);
+                }
+            }
+        });
+    }
+
+    /**
+     * Allocates the unassigned partitions to unfilled members in a 
round-robin fashion.
+     *
+     * If the rack-aware strategy is enabled, partitions are attempted to be 
assigned back to their current owners first.
+     * This is because pure stickiness without rack matching is not considered 
initially.
+     *
+     * If a partition couldn't be assigned to its current owner due to the 
quotas OR
+     * if the rack-aware strategy is not enabled, the partitions are allocated 
to the unfilled members.
+     */
+    private void unassignedPartitionsRoundRobinAssignment() {
+        Queue<String> roundRobinMembers = new 
LinkedList<>(unfilledMembers.keySet());
+
+        unassignedPartitions.forEach(partition -> {
+            boolean assigned = false;
+
+            if (rackInfo.useRackStrategy && 
currentPartitionOwners.containsKey(partition)) {
+                String prevOwner = currentPartitionOwners.get(partition);
+                if (unfilledMembers.containsKey(prevOwner)) {
+                    assignPartitionToMember(prevOwner, partition);
+                    assigned = true;
+                    if (!unfilledMembers.containsKey(prevOwner)) {
+                        roundRobinMembers.remove(prevOwner);
+                    }
+                }
+            }
+
+            // Only re-add the member to the end of the queue if it's still 
available for assignment.

Review Comment:
   Ah ah. You got me :). I actually raised this because in 
rackAwareRoundRobinAssignment, you put it right before the if. That also works 
because the comment is really about that if.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static java.lang.Math.min;
+
+/**
+ * The optimized uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * all its members subscribed to the same set of topics.
+ * It is optimized since the assignment can be done in fewer, less complicated 
steps compared to when
+ * the subscriptions are different across the members.
+ *
+ * Assignments are done according to the following principles:
+ *
+ *
+ * <li> Balance:          Ensure partitions are distributed equally among all 
members.
+ *                        The difference in assignments sizes between any two 
members
+ *                        should not exceed one partition. </li>
+ * <li> Rack Matching:    When feasible, aim to assign partitions to members
+ *                        located on the same rack thus avoiding cross-zone 
traffic. </li>
+ * <li> Stickiness:       Minimize partition movements among members by 
retaining
+ *                        as much of the existing assignment as possible. </li>
+ *
+ * The assignment builder prioritizes the properties in the following order:
+ *      Balance > Rack Matching > Stickiness.
+ */
+public class OptimizedUniformAssignmentBuilder extends 
UniformAssignor.AbstractAssignmentBuilder {
+    private static final Logger LOG = 
LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+    /**
+     * The assignment specification which includes member metadata.
+     */
+    private final AssignmentSpec assignmentSpec;
+    /**
+     * The topic and partition metadata describer.
+     */
+    private final SubscribedTopicDescriber subscribedTopicDescriber;
+    /**
+     * The set of topic Ids that the consumer group is subscribed to.
+     */
+    private final Set<Uuid> subscriptionIds;
+    /**
+     * Rack information and helper methods.
+     */
+    private final RackInfo rackInfo;
+    /**
+     * The number of members to receive an extra partition beyond the minimum 
quota.
+     * Minimum Quota = Total Partitions / Total Members
+     * Example: If there are 11 partitions to be distributed among 3 members,
+     *          each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 
3) members get an extra partition.
+     */
+    private int remainingMembersToGetAnExtraPartition;
+    /**
+     * Members mapped to the remaining number of partitions needed to meet the 
minimum quota,
+     * including members that are eligible to receive an extra partition.
+     */
+    private final Map<String, Integer> potentiallyUnfilledMembers;
+    /**
+     * Members mapped to the remaining number of partitions needed to meet the 
full quota.
+     * Full quota = minQuota + one extra partition (if applicable).
+     */
+    private Map<String, Integer> unfilledMembers;
+    /**
+     * The partitions that still need to be assigned.
+     * Initially this contains all the subscribed topics' partitions.
+     */
+    private List<TopicIdPartition> unassignedPartitions;
+    /**
+     * The target assignment.
+     */
+    private final Map<String, MemberAssignment> targetAssignment;
+    /**
+     * Tracks the existing owner of each partition.
+     * Only populated when the rack awareness strategy is used.
+     */
+    private final Map<TopicIdPartition, String> currentPartitionOwners;
+
+    OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+        this.assignmentSpec = assignmentSpec;
+        this.subscribedTopicDescriber = subscribedTopicDescriber;
+        this.subscriptionIds = new 
HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
+        this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, 
subscriptionIds);
+        this.potentiallyUnfilledMembers = new HashMap<>();
+        this.unfilledMembers = new HashMap<>();
+        this.targetAssignment = new HashMap<>();
+        // Without rack-aware strategy, tracking current owners of unassigned 
partitions is unnecessary
+        // as all sticky partitions are retained until a member meets its 
quota.
+        this.currentPartitionOwners = rackInfo.useRackStrategy ? new 
HashMap<>() : Collections.emptyMap();
+    }
+
+    /**
+     * Here's the step-by-step breakdown of the assignment process:
+     *
+     * <li> Compute the quotas of partitions for each member based on the 
total partitions and member count.</li>
+     * <li> Initialize unassigned partitions to all the topic partitions and
+     *      remove partitions from the list as and when they are assigned.</li>
+     * <li> For existing assignments, retain partitions based on the 
determined quota and member's rack compatibility.</li>
+     * <li> If a partition's rack mismatches with its owner, track it for 
future use.</li>
+     * <li> Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.</li>
+     * <li> Depending on members needing extra partitions, select members from 
the potentially unfilled list
+     *      and add them to the unfilled list.</li>
+     * <li> Proceed with a round-robin assignment adhering to rack awareness.
+     *      For each unassigned partition, locate the first compatible member 
from the unfilled list.</li>
+     * <li> If no rack-compatible member is found, revert to the tracked 
current owner.
+     *      If that member can't accommodate the partition due to quota 
limits, resort to a generic round-robin assignment.</li>
+     */
+    @Override
+    protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
+        int totalPartitionsCount = 0;
+
+        for (Uuid topicId : subscriptionIds) {
+            int partitionCount = 
subscribedTopicDescriber.numPartitions(topicId);
+            if (partitionCount == -1) {
+                throw new PartitionAssignorException(
+                    "Members are subscribed to topic " + topicId + " which 
doesn't exist in the topic metadata."
+                );
+            } else {
+                totalPartitionsCount += partitionCount;
+            }
+        }
+
+        if (subscriptionIds.isEmpty()) {
+            LOG.info("The subscription list is empty, returning an empty 
assignment");
+            return new GroupAssignment(Collections.emptyMap());
+        }
+
+        // The minimum required quota that each member needs to meet for a 
balanced assignment.
+        // This is the same for all members.
+        final int numberOfMembers = assignmentSpec.members().size();
+        final int minQuota = totalPartitionsCount / numberOfMembers;
+        remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
+
+        assignmentSpec.members().keySet().forEach(memberId ->
+            targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
+        ));
+
+        // sorted list of all partitions.
+        unassignedPartitions = allTopicIdPartitions(subscriptionIds, 
subscribedTopicDescriber);
+
+        assignStickyPartitions(minQuota);
+
+        unfilledMembers = computeUnfilledMembers();
+
+        if (!unassignedPartitionsCountEqualsRemainingAssignmentsCount()) {
+            throw new PartitionAssignorException("Number of available 
partitions is not equal to the total requirement");
+        }
+
+        if (rackInfo.useRackStrategy) rackAwareRoundRobinAssignment();
+        unassignedPartitionsRoundRobinAssignment();
+
+        return new GroupAssignment(targetAssignment);
+    }
+
+    /**
+     * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
+     * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
+     * In addition, if rack awareness is enabled, it is ensured that a 
partition's rack matches the member's rack.
+     *
+     * <p>For each member:
+     * <ol>
+     *     <li> Find the valid current assignment considering topic 
subscriptions, metadata and rack information.</li>
+     *     <li> If the current assignments exist, retain partitions up to the 
minimum quota.</li>
+     *     <li> If the current assignment size is greater than the minimum 
quota and
+     *          there are members that could get an extra partition, assign 
the next partition as well.</li>
+     *     <li> Finally, if the member's current assignment size is less than 
the minimum quota,
+     *          add them to the potentially unfilled members map and track the 
number of remaining
+     *          partitions required to meet the quota.</li>
+     * </ol>
+     * </p>
+     */
+    private void assignStickyPartitions(int minQuota) {
+        assignmentSpec.members().forEach((memberId, assignmentMemberSpec) -> {
+            List<TopicIdPartition> validCurrentAssignment = 
validCurrentAssignment(
+                memberId,
+                assignmentMemberSpec.assignedPartitions()
+            );
+
+            int currentAssignmentSize = validCurrentAssignment.size();
+            // Number of partitions required to meet the minimum quota.
+            int remaining = minQuota - currentAssignmentSize;
+
+            if (currentAssignmentSize > 0) {
+                int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
+                IntStream.range(0, retainedPartitionsCount).forEach(i -> {
+                    TopicIdPartition topicIdPartition = 
validCurrentAssignment.get(i);
+                    addPartitionToAssignment(
+                        topicIdPartition.partition(),
+                        topicIdPartition.topicId(),
+                        memberId,
+                        targetAssignment
+                    );
+                    unassignedPartitions.remove(topicIdPartition);
+                });
+
+                // The extra partition is located at the last index from the 
previous step.
+                if (remaining < 0 && remainingMembersToGetAnExtraPartition > 
0) {
+                    TopicIdPartition topicIdPartition = 
validCurrentAssignment.get(retainedPartitionsCount);
+                    addPartitionToAssignment(
+                        topicIdPartition.partition(),
+                        topicIdPartition.topicId(),
+                        memberId,
+                        targetAssignment
+                    );
+                    unassignedPartitions.remove(topicIdPartition);
+                    remainingMembersToGetAnExtraPartition--;
+                }
+            }
+
+            if (remaining >= 0) {
+                potentiallyUnfilledMembers.put(memberId, remaining);
+            }
+
+        });
+    }
+
+    /**
+     * Filters the current assignment of partitions for a given member.
+     *
+     * Any partition that still belongs to the member's subscribed topics list 
is considered valid.
+     * If rack aware strategy can be used: Only partitions with matching rack 
are valid and non-matching partitions are
+     * tracked with their current owner for future use.
+     *
+     * @param memberId              The Id of the member whose assignment is 
being validated.
+     * @param currentAssignment    The partitions currently assigned to the 
member.
+     *
+     * @return List of valid partitions after applying the filters.
+     */
+    private List<TopicIdPartition> validCurrentAssignment(
+        String memberId,
+        Map<Uuid, Set<Integer>> currentAssignment
+    ) {
+        List<TopicIdPartition> validCurrentAssignmentList = new ArrayList<>();
+        currentAssignment.forEach((topicId, partitions) -> {
+            if (subscriptionIds.contains(topicId)) {
+                partitions.forEach(partition -> {
+                    TopicIdPartition topicIdPartition = new 
TopicIdPartition(topicId, partition);
+                    if (rackInfo.useRackStrategy && 
rackInfo.racksMismatch(memberId, topicIdPartition)) {
+                        currentPartitionOwners.put(topicIdPartition, memberId);
+                    } else {
+                        validCurrentAssignmentList.add(topicIdPartition);
+                    }
+                });
+            } else {
+                LOG.debug("The topic " + topicId + " is no longer present in 
the subscribed topics list");
+            }
+        });
+
+        return validCurrentAssignmentList;
+    }
+
+    /**
+     * Allocates the unassigned partitions to unfilled members present in the 
same rack in a round-robin fashion.
+     */
+    private void rackAwareRoundRobinAssignment() {
+        Queue<String> roundRobinMembers = new 
LinkedList<>(unfilledMembers.keySet());
+
+        // Sort partitions in ascending order by number of potential members 
with matching racks.
+        // Partitions with no potential members in the same rack aren't 
included in this list.
+        List<TopicIdPartition> sortedPartitions = 
rackInfo.sortPartitionsByRackMembers(unassignedPartitions);
+
+        sortedPartitions.forEach(partition -> {
+            boolean assigned = false;
+            for (int i = 0; i < roundRobinMembers.size() && !assigned; i++) {
+                String memberId = roundRobinMembers.poll();
+                Integer remainingPartitionCount = 
unfilledMembers.getOrDefault(memberId, 0);
+
+                if (remainingPartitionCount > 0 && 
!rackInfo.racksMismatch(memberId, partition)) {
+                    assignPartitionToMember(memberId, partition);
+                    assigned = true;
+                    unassignedPartitions.remove(partition);
+                }
+
+                // Only re-add the member to the end of the queue if it's 
still available for assignment.
+                if (unfilledMembers.containsKey(memberId)) {
+                    roundRobinMembers.add(memberId);
+                }
+            }
+        });
+    }
+
+    /**
+     * Allocates the unassigned partitions to unfilled members in a 
round-robin fashion.
+     *
+     * If the rack-aware strategy is enabled, partitions are attempted to be 
assigned back to their current owners first.
+     * This is because pure stickiness without rack matching is not considered 
initially.
+     *
+     * If a partition couldn't be assigned to its current owner due to the 
quotas OR
+     * if the rack-aware strategy is not enabled, the partitions are allocated 
to the unfilled members.
+     */
+    private void unassignedPartitionsRoundRobinAssignment() {
+        Queue<String> roundRobinMembers = new 
LinkedList<>(unfilledMembers.keySet());
+
+        unassignedPartitions.forEach(partition -> {
+            boolean assigned = false;
+
+            if (rackInfo.useRackStrategy && 
currentPartitionOwners.containsKey(partition)) {
+                String prevOwner = currentPartitionOwners.get(partition);
+                if (unfilledMembers.containsKey(prevOwner)) {
+                    assignPartitionToMember(prevOwner, partition);
+                    assigned = true;
+                    if (!unfilledMembers.containsKey(prevOwner)) {
+                        roundRobinMembers.remove(prevOwner);
+                    }
+                }
+            }
+
+            // Only re-add the member to the end of the queue if it's still 
available for assignment.
+            for (int i = 0; i < roundRobinMembers.size() && !assigned; i++) {
+                String memberId = roundRobinMembers.poll();
+                if (unfilledMembers.get(memberId) > 0) {
+                    assignPartitionToMember(memberId, partition);
+                    assigned = true;
+                }
+                if (unfilledMembers.containsKey(memberId)) {
+                    roundRobinMembers.add(memberId);
+                }
+            }
+        });
+    }
+
+    /**
+     * Assigns the specified partition to the given member and updates the 
unfilled members map.
+     *
+     * <p>
+     * If the member has met their allocation quota, the member is removed 
from the unfilled members map.
+     * Otherwise, the count of remaining partitions that can be assigned to 
the member is updated.
+     * </p>
+     *
+     * @param memberId              The Id of the member to which the 
partition will be assigned.
+     * @param topicIdPartition      The topicIdPartition to be assigned.
+     */
+    private void assignPartitionToMember(String memberId, TopicIdPartition 
topicIdPartition) {
+        addPartitionToAssignment(
+            topicIdPartition.partition(),
+            topicIdPartition.topicId(),
+            memberId,
+            targetAssignment
+        );
+
+        int remainingPartitionCount = unfilledMembers.get(memberId) - 1;
+        if (remainingPartitionCount == 0) {
+            unfilledMembers.remove(memberId);
+        } else {
+            unfilledMembers.put(memberId, remainingPartitionCount);
+        }
+    }
+
+    /**
+     * Determines which members can still be assigned partitions to meet the 
full quota.
+     *
+     * The map contains:
+     * <ol>
+     *     <li> Members that have met the minimum quota but will receive an 
extra partition. </li>
+     *     <li> Members that have not yet met the minimum quota. If there are 
still members that could receive
+     *          an extra partition, the remaining value is updated to include 
this. </li>
+     * </ol>
+     *
+     * @return A map of member Ids and the required partitions to meet the 
quota.
+     */
+    private Map<String, Integer> computeUnfilledMembers() {
+        Map<String, Integer> unfilledMembers = new HashMap<>();

Review Comment:
   I see... It is a bit annoying to copy the map here but I can live with it if 
you think that it is better like this. I have another question regarding this 
code. My understanding is that we basically decide here which members will get 
the extra partitions. Basically, the first members while iterating over the map 
will get them. When rack awareness is enabled, I wonder if we could get into 
a situation where the members with the extra partitions and the unassigned 
partitions are completely misaligned due to this.
   
   Let's take an example. We have a group with 10 members (from 1 to 10) 
subscribed to topic A. Topic A has 20 partitions so each member has 2 assigned 
partitions. All the partitions are available in a single rack. Members 1 to 5 
are in rack R1 and members 6 to 10 in rack R2. Now let's say that we add 5 
partitions to A. 3 are in R1 and 2 in R2. If we iterate from 1 to 10 to assign 
the extra partitions, it means that 1 to 5 will get an extra partition even 
though two of the partitions are not in the same rack. Is this a possible 
scenario? It is perhaps a bit too extreme...



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static java.lang.Math.min;
+
+/**
+ * The optimized uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * all its members subscribed to the same set of topics.
+ * It is optimized since the assignment can be done in fewer, less complicated 
steps compared to when
+ * the subscriptions are different across the members.
+ *
+ * Assignments are done according to the following principles:
+ *
+ *
+ * <li> Balance:          Ensure partitions are distributed equally among all 
members.
+ *                        The difference in assignments sizes between any two 
members
+ *                        should not exceed one partition. </li>
+ * <li> Rack Matching:    When feasible, aim to assign partitions to members
+ *                        located on the same rack thus avoiding cross-zone 
traffic. </li>
+ * <li> Stickiness:       Minimize partition movements among members by 
retaining
+ *                        as much of the existing assignment as possible. </li>
+ *
+ * The assignment builder prioritizes the properties in the following order:
+ *      Balance > Rack Matching > Stickiness.
+ */
+public class OptimizedUniformAssignmentBuilder extends 
UniformAssignor.AbstractAssignmentBuilder {
+    private static final Logger LOG = 
LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+    /**
+     * The assignment specification which includes member metadata.
+     */
+    private final AssignmentSpec assignmentSpec;
+    /**
+     * The topic and partition metadata describer.
+     */
+    private final SubscribedTopicDescriber subscribedTopicDescriber;
+    /**
+     * The set of topic Ids that the consumer group is subscribed to.
+     */
+    private final Set<Uuid> subscriptionIds;
+    /**
+     * Rack information and helper methods.
+     */
+    private final RackInfo rackInfo;
+    /**
+     * The number of members to receive an extra partition beyond the minimum 
quota.
+     * Minimum Quota = Total Partitions / Total Members
+     * Example: If there are 11 partitions to be distributed among 3 members,
+     *          each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 
3) members get an extra partition.
+     */
+    private int remainingMembersToGetAnExtraPartition;
+    /**
+     * Members mapped to the remaining number of partitions needed to meet the 
minimum quota,
+     * including members that are eligible to receive an extra partition.
+     */
+    private final Map<String, Integer> potentiallyUnfilledMembers;
+    /**
+     * Members mapped to the remaining number of partitions needed to meet the 
full quota.
+     * Full quota = minQuota + one extra partition (if applicable).
+     */
+    private Map<String, Integer> unfilledMembers;
+    /**
+     * The partitions that still need to be assigned.
+     * Initially this contains all the subscribed topics' partitions.
+     */
+    private List<TopicIdPartition> unassignedPartitions;
+    /**
+     * The target assignment.
+     */
+    private final Map<String, MemberAssignment> targetAssignment;
+    /**
+     * Tracks the existing owner of each partition.
+     * Only populated when the rack awareness strategy is used.
+     */
+    private final Map<TopicIdPartition, String> currentPartitionOwners;
+
+    OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+        this.assignmentSpec = assignmentSpec;
+        this.subscribedTopicDescriber = subscribedTopicDescriber;
+        this.subscriptionIds = new 
HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds());
+        this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, 
subscriptionIds);
+        this.potentiallyUnfilledMembers = new HashMap<>();
+        this.unfilledMembers = new HashMap<>();
+        this.targetAssignment = new HashMap<>();
+        // Without rack-aware strategy, tracking current owners of unassigned 
partitions is unnecessary
+        // as all sticky partitions are retained until a member meets its 
quota.
+        this.currentPartitionOwners = rackInfo.useRackStrategy ? new 
HashMap<>() : Collections.emptyMap();
+    }
+
+    /**
+     * Here's the step-by-step breakdown of the assignment process:
+     *
+     * <li> Compute the quotas of partitions for each member based on the 
total partitions and member count.</li>
+     * <li> Initialize unassigned partitions to all the topic partitions and
+     *      remove partitions from the list as and when they are assigned.</li>
+     * <li> For existing assignments, retain partitions based on the 
determined quota and member's rack compatibility.</li>
+     * <li> If a partition's rack mismatches with its owner, track it for 
future use.</li>
+     * <li> Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.</li>
+     * <li> Depending on members needing extra partitions, select members from 
the potentially unfilled list
+     *      and add them to the unfilled list.</li>
+     * <li> Proceed with a round-robin assignment adhering to rack awareness.
+     *      For each unassigned partition, locate the first compatible member 
from the unfilled list.</li>
+     * <li> If no rack-compatible member is found, revert to the tracked 
current owner.
+     *      If that member can't accommodate the partition due to quota 
limits, resort to a generic round-robin assignment.</li>
+     */
+    @Override
+    protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
+        int totalPartitionsCount = 0;
+
+        for (Uuid topicId : subscriptionIds) {
+            int partitionCount = 
subscribedTopicDescriber.numPartitions(topicId);
+            if (partitionCount == -1) {
+                throw new PartitionAssignorException(
+                    "Members are subscribed to topic " + topicId + " which 
doesn't exist in the topic metadata."
+                );
+            } else {
+                totalPartitionsCount += partitionCount;
+            }
+        }
+
+        if (subscriptionIds.isEmpty()) {
+            LOG.info("The subscription list is empty, returning an empty 
assignment");
+            return new GroupAssignment(Collections.emptyMap());
+        }
+
+        // The minimum required quota that each member needs to meet for a 
balanced assignment.
+        // This is the same for all members.
+        final int numberOfMembers = assignmentSpec.members().size();
+        final int minQuota = totalPartitionsCount / numberOfMembers;
+        remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
+
+        assignmentSpec.members().keySet().forEach(memberId ->
+            targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
+        ));
+
+        // sorted list of all partitions.
+        unassignedPartitions = allTopicIdPartitions(subscriptionIds, 
subscribedTopicDescriber);
+
+        assignStickyPartitions(minQuota);
+
+        unfilledMembers = computeUnfilledMembers();
+
+        if (!unassignedPartitionsCountEqualsRemainingAssignmentsCount()) {
+            throw new PartitionAssignorException("Number of available 
partitions is not equal to the total requirement");
+        }
+
+        if (rackInfo.useRackStrategy) rackAwareRoundRobinAssignment();
+        unassignedPartitionsRoundRobinAssignment();
+
+        return new GroupAssignment(targetAssignment);
+    }
+
+    /**
+     * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
+     * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
+     * In addition, if rack awareness is enabled, it is ensured that a 
partition's rack matches the member's rack.
+     *
+     * <p>For each member:
+     * <ol>
+     *     <li> Find the valid current assignment considering topic 
subscriptions, metadata and rack information.</li>
+     *     <li> If the current assignments exist, retain partitions up to the 
minimum quota.</li>
+     *     <li> If the current assignment size is greater than the minimum 
quota and
+     *          there are members that could get an extra partition, assign 
the next partition as well.</li>
+     *     <li> Finally, if the member's current assignment size is less than 
the minimum quota,
+     *          add them to the potentially unfilled members map and track the 
number of remaining
+     *          partitions required to meet the quota.</li>
+     * </ol>
+     * </p>
+     */
+    private void assignStickyPartitions(int minQuota) {
+        assignmentSpec.members().forEach((memberId, assignmentMemberSpec) -> {
+            List<TopicIdPartition> validCurrentAssignment = 
validCurrentAssignment(
+                memberId,
+                assignmentMemberSpec.assignedPartitions()
+            );
+
+            int currentAssignmentSize = validCurrentAssignment.size();
+            // Number of partitions required to meet the minimum quota.
+            int remaining = minQuota - currentAssignmentSize;
+
+            if (currentAssignmentSize > 0) {
+                int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
+                IntStream.range(0, retainedPartitionsCount).forEach(i -> {
+                    TopicIdPartition topicIdPartition = 
validCurrentAssignment.get(i);
+                    addPartitionToAssignment(
+                        topicIdPartition.partition(),
+                        topicIdPartition.topicId(),
+                        memberId,
+                        targetAssignment
+                    );
+                    unassignedPartitions.remove(topicIdPartition);
+                });
+
+                // The extra partition is located at the last index from the 
previous step.
+                if (remaining < 0 && remainingMembersToGetAnExtraPartition > 
0) {
+                    TopicIdPartition topicIdPartition = 
validCurrentAssignment.get(retainedPartitionsCount);
+                    addPartitionToAssignment(
+                        topicIdPartition.partition(),
+                        topicIdPartition.topicId(),
+                        memberId,
+                        targetAssignment
+                    );
+                    unassignedPartitions.remove(topicIdPartition);
+                    remainingMembersToGetAnExtraPartition--;
+                }
+            }
+
+            if (remaining >= 0) {
+                potentiallyUnfilledMembers.put(memberId, remaining);
+            }
+
+        });
+    }
+
+    /**
+     * Filters the current assignment of partitions for a given member.
+     *
+     * Any partition that still belongs to the member's subscribed topics list 
is considered valid.
+     * If rack aware strategy can be used: Only partitions with matching rack 
are valid and non-matching partitions are
+     * tracked with their current owner for future use.
+     *
+     * @param memberId              The Id of the member whose assignment is 
being validated.
+     * @param currentAssignment    The partitions currently assigned to the 
member.
+     *
+     * @return List of valid partitions after applying the filters.
+     */
+    private List<TopicIdPartition> validCurrentAssignment(
+        String memberId,
+        Map<Uuid, Set<Integer>> currentAssignment
+    ) {
+        List<TopicIdPartition> validCurrentAssignmentList = new ArrayList<>();
+        currentAssignment.forEach((topicId, partitions) -> {
+            if (subscriptionIds.contains(topicId)) {
+                partitions.forEach(partition -> {
+                    TopicIdPartition topicIdPartition = new 
TopicIdPartition(topicId, partition);
+                    if (rackInfo.useRackStrategy && 
rackInfo.racksMismatch(memberId, topicIdPartition)) {
+                        currentPartitionOwners.put(topicIdPartition, memberId);
+                    } else {
+                        validCurrentAssignmentList.add(topicIdPartition);
+                    }
+                });
+            } else {
+                LOG.debug("The topic " + topicId + " is no longer present in 
the subscribed topics list");
+            }
+        });
+
+        return validCurrentAssignmentList;
+    }
+
+    /**
+     * Allocates the unassigned partitions to unfilled members present in the 
same rack in a round-robin fashion.
+     */
+    private void rackAwareRoundRobinAssignment() {
+        Queue<String> roundRobinMembers = new 
LinkedList<>(unfilledMembers.keySet());
+
+        // Sort partitions in ascending order by number of potential members 
with matching racks.
+        // Partitions with no potential members in the same rack aren't 
included in this list.
+        List<TopicIdPartition> sortedPartitions = 
rackInfo.sortPartitionsByRackMembers(unassignedPartitions);
+
+        sortedPartitions.forEach(partition -> {
+            boolean assigned = false;
+            for (int i = 0; i < roundRobinMembers.size() && !assigned; i++) {
+                String memberId = roundRobinMembers.poll();
+                Integer remainingPartitionCount = 
unfilledMembers.getOrDefault(memberId, 0);
+
+                if (remainingPartitionCount > 0 && 
!rackInfo.racksMismatch(memberId, partition)) {
+                    assignPartitionToMember(memberId, partition);
+                    assigned = true;
+                    unassignedPartitions.remove(partition);
+                }
+
+                // Only re-add the member to the end of the queue if it's 
still available for assignment.
+                if (unfilledMembers.containsKey(memberId)) {
+                    roundRobinMembers.add(memberId);
+                }
+            }
+        });
+    }
+
+    /**
+     * Allocates the unassigned partitions to unfilled members in a 
round-robin fashion.
+     *
+     * If the rack-aware strategy is enabled, partitions are attempted to be 
assigned back to their current owners first.
+     * This is because pure stickiness without rack matching is not considered 
initially.
+     *
+     * If a partition couldn't be assigned to its current owner due to the 
quotas OR
+     * if the rack-aware strategy is not enabled, the partitions are allocated 
to the unfilled members.
+     */
+    private void unassignedPartitionsRoundRobinAssignment() {
+        Queue<String> roundRobinMembers = new 
LinkedList<>(unfilledMembers.keySet());

Review Comment:
   How about creating the queue just before calling 
rackAwareRoundRobinAssignment and unassignedPartitionsRoundRobinAssignment and 
passing it as a parameter?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java:
##########
@@ -0,0 +1,1090 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
+import static 
org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class OptimizedUniformAssignmentBuilderTest {
+    private final UniformAssignor assignor = new UniformAssignor();
+    private final Uuid topic1Uuid = Uuid.fromString("T1-A4s3VTwiI5CTbEp6POw");
+    private final Uuid topic2Uuid = Uuid.fromString("T2-B4s3VTwiI5YHbPp6YUe");
+    private final Uuid topic3Uuid = Uuid.fromString("T3-CU8fVTLCz5YMkLoDQsa");
+    private final String topic1Name = "topic1";
+    private final String topic2Name = "topic2";
+    private final String topic3Name = "topic3";
+    private final String memberA = "A";
+    private final String memberB = "B";
+    private final String memberC = "C";
+
+    /**
+     * A single member with an empty subscription list must not receive anything:
+     * the assignor is expected to return an empty member map.
+     */
+    @Test
+    public void testOneMemberNoTopicSubscription() {
+        TopicMetadata topic1 = new TopicMetadata(topic1Uuid, topic1Name, 3, mkMapOfPartitionRacks(3));
+        SubscribedTopicMetadata metadata = new SubscribedTopicMetadata(
+            Collections.singletonMap(topic1Uuid, topic1)
+        );
+
+        // Member A subscribes to nothing and owns nothing.
+        AssignmentMemberSpec unsubscribed = new AssignmentMemberSpec(
+            Optional.empty(), Optional.empty(),
+            Collections.emptyList(), Collections.emptyMap()
+        );
+        Map<String, AssignmentMemberSpec> memberSpecs = Collections.singletonMap(memberA, unsubscribed);
+
+        GroupAssignment result = assignor.assign(new AssignmentSpec(memberSpecs), metadata);
+
+        assertEquals(Collections.emptyMap(), result.members());
+    }
+
+    /**
+     * A member subscribed to a topic that is missing from the subscribed topic metadata
+     * must make the assignor fail with a {@link PartitionAssignorException}.
+     */
+    @Test
+    public void testOneMemberSubscribedToNonexistentTopic() {
+        TopicMetadata topic1 = new TopicMetadata(topic1Uuid, topic1Name, 3, mkMapOfPartitionRacks(3));
+        SubscribedTopicMetadata metadata = new SubscribedTopicMetadata(
+            Collections.singletonMap(topic1Uuid, topic1)
+        );
+
+        // The metadata only contains topic1, yet member A asks for topic2.
+        AssignmentMemberSpec subscribedToUnknownTopic = new AssignmentMemberSpec(
+            Optional.empty(), Optional.empty(),
+            Collections.singletonList(topic2Uuid), Collections.emptyMap()
+        );
+        Map<String, AssignmentMemberSpec> memberSpecs =
+            Collections.singletonMap(memberA, subscribedToUnknownTopic);
+        AssignmentSpec spec = new AssignmentSpec(memberSpecs);
+
+        assertThrows(PartitionAssignorException.class, () -> assignor.assign(spec, metadata));
+    }
+
+    /**
+     * First assignment for two members subscribed to topic1 (3 partitions) and
+     * topic3 (2 partitions). The topics carry partition racks but neither member
+     * declares a rack. The five partitions must be split 3/2 between A and B.
+     */
+    @Test
+    public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() {
+        Map<Uuid, TopicMetadata> topics = new HashMap<>();
+        topics.put(topic1Uuid, new TopicMetadata(topic1Uuid, topic1Name, 3, mkMapOfPartitionRacks(3)));
+        topics.put(topic3Uuid, new TopicMetadata(topic3Uuid, topic3Name, 2, mkMapOfPartitionRacks(2)));
+        SubscribedTopicMetadata metadata = new SubscribedTopicMetadata(topics);
+
+        // Both members start without any owned partitions and without a rack.
+        Map<String, AssignmentMemberSpec> memberSpecs = new TreeMap<>();
+        memberSpecs.put(memberA, new AssignmentMemberSpec(
+            Optional.empty(), Optional.empty(),
+            Arrays.asList(topic1Uuid, topic3Uuid), Collections.emptyMap()
+        ));
+        memberSpecs.put(memberB, new AssignmentMemberSpec(
+            Optional.empty(), Optional.empty(),
+            Arrays.asList(topic1Uuid, topic3Uuid), Collections.emptyMap()
+        ));
+
+        GroupAssignment actual = assignor.assign(new AssignmentSpec(memberSpecs), metadata);
+
+        Map<Uuid, Set<Integer>> expectedForA = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0, 2),
+            mkTopicAssignment(topic3Uuid, 1)
+        );
+        Map<Uuid, Set<Integer>> expectedForB = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1),
+            mkTopicAssignment(topic3Uuid, 0)
+        );
+        Map<String, Map<Uuid, Set<Integer>>> expected = new HashMap<>();
+        expected.put(memberA, expectedForA);
+        expected.put(memberB, expectedForB);
+
+        assertAssignment(expected, actual);
+        checkValidityAndBalance(memberSpecs, actual);
+    }
+
+    /**
+     * First assignment for two members that declare racks ("rack1", "rack2") while
+     * both topics are created with an empty partition-rack map. The expected result
+     * is the same 3/2 split as in the variant without member racks.
+     */
+    @Test
+    public void testFirstAssignmentTwoMembersTwoTopicsNoPartitionRacks() {
+        Map<Uuid, TopicMetadata> topics = new HashMap<>();
+        topics.put(topic1Uuid, new TopicMetadata(topic1Uuid, topic1Name, 3, Collections.emptyMap()));
+        topics.put(topic3Uuid, new TopicMetadata(topic3Uuid, topic3Name, 2, Collections.emptyMap()));
+        SubscribedTopicMetadata metadata = new SubscribedTopicMetadata(topics);
+
+        // Members have racks, but no partition exposes rack information.
+        Map<String, AssignmentMemberSpec> memberSpecs = new TreeMap<>();
+        memberSpecs.put(memberA, new AssignmentMemberSpec(
+            Optional.empty(), Optional.of("rack1"),
+            Arrays.asList(topic1Uuid, topic3Uuid), Collections.emptyMap()
+        ));
+        memberSpecs.put(memberB, new AssignmentMemberSpec(
+            Optional.empty(), Optional.of("rack2"),
+            Arrays.asList(topic1Uuid, topic3Uuid), Collections.emptyMap()
+        ));
+
+        GroupAssignment actual = assignor.assign(new AssignmentSpec(memberSpecs), metadata);
+
+        Map<Uuid, Set<Integer>> expectedForA = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0, 2),
+            mkTopicAssignment(topic3Uuid, 1)
+        );
+        Map<Uuid, Set<Integer>> expectedForB = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1),
+            mkTopicAssignment(topic3Uuid, 0)
+        );
+        Map<String, Map<Uuid, Set<Integer>>> expected = new HashMap<>();
+        expected.put(memberA, expectedForA);
+        expected.put(memberB, expectedForB);
+
+        assertAssignment(expected, actual);
+        checkValidityAndBalance(memberSpecs, actual);
+    }
+
+    /**
+     * First assignment for three members ("rack1", "rack2", "rack3") subscribed to
+     * three topics that all carry partition racks. Eight partitions are expected to
+     * be split 3/3/2 across A, B and C.
+     */
+    @Test
+    public void testFirstAssignmentThreeMembersThreeTopicsWithMemberAndPartitionRacks() {
+        Map<Uuid, TopicMetadata> topics = new HashMap<>();
+        topics.put(topic1Uuid, new TopicMetadata(topic1Uuid, topic1Name, 3, mkMapOfPartitionRacks(3)));
+        topics.put(topic2Uuid, new TopicMetadata(topic2Uuid, topic2Name, 3, mkMapOfPartitionRacks(3)));
+        topics.put(topic3Uuid, new TopicMetadata(topic3Uuid, topic3Name, 2, mkMapOfPartitionRacks(2)));
+        SubscribedTopicMetadata metadata = new SubscribedTopicMetadata(topics);
+
+        // Every member subscribes to all three topics and owns nothing yet.
+        Map<String, AssignmentMemberSpec> memberSpecs = new TreeMap<>();
+        memberSpecs.put(memberA, new AssignmentMemberSpec(
+            Optional.empty(), Optional.of("rack1"),
+            Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), Collections.emptyMap()
+        ));
+        memberSpecs.put(memberB, new AssignmentMemberSpec(
+            Optional.empty(), Optional.of("rack2"),
+            Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), Collections.emptyMap()
+        ));
+        memberSpecs.put(memberC, new AssignmentMemberSpec(
+            Optional.empty(), Optional.of("rack3"),
+            Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), Collections.emptyMap()
+        ));
+
+        GroupAssignment actual = assignor.assign(new AssignmentSpec(memberSpecs), metadata);
+
+        Map<Uuid, Set<Integer>> expectedForA = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0),
+            mkTopicAssignment(topic2Uuid, 0),
+            mkTopicAssignment(topic3Uuid, 0)
+        );
+        Map<Uuid, Set<Integer>> expectedForB = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1),
+            mkTopicAssignment(topic2Uuid, 1),
+            mkTopicAssignment(topic3Uuid, 1)
+        );
+        Map<Uuid, Set<Integer>> expectedForC = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 2),
+            mkTopicAssignment(topic2Uuid, 2)
+        );
+        Map<String, Map<Uuid, Set<Integer>>> expected = new HashMap<>();
+        expected.put(memberA, expectedForA);
+        expected.put(memberB, expectedForB);
+        expected.put(memberC, expectedForC);
+
+        assertAssignment(expected, actual);
+        checkValidityAndBalance(memberSpecs, actual);
+    }
+
+    /**
+     * First assignment with partial rack information: only member B declares a rack
+     * ("rack2") and topic3 is created with an empty partition-rack map, while topic1
+     * and topic2 carry partition racks. Eight partitions are expected to be split
+     * 3/3/2 across A, B and C.
+     */
+    @Test
+    public void testFirstAssignmentThreeMembersThreeTopicsWithSomeMemberAndPartitionRacks() {
+        Map<Uuid, TopicMetadata> topics = new HashMap<>();
+        topics.put(topic1Uuid, new TopicMetadata(topic1Uuid, topic1Name, 3, mkMapOfPartitionRacks(3)));
+        topics.put(topic2Uuid, new TopicMetadata(topic2Uuid, topic2Name, 3, mkMapOfPartitionRacks(3)));
+        topics.put(topic3Uuid, new TopicMetadata(topic3Uuid, topic3Name, 2, Collections.emptyMap()));
+        SubscribedTopicMetadata metadata = new SubscribedTopicMetadata(topics);
+
+        // A and C have no rack; B is the only member with one.
+        Map<String, AssignmentMemberSpec> memberSpecs = new TreeMap<>();
+        memberSpecs.put(memberA, new AssignmentMemberSpec(
+            Optional.empty(), Optional.empty(),
+            Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), Collections.emptyMap()
+        ));
+        memberSpecs.put(memberB, new AssignmentMemberSpec(
+            Optional.empty(), Optional.of("rack2"),
+            Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), Collections.emptyMap()
+        ));
+        memberSpecs.put(memberC, new AssignmentMemberSpec(
+            Optional.empty(), Optional.empty(),
+            Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), Collections.emptyMap()
+        ));
+
+        GroupAssignment actual = assignor.assign(new AssignmentSpec(memberSpecs), metadata);
+
+        Map<Uuid, Set<Integer>> expectedForA = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0),
+            mkTopicAssignment(topic2Uuid, 2),
+            mkTopicAssignment(topic3Uuid, 1)
+        );
+        Map<Uuid, Set<Integer>> expectedForB = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1, 2),
+            mkTopicAssignment(topic2Uuid, 1)
+        );
+        Map<Uuid, Set<Integer>> expectedForC = mkAssignment(
+            mkTopicAssignment(topic2Uuid, 0),
+            mkTopicAssignment(topic3Uuid, 0)
+        );
+        Map<String, Map<Uuid, Set<Integer>>> expected = new HashMap<>();
+        expected.put(memberA, expectedForA);
+        expected.put(memberB, expectedForB);
+        expected.put(memberC, expectedForC);
+
+        assertAssignment(expected, actual);
+        checkValidityAndBalance(memberSpecs, actual);
+    }
+
+    /**
+     * First assignment where there are more members (three) than partitions (two):
+     * A and B are expected to get one partition each and C an empty assignment.
+     */
+    @Test
+    public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() {
+        Map<Uuid, TopicMetadata> topics = new HashMap<>();
+        topics.put(topic3Uuid, new TopicMetadata(topic3Uuid, topic3Name, 2, mkMapOfPartitionRacks(2)));
+        SubscribedTopicMetadata metadata = new SubscribedTopicMetadata(topics);
+
+        Map<String, AssignmentMemberSpec> memberSpecs = new TreeMap<>();
+        memberSpecs.put(memberA, new AssignmentMemberSpec(
+            Optional.empty(), Optional.empty(),
+            Collections.singletonList(topic3Uuid), Collections.emptyMap()
+        ));
+        memberSpecs.put(memberB, new AssignmentMemberSpec(
+            Optional.empty(), Optional.empty(),
+            Collections.singletonList(topic3Uuid), Collections.emptyMap()
+        ));
+        memberSpecs.put(memberC, new AssignmentMemberSpec(
+            Optional.empty(), Optional.empty(),
+            Collections.singletonList(topic3Uuid), Collections.emptyMap()
+        ));
+
+        GroupAssignment actual = assignor.assign(new AssignmentSpec(memberSpecs), metadata);
+
+        // Two partitions cannot cover three subscribers, so one member (C) stays empty.
+        Map<String, Map<Uuid, Set<Integer>>> expected = new HashMap<>();
+        expected.put(memberA, mkAssignment(mkTopicAssignment(topic3Uuid, 0)));
+        expected.put(memberB, mkAssignment(mkTopicAssignment(topic3Uuid, 1)));
+        expected.put(memberC, Collections.emptyMap());
+
+        assertAssignment(expected, actual);
+        checkValidityAndBalance(memberSpecs, actual);
+    }
+
+    /**
+     * Runs the assignor on a larger input: 99 three-partition topics with random ids
+     * and 49 members that all subscribe to every topic. No exact assignment is
+     * asserted; only {@code checkValidityAndBalance} is applied to the result.
+     */
+    @Test
+    public void testValidityAndBalanceForLargeSampleSet() {
+        Map<Uuid, TopicMetadata> topics = new HashMap<>();
+        for (int topicIndex = 1; topicIndex < 100; topicIndex++) {
+            Uuid id = Uuid.randomUuid();
+            topics.put(id, new TopicMetadata(id, "topic-" + topicIndex, 3, mkMapOfPartitionRacks(3)));
+        }
+
+        // Every member shares the same subscription list covering all topics.
+        List<Uuid> allTopicIds = new ArrayList<>(topics.keySet());
+
+        Map<String, AssignmentMemberSpec> memberSpecs = new TreeMap<>();
+        for (int memberIndex = 1; memberIndex < 50; memberIndex++) {
+            AssignmentMemberSpec memberSpec = new AssignmentMemberSpec(
+                Optional.empty(), Optional.empty(),
+                allTopicIds, Collections.emptyMap()
+            );
+            memberSpecs.put("member" + memberIndex, memberSpec);
+        }
+
+        GroupAssignment actual = assignor.assign(
+            new AssignmentSpec(memberSpecs),
+            new SubscribedTopicMetadata(topics)
+        );
+
+        checkValidityAndBalance(memberSpecs, actual);
+    }
+
+    /**
+     * Reassignment starting from an unbalanced state: A owns four partitions and B
+     * owns two. The expected result moves topic2 partition 1 from A to B so that each
+     * member ends up with three partitions.
+     */
+    @Test
+    public void testReassignmentForTwoMembersTwoTopicsGivenUnbalancedPrevAssignment() {
+        Map<Uuid, TopicMetadata> topics = new HashMap<>();
+        topics.put(topic1Uuid, new TopicMetadata(topic1Uuid, topic1Name, 3, mkMapOfPartitionRacks(3)));
+        topics.put(topic2Uuid, new TopicMetadata(topic2Uuid, topic2Name, 3, mkMapOfPartitionRacks(3)));
+        SubscribedTopicMetadata metadata = new SubscribedTopicMetadata(topics);
+
+        Map<String, AssignmentMemberSpec> memberSpecs = new TreeMap<>();
+
+        // Previous ownership: A holds 4 partitions, B holds 2.
+        Map<Uuid, Set<Integer>> ownedByA = new TreeMap<>(mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0, 1),
+            mkTopicAssignment(topic2Uuid, 0, 1)
+        ));
+        memberSpecs.put(memberA, new AssignmentMemberSpec(
+            Optional.empty(), Optional.empty(),
+            Arrays.asList(topic1Uuid, topic2Uuid), ownedByA
+        ));
+
+        Map<Uuid, Set<Integer>> ownedByB = new TreeMap<>(mkAssignment(
+            mkTopicAssignment(topic1Uuid, 2),
+            mkTopicAssignment(topic2Uuid, 2)
+        ));
+        memberSpecs.put(memberB, new AssignmentMemberSpec(
+            Optional.empty(), Optional.empty(),
+            Arrays.asList(topic1Uuid, topic2Uuid), ownedByB
+        ));
+
+        GroupAssignment actual = assignor.assign(new AssignmentSpec(memberSpecs), metadata);
+
+        Map<Uuid, Set<Integer>> expectedForA = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0, 1),
+            mkTopicAssignment(topic2Uuid, 0)
+        );
+        Map<Uuid, Set<Integer>> expectedForB = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 2),
+            mkTopicAssignment(topic2Uuid, 1, 2)
+        );
+        Map<String, Map<Uuid, Set<Integer>>> expected = new HashMap<>();
+        expected.put(memberA, expectedForA);
+        expected.put(memberB, expectedForB);
+
+        assertAssignment(expected, actual);
+        checkValidityAndBalance(memberSpecs, actual);
+    }
+
+    /**
+     * Reassignment after the two members swapped racks: A is now in "rack2" and B in
+     * "rack1" while they still own their previous partitions. The expected result
+     * moves partitions between the members and keeps a 3/3 split.
+     */
+    @Test
+    public void testReassignmentOnRackChangesWithMemberAndPartitionRacks() {
+        Map<Uuid, TopicMetadata> topics = new HashMap<>();
+        topics.put(topic1Uuid, new TopicMetadata(topic1Uuid, topic1Name, 3, mkMapOfPartitionRacks(3)));
+        topics.put(topic2Uuid, new TopicMetadata(topic2Uuid, topic2Name, 3, mkMapOfPartitionRacks(3)));
+        SubscribedTopicMetadata metadata = new SubscribedTopicMetadata(topics);
+
+        // A used to be in rack 1 and B in rack 2; the racks are exchanged here.
+        Map<String, AssignmentMemberSpec> memberSpecs = new TreeMap<>();
+
+        Map<Uuid, Set<Integer>> ownedByA = new TreeMap<>(mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0, 1),
+            mkTopicAssignment(topic2Uuid, 0)
+        ));
+        memberSpecs.put(memberA, new AssignmentMemberSpec(
+            Optional.empty(), Optional.of("rack2"),
+            Arrays.asList(topic1Uuid, topic2Uuid), ownedByA
+        ));
+
+        Map<Uuid, Set<Integer>> ownedByB = new TreeMap<>(mkAssignment(
+            mkTopicAssignment(topic1Uuid, 2),
+            mkTopicAssignment(topic2Uuid, 1, 2)
+        ));
+        memberSpecs.put(memberB, new AssignmentMemberSpec(
+            Optional.empty(), Optional.of("rack1"),
+            Arrays.asList(topic1Uuid, topic2Uuid), ownedByB
+        ));
+
+        GroupAssignment actual = assignor.assign(new AssignmentSpec(memberSpecs), metadata);
+
+        Map<Uuid, Set<Integer>> expectedForA = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1, 2),
+            mkTopicAssignment(topic2Uuid, 2)
+        );
+        Map<Uuid, Set<Integer>> expectedForB = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0),
+            mkTopicAssignment(topic2Uuid, 1, 0)
+        );
+        Map<String, Map<Uuid, Set<Integer>>> expected = new HashMap<>();
+        expected.put(memberA, expectedForA);
+        expected.put(memberB, expectedForB);
+
+        assertAssignment(expected, actual);
+        checkValidityAndBalance(memberSpecs, actual);
+    }
+
+    /**
+     * Reassignment after both topics grew from 3 to 5 partitions, with members in
+     * "rack0" and "rack1" and partition racks present. The ten partitions are
+     * expected to end up split 5/5 between A and B.
+     */
+    @Test
+    public void testReassignmentOnAddingPartitionsWithMemberAndPartitionRacks() {
+        // T1 and T2 initially had 3 partitions each; both now have 5.
+        Map<Uuid, TopicMetadata> topics = new HashMap<>();
+        topics.put(topic1Uuid, new TopicMetadata(topic1Uuid, topic1Name, 5, mkMapOfPartitionRacks(5)));
+        topics.put(topic2Uuid, new TopicMetadata(topic2Uuid, topic2Name, 5, mkMapOfPartitionRacks(5)));
+        SubscribedTopicMetadata metadata = new SubscribedTopicMetadata(topics);
+
+        Map<String, AssignmentMemberSpec> memberSpecs = new TreeMap<>();
+
+        Map<Uuid, Set<Integer>> ownedByA = new TreeMap<>(mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0),
+            mkTopicAssignment(topic2Uuid, 0, 2)
+        ));
+        memberSpecs.put(memberA, new AssignmentMemberSpec(
+            Optional.empty(), Optional.of("rack0"),
+            Arrays.asList(topic1Uuid, topic2Uuid), ownedByA
+        ));
+
+        Map<Uuid, Set<Integer>> ownedByB = new TreeMap<>(mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1, 2),
+            mkTopicAssignment(topic2Uuid, 1)
+        ));
+        memberSpecs.put(memberB, new AssignmentMemberSpec(
+            Optional.empty(), Optional.of("rack1"),
+            Arrays.asList(topic1Uuid, topic2Uuid), ownedByB
+        ));
+
+        GroupAssignment actual = assignor.assign(new AssignmentSpec(memberSpecs), metadata);
+
+        Map<Uuid, Set<Integer>> expectedForA = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0, 3),
+            mkTopicAssignment(topic2Uuid, 0, 3, 4)
+        );
+        Map<Uuid, Set<Integer>> expectedForB = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1, 2, 4),
+            mkTopicAssignment(topic2Uuid, 1, 2)
+        );
+        Map<String, Map<Uuid, Set<Integer>>> expected = new HashMap<>();
+        expected.put(memberA, expectedForA);
+        expected.put(memberB, expectedForB);
+
+        assertAssignment(expected, actual);
+        checkValidityAndBalance(memberSpecs, actual);
+    }
+
+    /**
+     * Reassignment after partitions were added (topic1: 3 to 6, topic2: 3 to 5) for
+     * two members without racks. In the expected result each member keeps every
+     * partition it already owned and the new partitions are distributed on top.
+     */
+    @Test
+    public void testReassignmentWhenPartitionsAreAddedForTwoMembersTwoTopics() {
+        // Simulates partition growth: T1 and T2 originally had 3 partitions each.
+        Map<Uuid, TopicMetadata> topics = new HashMap<>();
+        topics.put(topic1Uuid, new TopicMetadata(topic1Uuid, topic1Name, 6, mkMapOfPartitionRacks(6)));
+        topics.put(topic2Uuid, new TopicMetadata(topic2Uuid, topic2Name, 5, mkMapOfPartitionRacks(5)));
+        SubscribedTopicMetadata metadata = new SubscribedTopicMetadata(topics);
+
+        Map<String, AssignmentMemberSpec> memberSpecs = new TreeMap<>();
+
+        Map<Uuid, Set<Integer>> ownedByA = new TreeMap<>(mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0, 2),
+            mkTopicAssignment(topic2Uuid, 0)
+        ));
+        memberSpecs.put(memberA, new AssignmentMemberSpec(
+            Optional.empty(), Optional.empty(),
+            Arrays.asList(topic1Uuid, topic2Uuid), ownedByA
+        ));
+
+        Map<Uuid, Set<Integer>> ownedByB = new TreeMap<>(mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1),
+            mkTopicAssignment(topic2Uuid, 1, 2)
+        ));
+        memberSpecs.put(memberB, new AssignmentMemberSpec(
+            Optional.empty(), Optional.empty(),
+            Arrays.asList(topic1Uuid, topic2Uuid), ownedByB
+        ));
+
+        GroupAssignment actual = assignor.assign(new AssignmentSpec(memberSpecs), metadata);
+
+        Map<Uuid, Set<Integer>> expectedForA = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0, 2, 3, 5),
+            mkTopicAssignment(topic2Uuid, 0, 4)
+        );
+        Map<Uuid, Set<Integer>> expectedForB = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1, 4),
+            mkTopicAssignment(topic2Uuid, 1, 2, 3)
+        );
+        Map<String, Map<Uuid, Set<Integer>>> expected = new HashMap<>();
+        expected.put(memberA, expectedForA);
+        expected.put(memberB, expectedForB);
+
+        assertAssignment(expected, actual);
+        checkValidityAndBalance(memberSpecs, actual);
+    }
+
+    /**
+     * Reassignment after a third member (C) joins a group in which A and B already
+     * own all six partitions of topic1 and topic2. In the expected result A and B
+     * each give up partitions of topic2 and C receives topic2 partitions 0 and 2,
+     * leaving every member with two partitions.
+     */
+    @Test
+    public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembersTwoTopics() {
+        Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
+        topicMetadata.put(topic1Uuid, new TopicMetadata(
+            topic1Uuid,
+            topic1Name,
+            3,
+            mkMapOfPartitionRacks(3)
+        ));
+        topicMetadata.put(topic2Uuid, new TopicMetadata(
+            topic2Uuid,
+            topic2Name,
+            3,
+            mkMapOfPartitionRacks(3)
+        ));
+
+        // A TreeMap (as in the sibling tests) gives the members a guaranteed order;
+        // HashMap iteration order is unspecified, which is fragile for a test that
+        // asserts one exact assignment.
+        Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+
+        Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0, 2),
+            mkTopicAssignment(topic2Uuid, 0)
+        ));
+        members.put(memberA, new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Arrays.asList(topic1Uuid, topic2Uuid),
+            currentAssignmentForA
+        ));
+
+        Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1),
+            mkTopicAssignment(topic2Uuid, 1, 2)
+        ));
+        members.put(memberB, new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Arrays.asList(topic1Uuid, topic2Uuid),
+            currentAssignmentForB
+        ));
+
+        // Add a new member, owning nothing, to trigger a re-assignment.
+        members.put(memberC, new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Arrays.asList(topic1Uuid, topic2Uuid),
+            Collections.emptyMap()
+        ));
+
+        AssignmentSpec assignmentSpec = new AssignmentSpec(members);
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+
+        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
+        expectedAssignment.put(memberA, mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0, 2)
+        ));
+        expectedAssignment.put(memberB, mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1),
+            mkTopicAssignment(topic2Uuid, 1)
+        ));
+        expectedAssignment.put(memberC, mkAssignment(
+            mkTopicAssignment(topic2Uuid, 0, 2)
+        ));
+
+        assertAssignment(expectedAssignment, computedAssignment);
+        checkValidityAndBalance(members, computedAssignment);
+    }
+
+    /**
+     * Reassignment after a third member (C, "rack2") joins a group whose members A
+     * ("rack0") and B ("rack1") already own all six partitions. The expected result
+     * gives each member one partition of topic1 and one of topic2.
+     */
+    @Test
+    public void testReassignmentOnAddingMemberWithRackAndPartitionRacks() {
+        Map<Uuid, TopicMetadata> topics = new HashMap<>();
+        topics.put(topic1Uuid, new TopicMetadata(topic1Uuid, topic1Name, 3, mkMapOfPartitionRacks(3)));
+        topics.put(topic2Uuid, new TopicMetadata(topic2Uuid, topic2Name, 3, mkMapOfPartitionRacks(3)));
+        SubscribedTopicMetadata metadata = new SubscribedTopicMetadata(topics);
+
+        Map<String, AssignmentMemberSpec> memberSpecs = new TreeMap<>();
+
+        Map<Uuid, Set<Integer>> ownedByA = new TreeMap<>(mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0),
+            mkTopicAssignment(topic2Uuid, 0, 2)
+        ));
+        memberSpecs.put(memberA, new AssignmentMemberSpec(
+            Optional.empty(), Optional.of("rack0"),
+            Arrays.asList(topic1Uuid, topic2Uuid), ownedByA
+        ));
+
+        Map<Uuid, Set<Integer>> ownedByB = new TreeMap<>(mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1, 2),
+            mkTopicAssignment(topic2Uuid, 1)
+        ));
+        memberSpecs.put(memberB, new AssignmentMemberSpec(
+            Optional.empty(), Optional.of("rack1"),
+            Arrays.asList(topic1Uuid, topic2Uuid), ownedByB
+        ));
+
+        // The newly added member owns nothing yet.
+        memberSpecs.put(memberC, new AssignmentMemberSpec(
+            Optional.empty(), Optional.of("rack2"),
+            Arrays.asList(topic1Uuid, topic2Uuid), Collections.emptyMap()
+        ));
+
+        GroupAssignment actual = assignor.assign(new AssignmentSpec(memberSpecs), metadata);
+
+        Map<Uuid, Set<Integer>> expectedForA = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0),
+            mkTopicAssignment(topic2Uuid, 0)
+        );
+        Map<Uuid, Set<Integer>> expectedForB = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1),
+            mkTopicAssignment(topic2Uuid, 1)
+        );
+        Map<Uuid, Set<Integer>> expectedForC = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 2),
+            mkTopicAssignment(topic2Uuid, 2)
+        );
+        Map<String, Map<Uuid, Set<Integer>>> expected = new HashMap<>();
+        expected.put(memberA, expectedForA);
+        expected.put(memberB, expectedForB);
+        expected.put(memberC, expectedForC);
+
+        assertAssignment(expected, actual);
+        checkValidityAndBalance(memberSpecs, actual);
+    }
+
+    /**
+     * Reassignment after member C left the group: only A and B remain, each still
+     * owning partitions 0 and 1 respectively of both topics. In the expected result
+     * they keep those partitions; A additionally receives topic1 partition 2 and B
+     * receives topic2 partition 2.
+     */
+    @Test
+    public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeMembersTwoTopics() {
+        Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
+        topicMetadata.put(topic1Uuid, new TopicMetadata(
+            topic1Uuid,
+            topic1Name,
+            3,
+            mkMapOfPartitionRacks(3)
+        ));
+        topicMetadata.put(topic2Uuid, new TopicMetadata(
+            topic2Uuid,
+            topic2Name,
+            3,
+            mkMapOfPartitionRacks(3)
+        ));
+
+        // A TreeMap (as in the sibling tests) gives the members a guaranteed order;
+        // HashMap iteration order is unspecified, which is fragile for a test that
+        // asserts one exact assignment.
+        Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+
+        Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0),
+            mkTopicAssignment(topic2Uuid, 0)
+        );
+        members.put(memberA, new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Arrays.asList(topic1Uuid, topic2Uuid),
+            currentAssignmentForA
+        ));
+
+        Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1),
+            mkTopicAssignment(topic2Uuid, 1)
+        );
+        members.put(memberB, new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Arrays.asList(topic1Uuid, topic2Uuid),
+            currentAssignmentForB
+        ));
+
+        // Member C was removed, so it is simply absent from the member map.
+
+        AssignmentSpec assignmentSpec = new AssignmentSpec(members);
+        SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata);
+
+        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
+        expectedAssignment.put(memberA, mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0, 2),
+            mkTopicAssignment(topic2Uuid, 0)
+        ));
+        expectedAssignment.put(memberB, mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1),
+            mkTopicAssignment(topic2Uuid, 1, 2)
+        ));
+
+        assertAssignment(expectedAssignment, computedAssignment);
+        checkValidityAndBalance(members, computedAssignment);
+    }
+
+    @Test
+    public void 
testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWithTwoMembersTwoTopics()
 {
+        Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
+        topicMetadata.put(topic1Uuid, new TopicMetadata(
+            topic1Uuid,
+            topic1Name,
+            2,
+            mkMapOfPartitionRacks(2)
+        ));
+        topicMetadata.put(topic2Uuid, new TopicMetadata(
+            topic2Uuid,
+            topic2Name,
+            2,
+            mkMapOfPartitionRacks(2)
+        ));
+
+        // Initial subscriptions were [T1, T2]; both members now subscribe to T2 only.
+        Map<String, AssignmentMemberSpec> members = new HashMap<>();
+
+        Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0),
+            mkTopicAssignment(topic2Uuid, 0)
+        );
+        members.put(memberA, new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Collections.singletonList(topic2Uuid),
+            currentAssignmentForA
+        ));
+
+        Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1),
+            mkTopicAssignment(topic2Uuid, 1)
+        );
+        members.put(memberB, new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.empty(),
+            Collections.singletonList(topic2Uuid),
+            currentAssignmentForB
+        ));
+
+        AssignmentSpec assignmentSpec = new AssignmentSpec(members);
+        SubscribedTopicMetadata subscribedTopicMetadata = new 
SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, 
subscribedTopicMetadata);
+
+        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new 
HashMap<>();
+        expectedAssignment.put(memberA, mkAssignment(
+            mkTopicAssignment(topic2Uuid, 0)
+        ));
+        expectedAssignment.put(memberB, mkAssignment(
+            mkTopicAssignment(topic2Uuid, 1)
+        ));
+
+        assertAssignment(expectedAssignment, computedAssignment);
+        checkValidityAndBalance(members, computedAssignment);
+    }
+
+    @Test
+    public void 
testReassignmentWhenOneSubscriptionRemovedWithMemberAndPartitionRacks() {
+        Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
+        topicMetadata.put(topic1Uuid, new TopicMetadata(
+            topic1Uuid,
+            topic1Name,
+            5,
+            mkMapOfPartitionRacks(3)
+        ));
+        topicMetadata.put(topic2Uuid, new TopicMetadata(
+            topic2Uuid,
+            topic2Name,
+            2,
+            mkMapOfPartitionRacks(3)
+        ));
+
+        // Initial subscriptions were [T1, T2]; both members now subscribe to T1 only.
+        Map<String, AssignmentMemberSpec> members = new TreeMap<>();
+
+        Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
+            mkAssignment(
+                mkTopicAssignment(topic1Uuid, 0, 2, 4),
+                mkTopicAssignment(topic2Uuid, 0)
+            )
+        );
+        members.put(memberA, new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.of("rack0"),
+            Collections.singletonList(topic1Uuid),
+            currentAssignmentForA
+        ));
+
+        Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(
+            mkAssignment(
+                mkTopicAssignment(topic1Uuid, 1, 3),
+                mkTopicAssignment(topic2Uuid, 1)
+            )
+        );
+        members.put(memberB, new AssignmentMemberSpec(
+            Optional.empty(),
+            Optional.of("rack1"),
+            Collections.singletonList(topic1Uuid),
+            currentAssignmentForB
+        ));
+
+        AssignmentSpec assignmentSpec = new AssignmentSpec(members);
+        SubscribedTopicMetadata subscribedTopicMetadata = new 
SubscribedTopicMetadata(topicMetadata);
+
+        GroupAssignment computedAssignment = assignor.assign(assignmentSpec, 
subscribedTopicMetadata);
+
+        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new 
HashMap<>();
+        expectedAssignment.put(memberA, mkAssignment(
+            mkTopicAssignment(topic1Uuid, 0, 2, 4)
+        ));
+        expectedAssignment.put(memberB, mkAssignment(
+            mkTopicAssignment(topic1Uuid, 1, 3)
+        ));
+
+        assertAssignment(expectedAssignment, computedAssignment);
+        checkValidityAndBalance(members, computedAssignment);
+    }
+
+    /**
+     * Verifies that the given assignment is valid and balanced with respect
+     * to the given subscriptions.
+     * Validity requirements:
+     *      - each member is subscribed to the topics of all partitions
+     *        assigned to it, and
+     *      - each partition is assigned to no more than one member.
+     * Balance requirements:
+     *      - the assignment is fully balanced (the numbers of topic
+     *        partitions assigned to members differ by at most one), or
+     *      - no topic partition can be moved from one member to another
+     *        member that has two or more fewer topic partitions.
+     *
+     * @param members                   Members data structure from the
+     *                                  assignment spec.
+     * @param computedGroupAssignment   Assignment computed by the uniform
+     *                                  assignor.
+     */
+    private void checkValidityAndBalance(
+        Map<String, AssignmentMemberSpec> members,
+        GroupAssignment computedGroupAssignment
+    ) {
+        List<String> membersList = new 
ArrayList<>(computedGroupAssignment.members().keySet());
+        int numMembers = membersList.size();
+        List<Integer> totalAssignmentSizesOfAllMembers = new 
ArrayList<>(membersList.size());
+        membersList.forEach(member -> {
+            Map<Uuid, Set<Integer>> computedAssignmentForMember = 
computedGroupAssignment
+                .members().get(member).targetPartitions();
+            int sum = 
computedAssignmentForMember.values().stream().mapToInt(Set::size).sum();
+            totalAssignmentSizesOfAllMembers.add(sum);
+        });
+
+        for (int i = 0; i < numMembers; i++) {
+            String memberId = membersList.get(i);
+            Map<Uuid, Set<Integer>> computedAssignmentForMember =
+                
computedGroupAssignment.members().get(memberId).targetPartitions();
+            // Each member is subscribed to topics of all the partitions 
assigned to it.
+            computedAssignmentForMember.keySet().forEach(topicId -> {
+                // Check if the topic exists in the subscription.
+                
assertTrue(members.get(memberId).subscribedTopicIds().contains(topicId),
+                        "Error: Partitions for topic " + topicId + " are 
assigned to member " + memberId +
+                                " but it is not part of the members 
subscription ");
+            });
+
+            for (int j = i + 1; j < numMembers; j++) {
+                String otherMemberId = membersList.get(j);
+                Map<Uuid, Set<Integer>> computedAssignmentForOtherMember = 
computedGroupAssignment
+                    .members().get(otherMemberId).targetPartitions();
+                // Each partition should be assigned to at most one member
+                computedAssignmentForMember.keySet().forEach(topicId -> {
+                    Set<Integer> intersection = new HashSet<>();
+                    if (computedAssignmentForOtherMember.containsKey(topicId)) 
{
+                        intersection = new 
HashSet<>(computedAssignmentForMember.get(topicId));
+                        
intersection.retainAll(computedAssignmentForOtherMember.get(topicId));
+                    }
+                    assertTrue(
+                        intersection.isEmpty(),
+                        "Error : Member 1 " + memberId + " and Member 2 " + 
otherMemberId +
+                            "have common partitions assigned to them " + 
computedAssignmentForOtherMember.get(topicId)
+                    );
+                });
+
+                // The total assignment sizes of any two members should
+                // differ by at most 1
+                int size1 = totalAssignmentSizesOfAllMembers.get(i);
+                int size2 = totalAssignmentSizesOfAllMembers.get(j);
+                assertTrue(
+                    Math.abs(size1 - size2) <= 1,
+                    "Size of one assignment is greater than the other 
assignment by more than one partition "
+                        + size1 + " " + size2 + "abs = " + Math.abs(size1 - 
size2)
+                );
+            }
+        }
+    }
+
+    /**
+     * Verifies that the expected assignment is equal to the computed 
assignment for every member in the group.
+     */
+    private void assertAssignment(

Review Comment:
   nit: We have the same method in RangeAssignorTest. Could we share it
between the two test suites? We could perhaps introduce an AssignorTestUtil
class in this package and move it there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to