dajac commented on code in PR #14182: URL: https://github.com/apache/kafka/pull/14182#discussion_r1325561675
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java: ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * The assignment builder is used to construct the target assignment based on the members' subscriptions. + * + * This class contains common utility methods and a class for obtaining and storing rack information. + */ +public abstract class AbstractUniformAssignmentBuilder { + protected abstract GroupAssignment buildAssignment(); + + /** + * Determines if rack-aware assignment is appropriate based on the provided rack information. + * + * @param memberRacks Racks where members are located. 
+ * @param partitionRacks Racks where partitions are located. + * @param racksPerPartition Map of partitions to their associated racks. + * + * @return {@code true} if rack-aware assignment should be applied; {@code false} otherwise. + */ + protected static boolean useRackAwareAssignment( + Set<String> memberRacks, + Set<String> partitionRacks, + Map<TopicIdPartition, Set<String>> racksPerPartition + ) { + if (memberRacks.isEmpty() || Collections.disjoint(memberRacks, partitionRacks)) + return false; + else { + return !racksPerPartition.values().stream().allMatch(partitionRacks::equals); + } + } + + /** + * Adds the topic's partition to the member's target assignment. + */ + protected static void addPartitionToAssignment( + int partition, + Uuid topicId, + String memberId, + Map<String, MemberAssignment> targetAssignment Review Comment: small nit: In such cases, I usually prefer to put the mutated object first in the list of arguments. Then, I would also put memberId, then topicId and finally partition to follow their hierarchy. I would also rename targetAssignment to assignment because the method is not tied to a target assignment in the end. It could be any assignment map. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AbstractUniformAssignmentBuilder.java: ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * The assignment builder is used to construct the target assignment based on the members' subscriptions. + * + * This class contains common utility methods and a class for obtaining and storing rack information. + */ +public abstract class AbstractUniformAssignmentBuilder { + protected abstract GroupAssignment buildAssignment(); + + /** + * Determines if rack-aware assignment is appropriate based on the provided rack information. + * + * @param memberRacks Racks where members are located. + * @param partitionRacks Racks where partitions are located. + * @param racksPerPartition Map of partitions to their associated racks. + * + * @return {@code true} if rack-aware assignment should be applied; {@code false} otherwise. 
+ */ + protected static boolean useRackAwareAssignment( + Set<String> memberRacks, + Set<String> partitionRacks, + Map<TopicIdPartition, Set<String>> racksPerPartition + ) { + if (memberRacks.isEmpty() || Collections.disjoint(memberRacks, partitionRacks)) + return false; + else { + return !racksPerPartition.values().stream().allMatch(partitionRacks::equals); + } + } + + /** + * Adds the topic's partition to the member's target assignment. + */ + protected static void addPartitionToAssignment( + int partition, + Uuid topicId, + String memberId, + Map<String, MemberAssignment> targetAssignment + ) { + targetAssignment.get(memberId) + .targetPartitions() + .computeIfAbsent(topicId, __ -> new HashSet<>()) + .add(partition); + } + + /** + * Constructs a list of {@code TopicIdPartition} for each topic Id based on its partition count. + * + * @param allTopicIds The subscribed topic Ids. + * @param subscribedTopicDescriber Utility to fetch the partition count for a given topic. + * + * @return List of sorted {@code TopicIdPartition} for all provided topic Ids. + */ + protected static List<TopicIdPartition> allTopicIdPartitions( + Collection<Uuid> allTopicIds, + SubscribedTopicDescriber subscribedTopicDescriber + ) { + List<TopicIdPartition> allTopicIdPartitions = new ArrayList<>(); + // Sorted so that partitions from each topic can be distributed amongst its subscribers equally. + allTopicIds.stream().sorted().forEach(topic -> + IntStream.range(0, subscribedTopicDescriber.numPartitions(topic)) + .forEach(i -> allTopicIdPartitions.add(new TopicIdPartition(topic, i)) + ) Review Comment: nit: This parenthesis seems misplaced, no? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * The Uniform Assignor distributes topic partitions among group members for a + * balanced and potentially rack aware assignment. + * The assignor employs two different strategies based on the nature of topic + * subscriptions across the group members: + * <ul> + * <li> + * <b> Optimized Uniform Assignment Builder: </b> This strategy is used when all members have subscribed + * to the same set of topics. + * </li> + * <li> + * <b> General Uniform Assignment Builder: </b> This strategy is used when members have varied topic + * subscriptions. + * </li> + * </ul> + * + * The appropriate strategy is automatically chosen based on the current members' topic subscriptions. 
+ * + * @see OptimizedUniformAssignmentBuilder + * @see GeneralUniformAssignmentBuilder + */ +public class UniformAssignor implements PartitionAssignor { + private static final Logger LOG = LoggerFactory.getLogger(UniformAssignor.class); + public static final String UNIFORM_ASSIGNOR_NAME = "uniform"; + + @Override + public String name() { + return UNIFORM_ASSIGNOR_NAME; + } + + /** + * Perform the group assignment given the current members and + * topics metadata. + * + * @param assignmentSpec The assignment specification that included member metadata. + * @param subscribedTopicDescriber The topic and cluster metadata describer {@link SubscribedTopicDescriber}. + * @return The new target assignment for the group. + */ + @Override + public GroupAssignment assign( + AssignmentSpec assignmentSpec, + SubscribedTopicDescriber subscribedTopicDescriber + ) throws PartitionAssignorException { + AbstractUniformAssignmentBuilder assignmentBuilder; + + if (assignmentSpec.members().isEmpty()) + return new GroupAssignment(Collections.emptyMap()); + + if (allSubscriptionsEqual(assignmentSpec.members())) { + LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the " + + "optimized assignment algorithm"); + assignmentBuilder = new OptimizedUniformAssignmentBuilder(assignmentSpec, subscribedTopicDescriber); + } else { + assignmentBuilder = new GeneralUniformAssignmentBuilder(); + LOG.debug("Detected that all members are subscribed to a different set of topics, invoking the " Review Comment: nit: Let's log before creating the builder to be consistent with the other branch. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ########## @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.stream.IntStream; + +import static java.lang.Math.min; + +/** + * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with + * all its members subscribed to the same set of topics. + * It is optimized since the assignment can be done in fewer, less complicated steps compared to when + * the subscriptions are different across the members. + * + * Assignments are done according to the following principles: + * + * + * <li> Balance: Ensure partitions are distributed equally among all members. + * The difference in assignments sizes between any two members + * should not exceed one partition. </li> + * <li> Rack Matching: When feasible, aim to assign partitions to members + * located on the same rack thus avoiding cross-zone traffic. </li> + * <li> Stickiness: Minimize partition movements among members by retaining + * as much of the existing assignment as possible. 
</li> + * + * The assignment builder prioritizes the properties in the following order: + * Balance > Rack Matching > Stickiness. + */ +public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder { + private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); + /** + * The assignment specification which includes member metadata. + */ + private final AssignmentSpec assignmentSpec; + /** + * The topic and partition metadata describer. + */ + private final SubscribedTopicDescriber subscribedTopicDescriber; + /** + * The set of topic Ids that the consumer group is subscribed to. + */ + private final Set<Uuid> subscriptionIds; + /** + * Rack information and helper methods. + */ + private final RackInfo rackInfo; + /** + * The number of members to receive an extra partition beyond the minimum quota. + * Minimum Quota = Total Partitions / Total Members + * Example: If there are 11 partitions to be distributed among 3 members, + * each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition. + */ + private int remainingMembersToGetAnExtraPartition; + /** + * Members mapped to the remaining number of partitions needed to meet the minimum quota, + * including members that are eligible to receive an extra partition. + */ + private final Map<String, Integer> potentiallyUnfilledMembers; + /** + * Members mapped to the remaining number of partitions needed to meet the full quota. + * Full quota = minQuota + one extra partition (if applicable). + */ + private Map<String, Integer> unfilledMembers; + /** + * The partitions that still need to be assigned. + * Initially this contains all the subscribed topics' partitions. + */ + private List<TopicIdPartition> unassignedPartitions; + /** + * The target assignment. + */ + private final Map<String, MemberAssignment> targetAssignment; + /** + * Tracks the existing owner of each partition. 
+ * Only populated when the rack awareness strategy is used. + */ + private final Map<TopicIdPartition, String> currentPartitionOwners; + + OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { + this.assignmentSpec = assignmentSpec; + this.subscribedTopicDescriber = subscribedTopicDescriber; + this.subscriptionIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds()); + this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds); + this.potentiallyUnfilledMembers = new HashMap<>(); + this.unfilledMembers = new HashMap<>(); + this.targetAssignment = new HashMap<>(); + // Without rack-aware strategy, tracking current owners of unassigned partitions is unnecessary + // as all sticky partitions are retained until a member meets its quota. + this.currentPartitionOwners = rackInfo.useRackStrategy ? new HashMap<>() : Collections.emptyMap(); + } + + /** + * Here's the step-by-step breakdown of the assignment process: + * + * <li> Compute the quotas of partitions for each member based on the total partitions and member count.</li> + * <li> Initialize unassigned partitions to all the topic partitions and + * remove partitions from the list as and when they are assigned.</li> + * <li> For existing assignments, retain partitions based on the determined quota and member's rack compatibility.</li> + * <li> If a partition's rack mismatches with its owner, track it for future use.</li> + * <li> Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions.</li> + * <li> Depending on members needing extra partitions, select members from the potentially unfilled list + * and add them to the unfilled list.</li> + * <li> Proceed with a round-robin assignment adhering to rack awareness. 
+ * For each unassigned partition, locate the first compatible member from the unfilled list.</li> + * <li> If no rack-compatible member is found, revert to the tracked current owner. + * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment.</li> + */ + @Override + protected GroupAssignment buildAssignment() throws PartitionAssignorException { + int totalPartitionsCount = 0; + + for (Uuid topicId : subscriptionIds) { + int partitionCount = subscribedTopicDescriber.numPartitions(topicId); + if (partitionCount == -1) { + throw new PartitionAssignorException( + "Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata." + ); + } else { + totalPartitionsCount += partitionCount; + } + } + + if (subscriptionIds.isEmpty()) { + LOG.info("The subscription list is empty, returning an empty assignment"); + return new GroupAssignment(Collections.emptyMap()); + } + + // The minimum required quota that each member needs to meet for a balanced assignment. + // This is the same for all members. + final int numberOfMembers = assignmentSpec.members().size(); + final int minQuota = totalPartitionsCount / numberOfMembers; + remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; + + assignmentSpec.members().keySet().forEach(memberId -> + targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) + )); + + // sorted list of all partitions. 
+ unassignedPartitions = allTopicIdPartitions(subscriptionIds, subscribedTopicDescriber); + + assignStickyPartitions(minQuota); + + unfilledMembers = computeUnfilledMembers(); + + if (!unassignedPartitionsCountEqualsRemainingAssignmentsCount()) { + throw new PartitionAssignorException("Number of available partitions is not equal to the total requirement"); + } + + if (rackInfo.useRackStrategy) rackAwareRoundRobinAssignment(); + unassignedPartitionsRoundRobinAssignment(); + + return new GroupAssignment(targetAssignment); + } + + /** + * Retains a set of partitions from the existing assignment and includes them in the target assignment. + * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. + * In addition, if rack awareness is enabled, it is ensured that a partition's rack matches the member's rack. + * + * <p>For each member: + * <ol> + * <li> Find the valid current assignment considering topic subscriptions, metadata and rack information.</li> + * <li> If the current assignments exist, retain partitions up to the minimum quota.</li> + * <li> If the current assignment size is greater than the minimum quota and + * there are members that could get an extra partition, assign the next partition as well.</li> + * <li> Finally, if the member's current assignment size is less than the minimum quota, + * add them to the potentially unfilled members map and track the number of remaining + * partitions required to meet the quota.</li> + * </ol> + * </p> + */ + private void assignStickyPartitions(int minQuota) { + assignmentSpec.members().forEach((memberId, assignmentMemberSpec) -> { + List<TopicIdPartition> validCurrentAssignment = validCurrentAssignment( + memberId, + assignmentMemberSpec.assignedPartitions() + ); + + int currentAssignmentSize = validCurrentAssignment.size(); + // Number of partitions required to meet the minimum quota. 
+ int remaining = minQuota - currentAssignmentSize; + + if (currentAssignmentSize > 0) { + int retainedPartitionsCount = min(currentAssignmentSize, minQuota); + IntStream.range(0, retainedPartitionsCount).forEach(i -> { + TopicIdPartition topicIdPartition = validCurrentAssignment.get(i); + addPartitionToAssignment( + topicIdPartition.partition(), + topicIdPartition.topicId(), + memberId, + targetAssignment + ); + unassignedPartitions.remove(topicIdPartition); + }); + + // The extra partition is located at the last index from the previous step. + if (remaining < 0 && remainingMembersToGetAnExtraPartition > 0) { + TopicIdPartition topicIdPartition = validCurrentAssignment.get(retainedPartitionsCount); + addPartitionToAssignment( + topicIdPartition.partition(), + topicIdPartition.topicId(), + memberId, + targetAssignment + ); + unassignedPartitions.remove(topicIdPartition); + remainingMembersToGetAnExtraPartition--; + } + } + + if (remaining >= 0) { + potentiallyUnfilledMembers.put(memberId, remaining); + } + + }); + } + + /** + * Filters the current assignment of partitions for a given member. + * + * Any partition that still belongs to the member's subscribed topics list is considered valid. + * If rack aware strategy can be used: Only partitions with matching rack are valid and non-matching partitions are + * tracked with their current owner for future use. + * + * @param memberId The Id of the member whose assignment is being validated. + * @param currentAssignment The partitions currently assigned to the member. + * + * @return List of valid partitions after applying the filters. 
+ */ + private List<TopicIdPartition> validCurrentAssignment( + String memberId, + Map<Uuid, Set<Integer>> currentAssignment + ) { + List<TopicIdPartition> validCurrentAssignmentList = new ArrayList<>(); + currentAssignment.forEach((topicId, partitions) -> { + if (subscriptionIds.contains(topicId)) { + partitions.forEach(partition -> { + TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition); + if (rackInfo.useRackStrategy && rackInfo.racksMismatch(memberId, topicIdPartition)) { + currentPartitionOwners.put(topicIdPartition, memberId); + } else { + validCurrentAssignmentList.add(topicIdPartition); + } + }); + } else { + LOG.debug("The topic " + topicId + " is no longer present in the subscribed topics list"); + } + }); + + return validCurrentAssignmentList; + } + + /** + * Allocates the unassigned partitions to unfilled members present in the same rack in a round-robin fashion. + */ + private void rackAwareRoundRobinAssignment() { + Queue<String> roundRobinMembers = new LinkedList<>(unfilledMembers.keySet()); + + // Sort partitions in ascending order by number of potential members with matching racks. + // Partitions with no potential members in the same rack aren't included in this list. + List<TopicIdPartition> sortedPartitions = rackInfo.sortPartitionsByRackMembers(unassignedPartitions); + + sortedPartitions.forEach(partition -> { + boolean assigned = false; + for (int i = 0; i < roundRobinMembers.size() && !assigned; i++) { + String memberId = roundRobinMembers.poll(); + Integer remainingPartitionCount = unfilledMembers.getOrDefault(memberId, 0); + + if (remainingPartitionCount > 0 && !rackInfo.racksMismatch(memberId, partition)) { + assignPartitionToMember(memberId, partition); + assigned = true; + unassignedPartitions.remove(partition); + } + + // Only re-add the member to the end of the queue if it's still available for assignment. 
+ if (unfilledMembers.containsKey(memberId)) { + roundRobinMembers.add(memberId); + } + } + }); + } + + /** + * Allocates the unassigned partitions to unfilled members in a round-robin fashion. + * + * If the rack-aware strategy is enabled, partitions are attempted to be assigned back to their current owners first. + * This is because pure stickiness without rack matching is not considered initially. + * + * If a partition couldn't be assigned to its current owner due to the quotas OR + * if the rack-aware strategy is not enabled, the partitions are allocated to the unfilled members. + */ + private void unassignedPartitionsRoundRobinAssignment() { + Queue<String> roundRobinMembers = new LinkedList<>(unfilledMembers.keySet()); + + unassignedPartitions.forEach(partition -> { + boolean assigned = false; + + if (rackInfo.useRackStrategy && currentPartitionOwners.containsKey(partition)) { + String prevOwner = currentPartitionOwners.get(partition); + if (unfilledMembers.containsKey(prevOwner)) { + assignPartitionToMember(prevOwner, partition); + assigned = true; + if (!unfilledMembers.containsKey(prevOwner)) { + roundRobinMembers.remove(prevOwner); + } + } + } + + // Only re-add the member to the end of the queue if it's still available for assignment. Review Comment: Ah ah. You got me :). I actually raised this because in rackAwareRoundRobinAssignment, you put it right before the if. That also works because the comment is really about that if. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ########## @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.stream.IntStream; + +import static java.lang.Math.min; + +/** + * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with + * all its members subscribed to the same set of topics. + * It is optimized since the assignment can be done in fewer, less complicated steps compared to when + * the subscriptions are different across the members. + * + * Assignments are done according to the following principles: + * + * + * <li> Balance: Ensure partitions are distributed equally among all members. + * The difference in assignments sizes between any two members + * should not exceed one partition. </li> + * <li> Rack Matching: When feasible, aim to assign partitions to members + * located on the same rack thus avoiding cross-zone traffic. </li> + * <li> Stickiness: Minimize partition movements among members by retaining + * as much of the existing assignment as possible. 
</li> + * + * The assignment builder prioritizes the properties in the following order: + * Balance > Rack Matching > Stickiness. + */ +public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder { + private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); + /** + * The assignment specification which includes member metadata. + */ + private final AssignmentSpec assignmentSpec; + /** + * The topic and partition metadata describer. + */ + private final SubscribedTopicDescriber subscribedTopicDescriber; + /** + * The set of topic Ids that the consumer group is subscribed to. + */ + private final Set<Uuid> subscriptionIds; + /** + * Rack information and helper methods. + */ + private final RackInfo rackInfo; + /** + * The number of members to receive an extra partition beyond the minimum quota. + * Minimum Quota = Total Partitions / Total Members + * Example: If there are 11 partitions to be distributed among 3 members, + * each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition. + */ + private int remainingMembersToGetAnExtraPartition; + /** + * Members mapped to the remaining number of partitions needed to meet the minimum quota, + * including members that are eligible to receive an extra partition. + */ + private final Map<String, Integer> potentiallyUnfilledMembers; + /** + * Members mapped to the remaining number of partitions needed to meet the full quota. + * Full quota = minQuota + one extra partition (if applicable). + */ + private Map<String, Integer> unfilledMembers; + /** + * The partitions that still need to be assigned. + * Initially this contains all the subscribed topics' partitions. + */ + private List<TopicIdPartition> unassignedPartitions; + /** + * The target assignment. + */ + private final Map<String, MemberAssignment> targetAssignment; + /** + * Tracks the existing owner of each partition. 
+ * Only populated when the rack awareness strategy is used. + */ + private final Map<TopicIdPartition, String> currentPartitionOwners; + + OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { + this.assignmentSpec = assignmentSpec; + this.subscribedTopicDescriber = subscribedTopicDescriber; + this.subscriptionIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds()); + this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds); + this.potentiallyUnfilledMembers = new HashMap<>(); + this.unfilledMembers = new HashMap<>(); + this.targetAssignment = new HashMap<>(); + // Without rack-aware strategy, tracking current owners of unassigned partitions is unnecessary + // as all sticky partitions are retained until a member meets its quota. + this.currentPartitionOwners = rackInfo.useRackStrategy ? new HashMap<>() : Collections.emptyMap(); + } + + /** + * Here's the step-by-step breakdown of the assignment process: + * + * <li> Compute the quotas of partitions for each member based on the total partitions and member count.</li> + * <li> Initialize unassigned partitions to all the topic partitions and + * remove partitions from the list as and when they are assigned.</li> + * <li> For existing assignments, retain partitions based on the determined quota and member's rack compatibility.</li> + * <li> If a partition's rack mismatches with its owner, track it for future use.</li> + * <li> Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions.</li> + * <li> Depending on members needing extra partitions, select members from the potentially unfilled list + * and add them to the unfilled list.</li> + * <li> Proceed with a round-robin assignment adhering to rack awareness. 
+ * For each unassigned partition, locate the first compatible member from the unfilled list.</li> + * <li> If no rack-compatible member is found, revert to the tracked current owner. + * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment.</li> + */ + @Override + protected GroupAssignment buildAssignment() throws PartitionAssignorException { + int totalPartitionsCount = 0; + + for (Uuid topicId : subscriptionIds) { + int partitionCount = subscribedTopicDescriber.numPartitions(topicId); + if (partitionCount == -1) { + throw new PartitionAssignorException( + "Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata." + ); + } else { + totalPartitionsCount += partitionCount; + } + } + + if (subscriptionIds.isEmpty()) { + LOG.info("The subscription list is empty, returning an empty assignment"); + return new GroupAssignment(Collections.emptyMap()); + } + + // The minimum required quota that each member needs to meet for a balanced assignment. + // This is the same for all members. + final int numberOfMembers = assignmentSpec.members().size(); + final int minQuota = totalPartitionsCount / numberOfMembers; + remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; + + assignmentSpec.members().keySet().forEach(memberId -> + targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) + )); + + // sorted list of all partitions. 
+ unassignedPartitions = allTopicIdPartitions(subscriptionIds, subscribedTopicDescriber); + + assignStickyPartitions(minQuota); + + unfilledMembers = computeUnfilledMembers(); + + if (!unassignedPartitionsCountEqualsRemainingAssignmentsCount()) { + throw new PartitionAssignorException("Number of available partitions is not equal to the total requirement"); + } + + if (rackInfo.useRackStrategy) rackAwareRoundRobinAssignment(); + unassignedPartitionsRoundRobinAssignment(); + + return new GroupAssignment(targetAssignment); + } + + /** + * Retains a set of partitions from the existing assignment and includes them in the target assignment. + * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. + * In addition, if rack awareness is enabled, it is ensured that a partition's rack matches the member's rack. + * + * <p>For each member: + * <ol> + * <li> Find the valid current assignment considering topic subscriptions, metadata and rack information.</li> + * <li> If the current assignments exist, retain partitions up to the minimum quota.</li> + * <li> If the current assignment size is greater than the minimum quota and + * there are members that could get an extra partition, assign the next partition as well.</li> + * <li> Finally, if the member's current assignment size is less than the minimum quota, + * add them to the potentially unfilled members map and track the number of remaining + * partitions required to meet the quota.</li> + * </ol> + * </p> + */ + private void assignStickyPartitions(int minQuota) { + assignmentSpec.members().forEach((memberId, assignmentMemberSpec) -> { + List<TopicIdPartition> validCurrentAssignment = validCurrentAssignment( + memberId, + assignmentMemberSpec.assignedPartitions() + ); + + int currentAssignmentSize = validCurrentAssignment.size(); + // Number of partitions required to meet the minimum quota. 
+ int remaining = minQuota - currentAssignmentSize; + + if (currentAssignmentSize > 0) { + int retainedPartitionsCount = min(currentAssignmentSize, minQuota); + IntStream.range(0, retainedPartitionsCount).forEach(i -> { + TopicIdPartition topicIdPartition = validCurrentAssignment.get(i); + addPartitionToAssignment( + topicIdPartition.partition(), + topicIdPartition.topicId(), + memberId, + targetAssignment + ); + unassignedPartitions.remove(topicIdPartition); + }); + + // The extra partition is located at the last index from the previous step. + if (remaining < 0 && remainingMembersToGetAnExtraPartition > 0) { + TopicIdPartition topicIdPartition = validCurrentAssignment.get(retainedPartitionsCount); + addPartitionToAssignment( + topicIdPartition.partition(), + topicIdPartition.topicId(), + memberId, + targetAssignment + ); + unassignedPartitions.remove(topicIdPartition); + remainingMembersToGetAnExtraPartition--; + } + } + + if (remaining >= 0) { + potentiallyUnfilledMembers.put(memberId, remaining); + } + + }); + } + + /** + * Filters the current assignment of partitions for a given member. + * + * Any partition that still belongs to the member's subscribed topics list is considered valid. + * If rack aware strategy can be used: Only partitions with matching rack are valid and non-matching partitions are + * tracked with their current owner for future use. + * + * @param memberId The Id of the member whose assignment is being validated. + * @param currentAssignment The partitions currently assigned to the member. + * + * @return List of valid partitions after applying the filters. 
+ */ + private List<TopicIdPartition> validCurrentAssignment( + String memberId, + Map<Uuid, Set<Integer>> currentAssignment + ) { + List<TopicIdPartition> validCurrentAssignmentList = new ArrayList<>(); + currentAssignment.forEach((topicId, partitions) -> { + if (subscriptionIds.contains(topicId)) { + partitions.forEach(partition -> { + TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition); + if (rackInfo.useRackStrategy && rackInfo.racksMismatch(memberId, topicIdPartition)) { + currentPartitionOwners.put(topicIdPartition, memberId); + } else { + validCurrentAssignmentList.add(topicIdPartition); + } + }); + } else { + LOG.debug("The topic " + topicId + " is no longer present in the subscribed topics list"); + } + }); + + return validCurrentAssignmentList; + } + + /** + * Allocates the unassigned partitions to unfilled members present in the same rack in a round-robin fashion. + */ + private void rackAwareRoundRobinAssignment() { + Queue<String> roundRobinMembers = new LinkedList<>(unfilledMembers.keySet()); + + // Sort partitions in ascending order by number of potential members with matching racks. + // Partitions with no potential members in the same rack aren't included in this list. + List<TopicIdPartition> sortedPartitions = rackInfo.sortPartitionsByRackMembers(unassignedPartitions); + + sortedPartitions.forEach(partition -> { + boolean assigned = false; + for (int i = 0; i < roundRobinMembers.size() && !assigned; i++) { + String memberId = roundRobinMembers.poll(); + Integer remainingPartitionCount = unfilledMembers.getOrDefault(memberId, 0); + + if (remainingPartitionCount > 0 && !rackInfo.racksMismatch(memberId, partition)) { + assignPartitionToMember(memberId, partition); + assigned = true; + unassignedPartitions.remove(partition); + } + + // Only re-add the member to the end of the queue if it's still available for assignment. 
+ if (unfilledMembers.containsKey(memberId)) { + roundRobinMembers.add(memberId); + } + } + }); + } + + /** + * Allocates the unassigned partitions to unfilled members in a round-robin fashion. + * + * If the rack-aware strategy is enabled, partitions are attempted to be assigned back to their current owners first. + * This is because pure stickiness without rack matching is not considered initially. + * + * If a partition couldn't be assigned to its current owner due to the quotas OR + * if the rack-aware strategy is not enabled, the partitions are allocated to the unfilled members. + */ + private void unassignedPartitionsRoundRobinAssignment() { + Queue<String> roundRobinMembers = new LinkedList<>(unfilledMembers.keySet()); + + unassignedPartitions.forEach(partition -> { + boolean assigned = false; + + if (rackInfo.useRackStrategy && currentPartitionOwners.containsKey(partition)) { + String prevOwner = currentPartitionOwners.get(partition); + if (unfilledMembers.containsKey(prevOwner)) { + assignPartitionToMember(prevOwner, partition); + assigned = true; + if (!unfilledMembers.containsKey(prevOwner)) { + roundRobinMembers.remove(prevOwner); + } + } + } + + // Only re-add the member to the end of the queue if it's still available for assignment. + for (int i = 0; i < roundRobinMembers.size() && !assigned; i++) { + String memberId = roundRobinMembers.poll(); + if (unfilledMembers.get(memberId) > 0) { + assignPartitionToMember(memberId, partition); + assigned = true; + } + if (unfilledMembers.containsKey(memberId)) { + roundRobinMembers.add(memberId); + } + } + }); + } + + /** + * Assigns the specified partition to the given member and updates the unfilled members map. + * + * <p> + * If the member has met their allocation quota, the member is removed from the unfilled members map. + * Otherwise, the count of remaining partitions that can be assigned to the member is updated. 
+ * </p> + * + * @param memberId The Id of the member to which the partition will be assigned. + * @param topicIdPartition The topicIdPartition to be assigned. + */ + private void assignPartitionToMember(String memberId, TopicIdPartition topicIdPartition) { + addPartitionToAssignment( + topicIdPartition.partition(), + topicIdPartition.topicId(), + memberId, + targetAssignment + ); + + int remainingPartitionCount = unfilledMembers.get(memberId) - 1; + if (remainingPartitionCount == 0) { + unfilledMembers.remove(memberId); + } else { + unfilledMembers.put(memberId, remainingPartitionCount); + } + } + + /** + * Determines which members can still be assigned partitions to meet the full quota. + * + * The map contains: + * <ol> + * <li> Members that have met the minimum quota but will receive an extra partition. </li> + * <li> Members that have not yet met the minimum quota. If there are still members that could receive + * an extra partition, the remaining value is updated to include this. </li> + * </ol> + * + * @return A map of member Ids and the required partitions to meet the quota. + */ + private Map<String, Integer> computeUnfilledMembers() { + Map<String, Integer> unfilledMembers = new HashMap<>(); Review Comment: I see... It is a bit annoying to copy the map here, but I can live with it if you think it is better this way. I have another question regarding this code. My understanding is that this is where we decide which members will get the extra partitions: whichever members come first when iterating over the map get them. When rack awareness is enabled, I wonder whether we could end up in a situation where the members receiving the extra partitions and the unassigned partitions are completely misaligned due to this. Let's take an example. We have a group with 10 members (1 to 10) subscribed to topic A. Topic A has 20 partitions, so each member has 2 assigned partitions. All the partitions are available in a single rack.
Members 1 to 5 are in rack R1 and members 6 to 10 are in rack R2. Now let's say that we add 5 partitions to A: 3 are in R1 and 2 are in R2. If we iterate from 1 to 10 to assign the extra partitions, members 1 to 5 will each get an extra partition even though two of the new partitions are not in their rack (R1). Is this a possible scenario? It is perhaps a bit too extreme... ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ########## @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.common.TopicIdPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.stream.IntStream; + +import static java.lang.Math.min; + +/** + * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with + * all its members subscribed to the same set of topics. + * It is optimized since the assignment can be done in fewer, less complicated steps compared to when + * the subscriptions are different across the members. + * + * Assignments are done according to the following principles: + * + * + * <li> Balance: Ensure partitions are distributed equally among all members. + * The difference in assignments sizes between any two members + * should not exceed one partition. </li> + * <li> Rack Matching: When feasible, aim to assign partitions to members + * located on the same rack thus avoiding cross-zone traffic. </li> + * <li> Stickiness: Minimize partition movements among members by retaining + * as much of the existing assignment as possible. </li> + * + * The assignment builder prioritizes the properties in the following order: + * Balance > Rack Matching > Stickiness. + */ +public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder { + private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); + /** + * The assignment specification which includes member metadata. + */ + private final AssignmentSpec assignmentSpec; + /** + * The topic and partition metadata describer. 
+ */ + private final SubscribedTopicDescriber subscribedTopicDescriber; + /** + * The set of topic Ids that the consumer group is subscribed to. + */ + private final Set<Uuid> subscriptionIds; + /** + * Rack information and helper methods. + */ + private final RackInfo rackInfo; + /** + * The number of members to receive an extra partition beyond the minimum quota. + * Minimum Quota = Total Partitions / Total Members + * Example: If there are 11 partitions to be distributed among 3 members, + * each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition. + */ + private int remainingMembersToGetAnExtraPartition; + /** + * Members mapped to the remaining number of partitions needed to meet the minimum quota, + * including members that are eligible to receive an extra partition. + */ + private final Map<String, Integer> potentiallyUnfilledMembers; + /** + * Members mapped to the remaining number of partitions needed to meet the full quota. + * Full quota = minQuota + one extra partition (if applicable). + */ + private Map<String, Integer> unfilledMembers; + /** + * The partitions that still need to be assigned. + * Initially this contains all the subscribed topics' partitions. + */ + private List<TopicIdPartition> unassignedPartitions; + /** + * The target assignment. + */ + private final Map<String, MemberAssignment> targetAssignment; + /** + * Tracks the existing owner of each partition. + * Only populated when the rack awareness strategy is used. 
+ */ + private final Map<TopicIdPartition, String> currentPartitionOwners; + + OptimizedUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { + this.assignmentSpec = assignmentSpec; + this.subscribedTopicDescriber = subscribedTopicDescriber; + this.subscriptionIds = new HashSet<>(assignmentSpec.members().values().iterator().next().subscribedTopicIds()); + this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds); + this.potentiallyUnfilledMembers = new HashMap<>(); + this.unfilledMembers = new HashMap<>(); + this.targetAssignment = new HashMap<>(); + // Without rack-aware strategy, tracking current owners of unassigned partitions is unnecessary + // as all sticky partitions are retained until a member meets its quota. + this.currentPartitionOwners = rackInfo.useRackStrategy ? new HashMap<>() : Collections.emptyMap(); + } + + /** + * Here's the step-by-step breakdown of the assignment process: + * + * <li> Compute the quotas of partitions for each member based on the total partitions and member count.</li> + * <li> Initialize unassigned partitions to all the topic partitions and + * remove partitions from the list as and when they are assigned.</li> + * <li> For existing assignments, retain partitions based on the determined quota and member's rack compatibility.</li> + * <li> If a partition's rack mismatches with its owner, track it for future use.</li> + * <li> Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions.</li> + * <li> Depending on members needing extra partitions, select members from the potentially unfilled list + * and add them to the unfilled list.</li> + * <li> Proceed with a round-robin assignment adhering to rack awareness. + * For each unassigned partition, locate the first compatible member from the unfilled list.</li> + * <li> If no rack-compatible member is found, revert to the tracked current owner. 
+ * If that member can't accommodate the partition due to quota limits, resort to a generic round-robin assignment.</li> + */ + @Override + protected GroupAssignment buildAssignment() throws PartitionAssignorException { + int totalPartitionsCount = 0; + + for (Uuid topicId : subscriptionIds) { + int partitionCount = subscribedTopicDescriber.numPartitions(topicId); + if (partitionCount == -1) { + throw new PartitionAssignorException( + "Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata." + ); + } else { + totalPartitionsCount += partitionCount; + } + } + + if (subscriptionIds.isEmpty()) { + LOG.info("The subscription list is empty, returning an empty assignment"); + return new GroupAssignment(Collections.emptyMap()); + } + + // The minimum required quota that each member needs to meet for a balanced assignment. + // This is the same for all members. + final int numberOfMembers = assignmentSpec.members().size(); + final int minQuota = totalPartitionsCount / numberOfMembers; + remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; + + assignmentSpec.members().keySet().forEach(memberId -> + targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) + )); + + // sorted list of all partitions. + unassignedPartitions = allTopicIdPartitions(subscriptionIds, subscribedTopicDescriber); + + assignStickyPartitions(minQuota); + + unfilledMembers = computeUnfilledMembers(); + + if (!unassignedPartitionsCountEqualsRemainingAssignmentsCount()) { + throw new PartitionAssignorException("Number of available partitions is not equal to the total requirement"); + } + + if (rackInfo.useRackStrategy) rackAwareRoundRobinAssignment(); + unassignedPartitionsRoundRobinAssignment(); + + return new GroupAssignment(targetAssignment); + } + + /** + * Retains a set of partitions from the existing assignment and includes them in the target assignment. 
+ * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. + * In addition, if rack awareness is enabled, it is ensured that a partition's rack matches the member's rack. + * + * <p>For each member: + * <ol> + * <li> Find the valid current assignment considering topic subscriptions, metadata and rack information.</li> + * <li> If the current assignments exist, retain partitions up to the minimum quota.</li> + * <li> If the current assignment size is greater than the minimum quota and + * there are members that could get an extra partition, assign the next partition as well.</li> + * <li> Finally, if the member's current assignment size is less than the minimum quota, + * add them to the potentially unfilled members map and track the number of remaining + * partitions required to meet the quota.</li> + * </ol> + * </p> + */ + private void assignStickyPartitions(int minQuota) { + assignmentSpec.members().forEach((memberId, assignmentMemberSpec) -> { + List<TopicIdPartition> validCurrentAssignment = validCurrentAssignment( + memberId, + assignmentMemberSpec.assignedPartitions() + ); + + int currentAssignmentSize = validCurrentAssignment.size(); + // Number of partitions required to meet the minimum quota. + int remaining = minQuota - currentAssignmentSize; + + if (currentAssignmentSize > 0) { + int retainedPartitionsCount = min(currentAssignmentSize, minQuota); + IntStream.range(0, retainedPartitionsCount).forEach(i -> { + TopicIdPartition topicIdPartition = validCurrentAssignment.get(i); + addPartitionToAssignment( + topicIdPartition.partition(), + topicIdPartition.topicId(), + memberId, + targetAssignment + ); + unassignedPartitions.remove(topicIdPartition); + }); + + // The extra partition is located at the last index from the previous step. 
+ if (remaining < 0 && remainingMembersToGetAnExtraPartition > 0) { + TopicIdPartition topicIdPartition = validCurrentAssignment.get(retainedPartitionsCount); + addPartitionToAssignment( + topicIdPartition.partition(), + topicIdPartition.topicId(), + memberId, + targetAssignment + ); + unassignedPartitions.remove(topicIdPartition); + remainingMembersToGetAnExtraPartition--; + } + } + + if (remaining >= 0) { + potentiallyUnfilledMembers.put(memberId, remaining); + } + + }); + } + + /** + * Filters the current assignment of partitions for a given member. + * + * Any partition that still belongs to the member's subscribed topics list is considered valid. + * If rack aware strategy can be used: Only partitions with matching rack are valid and non-matching partitions are + * tracked with their current owner for future use. + * + * @param memberId The Id of the member whose assignment is being validated. + * @param currentAssignment The partitions currently assigned to the member. + * + * @return List of valid partitions after applying the filters. + */ + private List<TopicIdPartition> validCurrentAssignment( + String memberId, + Map<Uuid, Set<Integer>> currentAssignment + ) { + List<TopicIdPartition> validCurrentAssignmentList = new ArrayList<>(); + currentAssignment.forEach((topicId, partitions) -> { + if (subscriptionIds.contains(topicId)) { + partitions.forEach(partition -> { + TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition); + if (rackInfo.useRackStrategy && rackInfo.racksMismatch(memberId, topicIdPartition)) { + currentPartitionOwners.put(topicIdPartition, memberId); + } else { + validCurrentAssignmentList.add(topicIdPartition); + } + }); + } else { + LOG.debug("The topic " + topicId + " is no longer present in the subscribed topics list"); + } + }); + + return validCurrentAssignmentList; + } + + /** + * Allocates the unassigned partitions to unfilled members present in the same rack in a round-robin fashion. 
+ */ + private void rackAwareRoundRobinAssignment() { + Queue<String> roundRobinMembers = new LinkedList<>(unfilledMembers.keySet()); + + // Sort partitions in ascending order by number of potential members with matching racks. + // Partitions with no potential members in the same rack aren't included in this list. + List<TopicIdPartition> sortedPartitions = rackInfo.sortPartitionsByRackMembers(unassignedPartitions); + + sortedPartitions.forEach(partition -> { + boolean assigned = false; + for (int i = 0; i < roundRobinMembers.size() && !assigned; i++) { + String memberId = roundRobinMembers.poll(); + Integer remainingPartitionCount = unfilledMembers.getOrDefault(memberId, 0); + + if (remainingPartitionCount > 0 && !rackInfo.racksMismatch(memberId, partition)) { + assignPartitionToMember(memberId, partition); + assigned = true; + unassignedPartitions.remove(partition); + } + + // Only re-add the member to the end of the queue if it's still available for assignment. + if (unfilledMembers.containsKey(memberId)) { + roundRobinMembers.add(memberId); + } + } + }); + } + + /** + * Allocates the unassigned partitions to unfilled members in a round-robin fashion. + * + * If the rack-aware strategy is enabled, partitions are attempted to be assigned back to their current owners first. + * This is because pure stickiness without rack matching is not considered initially. + * + * If a partition couldn't be assigned to its current owner due to the quotas OR + * if the rack-aware strategy is not enabled, the partitions are allocated to the unfilled members. + */ + private void unassignedPartitionsRoundRobinAssignment() { + Queue<String> roundRobinMembers = new LinkedList<>(unfilledMembers.keySet()); Review Comment: How about creating the queue just before calling rackAwareRoundRobinAssignment and unassignedPartitionsRoundRobinAssignment and passing it as a parameter? 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java: ########## @@ -0,0 +1,1090 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; + +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class 
OptimizedUniformAssignmentBuilderTest { + private final UniformAssignor assignor = new UniformAssignor(); + private final Uuid topic1Uuid = Uuid.fromString("T1-A4s3VTwiI5CTbEp6POw"); + private final Uuid topic2Uuid = Uuid.fromString("T2-B4s3VTwiI5YHbPp6YUe"); + private final Uuid topic3Uuid = Uuid.fromString("T3-CU8fVTLCz5YMkLoDQsa"); + private final String topic1Name = "topic1"; + private final String topic2Name = "topic2"; + private final String topic3Name = "topic3"; + private final String memberA = "A"; + private final String memberB = "B"; + private final String memberC = "C"; + + @Test + public void testOneMemberNoTopicSubscription() { + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata( + Collections.singletonMap( + topic1Uuid, + new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + mkMapOfPartitionRacks(3) + ) + ) + ); + + Map<String, AssignmentMemberSpec> members = Collections.singletonMap( + memberA, + new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Collections.emptyList(), + Collections.emptyMap() + ) + ); + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + GroupAssignment groupAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + + assertEquals(Collections.emptyMap(), groupAssignment.members()); + } + + @Test + public void testOneMemberSubscribedToNonexistentTopic() { + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata( + Collections.singletonMap( + topic1Uuid, + new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + mkMapOfPartitionRacks(3) + ) + ) + ); + + Map<String, AssignmentMemberSpec> members = Collections.singletonMap( + memberA, + new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Collections.singletonList(topic2Uuid), + Collections.emptyMap() + ) + ); + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + + assertThrows(PartitionAssignorException.class, + () -> assignor.assign(assignmentSpec, 
subscribedTopicMetadata)); + } + + @Test + public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + mkMapOfPartitionRacks(3) + )); + topicMetadata.put(topic3Uuid, new TopicMetadata( + topic3Uuid, + topic3Name, + 2, + mkMapOfPartitionRacks(2) + )); + + Map<String, AssignmentMemberSpec> members = new TreeMap<>(); + members.put(memberA, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Arrays.asList(topic1Uuid, topic3Uuid), + Collections.emptyMap() + )); + members.put(memberB, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Arrays.asList(topic1Uuid, topic3Uuid), + Collections.emptyMap() + )); + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + + Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(memberA, mkAssignment( + mkTopicAssignment(topic1Uuid, 0, 2), + mkTopicAssignment(topic3Uuid, 1) + )); + expectedAssignment.put(memberB, mkAssignment( + mkTopicAssignment(topic1Uuid, 1), + mkTopicAssignment(topic3Uuid, 0) + )); + + assertAssignment(expectedAssignment, computedAssignment); + checkValidityAndBalance(members, computedAssignment); + } + + @Test + public void testFirstAssignmentTwoMembersTwoTopicsNoPartitionRacks() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + Collections.emptyMap() + )); + topicMetadata.put(topic3Uuid, new TopicMetadata( + topic3Uuid, + topic3Name, + 2, + Collections.emptyMap() + )); + + Map<String, AssignmentMemberSpec> members = new TreeMap<>(); + members.put(memberA, new AssignmentMemberSpec( + 
Optional.empty(), + Optional.of("rack1"), + Arrays.asList(topic1Uuid, topic3Uuid), + Collections.emptyMap() + )); + members.put(memberB, new AssignmentMemberSpec( + Optional.empty(), + Optional.of("rack2"), + Arrays.asList(topic1Uuid, topic3Uuid), + Collections.emptyMap() + )); + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + + Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(memberA, mkAssignment( + mkTopicAssignment(topic1Uuid, 0, 2), + mkTopicAssignment(topic3Uuid, 1) + )); + expectedAssignment.put(memberB, mkAssignment( + mkTopicAssignment(topic1Uuid, 1), + mkTopicAssignment(topic3Uuid, 0) + )); + + assertAssignment(expectedAssignment, computedAssignment); + checkValidityAndBalance(members, computedAssignment); + } + + @Test + public void testFirstAssignmentThreeMembersThreeTopicsWithMemberAndPartitionRacks() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + mkMapOfPartitionRacks(3) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + mkMapOfPartitionRacks(3) + )); + topicMetadata.put(topic3Uuid, new TopicMetadata( + topic3Uuid, + topic3Name, + 2, + mkMapOfPartitionRacks(2) + )); + + Map<String, AssignmentMemberSpec> members = new TreeMap<>(); + members.put(memberA, new AssignmentMemberSpec( + Optional.empty(), + Optional.of("rack1"), + Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), + Collections.emptyMap() + )); + members.put(memberB, new AssignmentMemberSpec( + Optional.empty(), + Optional.of("rack2"), + Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), + Collections.emptyMap() + )); + members.put(memberC, new AssignmentMemberSpec( + Optional.empty(), + 
Optional.of("rack3"), + Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), + Collections.emptyMap() + )); + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + + Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(memberA, mkAssignment( + mkTopicAssignment(topic1Uuid, 0), + mkTopicAssignment(topic2Uuid, 0), + mkTopicAssignment(topic3Uuid, 0) + )); + expectedAssignment.put(memberB, mkAssignment( + mkTopicAssignment(topic1Uuid, 1), + mkTopicAssignment(topic2Uuid, 1), + mkTopicAssignment(topic3Uuid, 1) + )); + expectedAssignment.put(memberC, mkAssignment( + mkTopicAssignment(topic1Uuid, 2), + mkTopicAssignment(topic2Uuid, 2) + )); + + assertAssignment(expectedAssignment, computedAssignment); + checkValidityAndBalance(members, computedAssignment); + } + + @Test + public void testFirstAssignmentThreeMembersThreeTopicsWithSomeMemberAndPartitionRacks() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + mkMapOfPartitionRacks(3) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + mkMapOfPartitionRacks(3) + )); + topicMetadata.put(topic3Uuid, new TopicMetadata( + topic3Uuid, + topic3Name, + 2, + Collections.emptyMap() + )); + + Map<String, AssignmentMemberSpec> members = new TreeMap<>(); + members.put(memberA, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), + Collections.emptyMap() + )); + members.put(memberB, new AssignmentMemberSpec( + Optional.empty(), + Optional.of("rack2"), + Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), + Collections.emptyMap() + )); + members.put(memberC, new AssignmentMemberSpec( + 
Optional.empty(), + Optional.empty(), + Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), + Collections.emptyMap() + )); + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + + Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(memberA, mkAssignment( + mkTopicAssignment(topic1Uuid, 0), + mkTopicAssignment(topic2Uuid, 2), + mkTopicAssignment(topic3Uuid, 1) + )); + expectedAssignment.put(memberB, mkAssignment( + mkTopicAssignment(topic1Uuid, 1, 2), + mkTopicAssignment(topic2Uuid, 1) + )); + expectedAssignment.put(memberC, mkAssignment( + mkTopicAssignment(topic2Uuid, 0), + mkTopicAssignment(topic3Uuid, 0) + )); + + assertAssignment(expectedAssignment, computedAssignment); + checkValidityAndBalance(members, computedAssignment); + } + + @Test + public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + topicMetadata.put(topic3Uuid, new TopicMetadata( + topic3Uuid, + topic3Name, + 2, + mkMapOfPartitionRacks(2) + )); + + Map<String, AssignmentMemberSpec> members = new TreeMap<>(); + members.put(memberA, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Collections.singletonList(topic3Uuid), + Collections.emptyMap() + )); + members.put(memberB, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Collections.singletonList(topic3Uuid), + Collections.emptyMap() + )); + members.put(memberC, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Collections.singletonList(topic3Uuid), + Collections.emptyMap() + )); + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment 
computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + + // Topic 3 has 2 partitions but three members subscribed to it - one of them should not get an assignment. + Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(memberA, mkAssignment( + mkTopicAssignment(topic3Uuid, 0) + )); + expectedAssignment.put(memberB, mkAssignment( + mkTopicAssignment(topic3Uuid, 1) + )); + expectedAssignment.put(memberC, + Collections.emptyMap() + ); + + assertAssignment(expectedAssignment, computedAssignment); + checkValidityAndBalance(members, computedAssignment); + } + + @Test + public void testValidityAndBalanceForLargeSampleSet() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + for (int i = 1; i < 100; i++) { + Uuid topicId = Uuid.randomUuid(); + topicMetadata.put(topicId, new TopicMetadata( + topicId, + "topic-" + i, + 3, + mkMapOfPartitionRacks(3) + )); + } + + List<Uuid> subscribedTopics = new ArrayList<>(topicMetadata.keySet()); + + Map<String, AssignmentMemberSpec> members = new TreeMap<>(); + for (int i = 1; i < 50; i++) { + members.put("member" + i, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + subscribedTopics, + Collections.emptyMap() + )); + } + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + + checkValidityAndBalance(members, computedAssignment); + } + + @Test + public void testReassignmentForTwoMembersTwoTopicsGivenUnbalancedPrevAssignment() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + mkMapOfPartitionRacks(3) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + mkMapOfPartitionRacks(3) + )); + + Map<String, 
AssignmentMemberSpec> members = new TreeMap<>(); + + Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>( + mkAssignment( + mkTopicAssignment(topic1Uuid, 0, 1), + mkTopicAssignment(topic2Uuid, 0, 1) + ) + ); + members.put(memberA, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Arrays.asList(topic1Uuid, topic2Uuid), + currentAssignmentForA + )); + + Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>( + mkAssignment( + mkTopicAssignment(topic1Uuid, 2), + mkTopicAssignment(topic2Uuid, 2) + ) + ); + members.put(memberB, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Arrays.asList(topic1Uuid, topic2Uuid), + currentAssignmentForB + )); + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + + Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(memberA, mkAssignment( + mkTopicAssignment(topic1Uuid, 0, 1), + mkTopicAssignment(topic2Uuid, 0) + )); + expectedAssignment.put(memberB, mkAssignment( + mkTopicAssignment(topic1Uuid, 2), + mkTopicAssignment(topic2Uuid, 1, 2) + )); + + assertAssignment(expectedAssignment, computedAssignment); + checkValidityAndBalance(members, computedAssignment); + } + + @Test + public void testReassignmentOnRackChangesWithMemberAndPartitionRacks() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + mkMapOfPartitionRacks(3) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + mkMapOfPartitionRacks(3) + )); + + // Initially A was in rack 1 and B was in rack 2, now let's switch them. 
+ Map<String, AssignmentMemberSpec> members = new TreeMap<>(); + + Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>( + mkAssignment( + mkTopicAssignment(topic1Uuid, 0, 1), + mkTopicAssignment(topic2Uuid, 0) + ) + ); + members.put(memberA, new AssignmentMemberSpec( + Optional.empty(), + Optional.of("rack2"), + Arrays.asList(topic1Uuid, topic2Uuid), + currentAssignmentForA + )); + + Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>( + mkAssignment( + mkTopicAssignment(topic1Uuid, 2), + mkTopicAssignment(topic2Uuid, 1, 2) + ) + ); + members.put(memberB, new AssignmentMemberSpec( + Optional.empty(), + Optional.of("rack1"), + Arrays.asList(topic1Uuid, topic2Uuid), + currentAssignmentForB + )); + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + + Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(memberA, mkAssignment( + mkTopicAssignment(topic1Uuid, 1, 2), + mkTopicAssignment(topic2Uuid, 2) + )); + expectedAssignment.put(memberB, mkAssignment( + mkTopicAssignment(topic1Uuid, 0), + mkTopicAssignment(topic2Uuid, 1, 0) + )); + + assertAssignment(expectedAssignment, computedAssignment); + checkValidityAndBalance(members, computedAssignment); + } + + @Test + public void testReassignmentOnAddingPartitionsWithMemberAndPartitionRacks() { + // Initially T1,T2 had 3 partitions. 
+ Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 5, + mkMapOfPartitionRacks(5) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 5, + mkMapOfPartitionRacks(5) + )); + + Map<String, AssignmentMemberSpec> members = new TreeMap<>(); + + Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>( + mkAssignment( + mkTopicAssignment(topic1Uuid, 0), + mkTopicAssignment(topic2Uuid, 0, 2) + ) + ); + members.put(memberA, new AssignmentMemberSpec( + Optional.empty(), + Optional.of("rack0"), + Arrays.asList(topic1Uuid, topic2Uuid), + currentAssignmentForA + )); + + Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>( + mkAssignment( + mkTopicAssignment(topic1Uuid, 1, 2), + mkTopicAssignment(topic2Uuid, 1) + ) + ); + members.put(memberB, new AssignmentMemberSpec( + Optional.empty(), + Optional.of("rack1"), + Arrays.asList(topic1Uuid, topic2Uuid), + currentAssignmentForB + )); + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + + Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(memberA, mkAssignment( + mkTopicAssignment(topic1Uuid, 0, 3), + mkTopicAssignment(topic2Uuid, 0, 3, 4) + )); + expectedAssignment.put(memberB, mkAssignment( + mkTopicAssignment(topic1Uuid, 1, 2, 4), + mkTopicAssignment(topic2Uuid, 1, 2) + )); + + assertAssignment(expectedAssignment, computedAssignment); + checkValidityAndBalance(members, computedAssignment); + } + + @Test + public void testReassignmentWhenPartitionsAreAddedForTwoMembersTwoTopics() { + // Simulating adding partition to T1 and T2 - originally T1 -> 3 Partitions and T2 -> 3 Partitions + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + 
topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 6, + mkMapOfPartitionRacks(6) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 5, + mkMapOfPartitionRacks(5) + )); + + Map<String, AssignmentMemberSpec> members = new TreeMap<>(); + + Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>( + mkAssignment( + mkTopicAssignment(topic1Uuid, 0, 2), + mkTopicAssignment(topic2Uuid, 0) + ) + ); + members.put(memberA, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Arrays.asList(topic1Uuid, topic2Uuid), + currentAssignmentForA + )); + + Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>( + mkAssignment( + mkTopicAssignment(topic1Uuid, 1), + mkTopicAssignment(topic2Uuid, 1, 2) + ) + ); + members.put(memberB, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Arrays.asList(topic1Uuid, topic2Uuid), + currentAssignmentForB + )); + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + + Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(memberA, mkAssignment( + mkTopicAssignment(topic1Uuid, 0, 2, 3, 5), + mkTopicAssignment(topic2Uuid, 0, 4) + )); + expectedAssignment.put(memberB, mkAssignment( + mkTopicAssignment(topic1Uuid, 1, 4), + mkTopicAssignment(topic2Uuid, 1, 2, 3) + )); + + assertAssignment(expectedAssignment, computedAssignment); + checkValidityAndBalance(members, computedAssignment); + } + + @Test + public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembersTwoTopics() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + mkMapOfPartitionRacks(3) + )); + topicMetadata.put(topic2Uuid, 
new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + mkMapOfPartitionRacks(3) + )); + + Map<String, AssignmentMemberSpec> members = new HashMap<>(); + + Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(mkAssignment( + mkTopicAssignment(topic1Uuid, 0, 2), + mkTopicAssignment(topic2Uuid, 0) + )); + members.put(memberA, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Arrays.asList(topic1Uuid, topic2Uuid), + currentAssignmentForA + )); + + Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(mkAssignment( + mkTopicAssignment(topic1Uuid, 1), + mkTopicAssignment(topic2Uuid, 1, 2) + )); + members.put(memberB, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Arrays.asList(topic1Uuid, topic2Uuid), + currentAssignmentForB + )); + + // Add a new member to trigger a re-assignment. + members.put(memberC, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Arrays.asList(topic1Uuid, topic2Uuid), + Collections.emptyMap() + )); + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + + Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(memberA, mkAssignment( + mkTopicAssignment(topic1Uuid, 0, 2) + )); + expectedAssignment.put(memberB, mkAssignment( + mkTopicAssignment(topic1Uuid, 1), + mkTopicAssignment(topic2Uuid, 1) + )); + expectedAssignment.put(memberC, mkAssignment( + mkTopicAssignment(topic2Uuid, 0, 2) + )); + + assertAssignment(expectedAssignment, computedAssignment); + checkValidityAndBalance(members, computedAssignment); + } + + @Test + public void testReassignmentOnAddingMemberWithRackAndPartitionRacks() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 
3, + mkMapOfPartitionRacks(3) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + mkMapOfPartitionRacks(3) + )); + + Map<String, AssignmentMemberSpec> members = new TreeMap<>(); + + Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>( + mkAssignment( + mkTopicAssignment(topic1Uuid, 0), + mkTopicAssignment(topic2Uuid, 0, 2) + ) + ); + members.put(memberA, new AssignmentMemberSpec( + Optional.empty(), + Optional.of("rack0"), + Arrays.asList(topic1Uuid, topic2Uuid), + currentAssignmentForA + )); + + Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>( + mkAssignment( + mkTopicAssignment(topic1Uuid, 1, 2), + mkTopicAssignment(topic2Uuid, 1) + ) + ); + members.put(memberB, new AssignmentMemberSpec( + Optional.empty(), + Optional.of("rack1"), + Arrays.asList(topic1Uuid, topic2Uuid), + currentAssignmentForB + )); + + // New member added. + members.put(memberC, new AssignmentMemberSpec( + Optional.empty(), + Optional.of("rack2"), + Arrays.asList(topic1Uuid, topic2Uuid), + Collections.emptyMap() + )); + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + + Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(memberA, mkAssignment( + mkTopicAssignment(topic1Uuid, 0), + mkTopicAssignment(topic2Uuid, 0) + )); + expectedAssignment.put(memberB, mkAssignment( + mkTopicAssignment(topic1Uuid, 1), + mkTopicAssignment(topic2Uuid, 1) + )); + expectedAssignment.put(memberC, mkAssignment( + mkTopicAssignment(topic1Uuid, 2), + mkTopicAssignment(topic2Uuid, 2) + )); + + assertAssignment(expectedAssignment, computedAssignment); + checkValidityAndBalance(members, computedAssignment); + } + + @Test + public void 
testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeMembersTwoTopics() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 3, + mkMapOfPartitionRacks(3) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 3, + mkMapOfPartitionRacks(3) + )); + + Map<String, AssignmentMemberSpec> members = new HashMap<>(); + + Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment( + mkTopicAssignment(topic1Uuid, 0), + mkTopicAssignment(topic2Uuid, 0) + ); + members.put(memberA, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Arrays.asList(topic1Uuid, topic2Uuid), + currentAssignmentForA + )); + + Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment( + mkTopicAssignment(topic1Uuid, 1), + mkTopicAssignment(topic2Uuid, 1) + ); + members.put(memberB, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Arrays.asList(topic1Uuid, topic2Uuid), + currentAssignmentForB + )); + + // Member C was removed + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + + Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(memberA, mkAssignment( + mkTopicAssignment(topic1Uuid, 0, 2), + mkTopicAssignment(topic2Uuid, 0) + )); + expectedAssignment.put(memberB, mkAssignment( + mkTopicAssignment(topic1Uuid, 1), + mkTopicAssignment(topic2Uuid, 1, 2) + )); + + assertAssignment(expectedAssignment, computedAssignment); + checkValidityAndBalance(members, computedAssignment); + } + + @Test + public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWithTwoMembersTwoTopics() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + 
topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 2, + mkMapOfPartitionRacks(2) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 2, + mkMapOfPartitionRacks(2) + )); + + // Initial subscriptions were [T1, T2] + Map<String, AssignmentMemberSpec> members = new HashMap<>(); + + Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment( + mkTopicAssignment(topic1Uuid, 0), + mkTopicAssignment(topic2Uuid, 0) + ); + members.put(memberA, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Collections.singletonList(topic2Uuid), + currentAssignmentForA + )); + + Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment( + mkTopicAssignment(topic1Uuid, 1), + mkTopicAssignment(topic2Uuid, 1) + ); + members.put(memberB, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Collections.singletonList(topic2Uuid), + currentAssignmentForB + )); + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + + Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(memberA, mkAssignment( + mkTopicAssignment(topic2Uuid, 0) + )); + expectedAssignment.put(memberB, mkAssignment( + mkTopicAssignment(topic2Uuid, 1) + )); + + assertAssignment(expectedAssignment, computedAssignment); + checkValidityAndBalance(members, computedAssignment); + } + + @Test + public void testReassignmentWhenOneSubscriptionRemovedWithMemberAndPartitionRacks() { + Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 5, + mkMapOfPartitionRacks(3) + )); + topicMetadata.put(topic2Uuid, new TopicMetadata( + topic2Uuid, + topic2Name, + 2, + mkMapOfPartitionRacks(3) + )); + + // Initial 
subscriptions were [T1, T2]. + Map<String, AssignmentMemberSpec> members = new TreeMap<>(); + + Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>( + mkAssignment( + mkTopicAssignment(topic1Uuid, 0, 2, 4), + mkTopicAssignment(topic2Uuid, 0) + ) + ); + members.put(memberA, new AssignmentMemberSpec( + Optional.empty(), + Optional.of("rack0"), + Collections.singletonList(topic1Uuid), + currentAssignmentForA + )); + + Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>( + mkAssignment( + mkTopicAssignment(topic1Uuid, 1, 3), + mkTopicAssignment(topic2Uuid, 1) + ) + ); + members.put(memberB, new AssignmentMemberSpec( + Optional.empty(), + Optional.of("rack1"), + Collections.singletonList(topic1Uuid), + currentAssignmentForB + )); + + AssignmentSpec assignmentSpec = new AssignmentSpec(members); + SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign(assignmentSpec, subscribedTopicMetadata); + + Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(memberA, mkAssignment( + mkTopicAssignment(topic1Uuid, 0, 2, 4) + )); + expectedAssignment.put(memberB, mkAssignment( + mkTopicAssignment(topic1Uuid, 1, 3) + )); + + assertAssignment(expectedAssignment, computedAssignment); + checkValidityAndBalance(members, computedAssignment); + } + + /** + * Verifies that the given assignment is valid with respect to the given subscriptions. + * Validity requirements: + * - each member is subscribed to topics of all partitions assigned to it, and + * - each partition is assigned to no more than one member. + * Balance requirements: + * - the assignment is fully balanced (the numbers of topic partitions assigned to members differ by at most one), or + * - there is no topic partition that can be moved from one member to another with 2+ fewer topic partitions. + * + * @param members Members data structure from the assignment Spec. 
+ * @param computedGroupAssignment Assignment computed by the uniform assignor. + */ + private void checkValidityAndBalance( + Map<String, AssignmentMemberSpec> members, + GroupAssignment computedGroupAssignment + ) { + List<String> membersList = new ArrayList<>(computedGroupAssignment.members().keySet()); + int numMembers = membersList.size(); + List<Integer> totalAssignmentSizesOfAllMembers = new ArrayList<>(membersList.size()); + membersList.forEach(member -> { + Map<Uuid, Set<Integer>> computedAssignmentForMember = computedGroupAssignment + .members().get(member).targetPartitions(); + int sum = computedAssignmentForMember.values().stream().mapToInt(Set::size).sum(); + totalAssignmentSizesOfAllMembers.add(sum); + }); + + for (int i = 0; i < numMembers; i++) { + String memberId = membersList.get(i); + Map<Uuid, Set<Integer>> computedAssignmentForMember = + computedGroupAssignment.members().get(memberId).targetPartitions(); + // Each member is subscribed to topics of all the partitions assigned to it. + computedAssignmentForMember.keySet().forEach(topicId -> { + // Check if the topic exists in the subscription. 
+ assertTrue(members.get(memberId).subscribedTopicIds().contains(topicId), + "Error: Partitions for topic " + topicId + " are assigned to member " + memberId + + " but it is not part of the members subscription "); + }); + + for (int j = i + 1; j < numMembers; j++) { + String otherMemberId = membersList.get(j); + Map<Uuid, Set<Integer>> computedAssignmentForOtherMember = computedGroupAssignment + .members().get(otherMemberId).targetPartitions(); + // Each partition should be assigned to at most one member + computedAssignmentForMember.keySet().forEach(topicId -> { + Set<Integer> intersection = new HashSet<>(); + if (computedAssignmentForOtherMember.containsKey(topicId)) { + intersection = new HashSet<>(computedAssignmentForMember.get(topicId)); + intersection.retainAll(computedAssignmentForOtherMember.get(topicId)); + } + assertTrue( + intersection.isEmpty(), + "Error : Member 1 " + memberId + " and Member 2 " + otherMemberId + + "have common partitions assigned to them " + computedAssignmentForOtherMember.get(topicId) + ); + }); + + // Difference in the sizes of any two partitions should be 1 at max + int size1 = totalAssignmentSizesOfAllMembers.get(i); + int size2 = totalAssignmentSizesOfAllMembers.get(j); + assertTrue( + Math.abs(size1 - size2) <= 1, + "Size of one assignment is greater than the other assignment by more than one partition " + + size1 + " " + size2 + "abs = " + Math.abs(size1 - size2) + ); + } + } + } + + /** + * Verifies that the expected assignment is equal to the computed assignment for every member in the group. + */ + private void assertAssignment( Review Comment: nit: We have the same method in RangeAssignorTest. Could we somehow share it for the two suites? We could perhaps introduce an AssignorTestUtil class in this package and add it there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org