dajac commented on code in PR #13998: URL: https://github.com/apache/kafka/pull/13998#discussion_r1269199183
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java: ########## @@ -52,33 +42,22 @@ public Map<String, AssignmentMemberSpec> members() { return members; } - /** - * @return Topic metadata keyed by topic Ids. - */ - public Map<Uuid, AssignmentTopicMetadata> topics() { - return topics; - } - @Override public boolean equals(Object o) { if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (!(o instanceof AssignmentSpec)) return false; Review Comment: nit: This line change is not necessary. Let's revert it. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicDescriber.java: ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Set; + +/** + * The assignment topic describer is used by the {@link PartitionAssignor} + * to obtain topic and partition metadata of subscribed topics. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface AssignmentTopicDescriber { Review Comment: nit: I wonder if `SubscribedTopicDescriber` would be better based on the javadoc. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicDescriber.java: ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Set; + +/** + * The assignment topic describer is used by the {@link PartitionAssignor} + * to obtain topic and partition metadata of subscribed topics. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface AssignmentTopicDescriber { + + /** + * Returns a set of subscribed topicIds. + * + * @return Set of topicIds corresponding to the subscribed topics. + */ + Set<Uuid> subscribedTopicIds(); + + /** + * Number of partitions for the given topicId. Review Comment: nit: `topicIds` -> `topic id`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicDescriber.java: ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Set; + +/** + * The assignment topic describer is used by the {@link PartitionAssignor} + * to obtain topic and partition metadata of subscribed topics. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface AssignmentTopicDescriber { + + /** + * Returns a set of subscribed topicIds. + * + * @return Set of topicIds corresponding to the subscribed topics. + */ + Set<Uuid> subscribedTopicIds(); + + /** + * Number of partitions for the given topicId. + * + * @param topicId Uuid corresponding to the topic. + * @return The number of partitions corresponding to the given topicId. + * If the topicId doesn't exist return 0; + */ + int numPartitions(Uuid topicId); + + /** + * Returns all the racks associated with the replicas for the given partition. + * + * @param topicId Uuid corresponding to the partition's topic. + * @param partition Partition number within topic. Review Comment: nit: Let's use partition id or index instead of number. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java: ########## @@ -52,33 +42,22 @@ public Map<String, AssignmentMemberSpec> members() { return members; } - /** - * @return Topic metadata keyed by topic Ids. - */ - public Map<Uuid, AssignmentTopicMetadata> topics() { - return topics; - } - @Override public boolean equals(Object o) { if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (!(o instanceof AssignmentSpec)) return false; AssignmentSpec that = (AssignmentSpec) o; - if (!members.equals(that.members)) return false; - return topics.equals(that.topics); + return members.equals(that.members); } @Override public int hashCode() { - int result = members.hashCode(); - result = 31 * result + topics.hashCode(); - return result; + return Objects.hash(members); Review Comment: nit: Should we just use `members.hashCode()`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java: ########## @@ -52,33 +42,22 @@ public Map<String, AssignmentMemberSpec> members() { return members; } - /** - * @return Topic metadata keyed by topic Ids. - */ - public Map<Uuid, AssignmentTopicMetadata> topics() { - return topics; - } - @Override public boolean equals(Object o) { if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (!(o instanceof AssignmentSpec)) return false; AssignmentSpec that = (AssignmentSpec) o; - if (!members.equals(that.members)) return false; - return topics.equals(that.topics); + return members.equals(that.members); } @Override public int hashCode() { - int result = members.hashCode(); - result = 31 * result + topics.hashCode(); - return result; + return Objects.hash(members); } @Override public String toString() { return "AssignmentSpec(members=" + members + - ", topics=" + topics + ')'; Review Comment: nit: Let's bring this one back on the previous line. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicMetadata.java: ########## @@ -16,44 +16,83 @@ */ package org.apache.kafka.coordinator.group.assignor; -/** - * Metadata of a topic. - */ -public class AssignmentTopicMetadata { +import org.apache.kafka.common.Uuid; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class AssignmentTopicMetadata implements AssignmentTopicDescriber { Review Comment: It may be better to move this class to the same package as the TargetAssignmentBuilder as this is not really part of interface anymore. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicMetadata.java: ########## @@ -16,44 +16,83 @@ */ package org.apache.kafka.coordinator.group.assignor; -/** - * Metadata of a topic. - */ -public class AssignmentTopicMetadata { +import org.apache.kafka.common.Uuid; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class AssignmentTopicMetadata implements AssignmentTopicDescriber { + + Map<Uuid, PartitionMetadata> TopicPartitionMetadata; + + public AssignmentTopicMetadata(Map<Uuid, PartitionMetadata> TopicPartitionMetadata) { + this.TopicPartitionMetadata = TopicPartitionMetadata; + } + + /** + * Returns a set of subscribed topicIds. + * + * @return Set of topicIds corresponding to the subscribed topics. + */ + @Override + public Set<Uuid> subscribedTopicIds() { + return TopicPartitionMetadata.keySet(); + } /** - * The number of partitions. + * Number of partitions for the given topicId. + * + * @param topicId Uuid corresponding to the topic. + * @return The number of partitions corresponding to the given topicId. + * If the topicId doesn't exist return 0; */ - private final int numPartitions; + @Override + public int numPartitions(Uuid topicId) { + PartitionMetadata partitionMetadata = TopicPartitionMetadata.get(topicId); + if (partitionMetadata == null) { + return 0; + } - public AssignmentTopicMetadata( - int numPartitions - ) { - this.numPartitions = numPartitions; + return partitionMetadata.numPartitions(); } /** - * @return The number of partitions present for the topic. + * Returns all the racks associated with the replicas for the given partition. + * + * @param topicId Uuid corresponding to the partition's topic. + * @param partition Partition number within topic. + * @return The set of racks corresponding to the replicas of the topics partition. + * If the topicId doesn't exist return an empty set; */ - public int numPartitions() { - return numPartitions; + @Override + public Set<String> racksForPartition(Uuid topicId, int partition) { + PartitionMetadata partitionMetadata = TopicPartitionMetadata.get(topicId); + if (partitionMetadata == null) { + return Collections.emptySet(); + } + + return partitionMetadata.racks(partition); } @Override public boolean equals(Object o) { if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (!(o instanceof AssignmentTopicMetadata)) return false; Review Comment: nit: Let's revert this change. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java: ########## @@ -36,8 +36,9 @@ public interface PartitionAssignor { * Perform the group assignment given the current members and * topic metadata. * - * @param assignmentSpec The assignment spec. + * @param assignmentTopicDescriber The topic and cluster metadata describer {@link AssignmentTopicDescriber}. + * @param assignmentSpec The member assignment spec. * @return The new assignment for the group. */ - GroupAssignment assign(AssignmentSpec assignmentSpec) throws PartitionAssignorException; + GroupAssignment assign(AssignmentTopicDescriber assignmentTopicDescriber, AssignmentSpec assignmentSpec) throws PartitionAssignorException; Review Comment: Would it make sense to put `AssignmentTopicDescriber` as the second argument? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicMetadata.java: ########## @@ -16,44 +16,83 @@ */ package org.apache.kafka.coordinator.group.assignor; -/** - * Metadata of a topic. - */ -public class AssignmentTopicMetadata { +import org.apache.kafka.common.Uuid; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class AssignmentTopicMetadata implements AssignmentTopicDescriber { + + Map<Uuid, PartitionMetadata> TopicPartitionMetadata; Review Comment: nit: Variable starts with a lower case letter. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicMetadata.java: ########## @@ -16,44 +16,83 @@ */ package org.apache.kafka.coordinator.group.assignor; -/** - * Metadata of a topic. - */ -public class AssignmentTopicMetadata { +import org.apache.kafka.common.Uuid; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class AssignmentTopicMetadata implements AssignmentTopicDescriber { + + Map<Uuid, PartitionMetadata> TopicPartitionMetadata; + + public AssignmentTopicMetadata(Map<Uuid, PartitionMetadata> TopicPartitionMetadata) { + this.TopicPartitionMetadata = TopicPartitionMetadata; + } + + /** + * Returns a set of subscribed topicIds. + * + * @return Set of topicIds corresponding to the subscribed topics. + */ + @Override + public Set<Uuid> subscribedTopicIds() { + return TopicPartitionMetadata.keySet(); + } /** - * The number of partitions. + * Number of partitions for the given topicId. + * + * @param topicId Uuid corresponding to the topic. + * @return The number of partitions corresponding to the given topicId. + * If the topicId doesn't exist return 0; */ - private final int numPartitions; + @Override + public int numPartitions(Uuid topicId) { + PartitionMetadata partitionMetadata = TopicPartitionMetadata.get(topicId); + if (partitionMetadata == null) { + return 0; + } - public AssignmentTopicMetadata( - int numPartitions - ) { - this.numPartitions = numPartitions; + return partitionMetadata.numPartitions(); } /** - * @return The number of partitions present for the topic. + * Returns all the racks associated with the replicas for the given partition. + * + * @param topicId Uuid corresponding to the partition's topic. + * @param partition Partition number within topic. + * @return The set of racks corresponding to the replicas of the topics partition. + * If the topicId doesn't exist return an empty set; */ - public int numPartitions() { - return numPartitions; + @Override + public Set<String> racksForPartition(Uuid topicId, int partition) { + PartitionMetadata partitionMetadata = TopicPartitionMetadata.get(topicId); + if (partitionMetadata == null) { + return Collections.emptySet(); + } + + return partitionMetadata.racks(partition); } @Override public boolean equals(Object o) { if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (!(o instanceof AssignmentTopicMetadata)) return false; AssignmentTopicMetadata that = (AssignmentTopicMetadata) o; - return numPartitions == that.numPartitions; + return TopicPartitionMetadata.equals(that.TopicPartitionMetadata); } @Override public int hashCode() { - return numPartitions; + return Objects.hash(TopicPartitionMetadata); } @Override public String toString() { - return "AssignmentTopicMetadata(numPartitions=" + numPartitions + ')'; + return "AssignmentTopicMetadata{" + Review Comment: nit: We should use `()` instead of `{}`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionMetadata.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class PartitionMetadata { + + /** + * Partition number mapped to a set of racks where + * its replicas are located. + */ + Map<Integer, Set<String>> partitionsWithRacks; Review Comment: nit: Should this one be private? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionMetadata.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class PartitionMetadata { + + /** + * Partition number mapped to a set of racks where + * its replicas are located. + */ + Map<Integer, Set<String>> partitionsWithRacks; + + //If rack information isn't available pass an empty set. Review Comment: nit: Let's use the javadoc format. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionMetadata.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class PartitionMetadata { Review Comment: nit: javadoc for the class. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicMetadata.java: ########## @@ -16,44 +16,83 @@ */ package org.apache.kafka.coordinator.group.assignor; -/** - * Metadata of a topic. - */ -public class AssignmentTopicMetadata { +import org.apache.kafka.common.Uuid; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class AssignmentTopicMetadata implements AssignmentTopicDescriber { + + Map<Uuid, PartitionMetadata> TopicPartitionMetadata; + + public AssignmentTopicMetadata(Map<Uuid, PartitionMetadata> TopicPartitionMetadata) { + this.TopicPartitionMetadata = TopicPartitionMetadata; + } + + /** + * Returns a set of subscribed topicIds. + * + * @return Set of topicIds corresponding to the subscribed topics. + */ + @Override + public Set<Uuid> subscribedTopicIds() { + return TopicPartitionMetadata.keySet(); + } /** - * The number of partitions. + * Number of partitions for the given topicId. + * + * @param topicId Uuid corresponding to the topic. + * @return The number of partitions corresponding to the given topicId. + * If the topicId doesn't exist return 0; */ - private final int numPartitions; + @Override + public int numPartitions(Uuid topicId) { + PartitionMetadata partitionMetadata = TopicPartitionMetadata.get(topicId); + if (partitionMetadata == null) { + return 0; + } - public AssignmentTopicMetadata( - int numPartitions - ) { - this.numPartitions = numPartitions; + return partitionMetadata.numPartitions(); } /** - * @return The number of partitions present for the topic. + * Returns all the racks associated with the replicas for the given partition. + * + * @param topicId Uuid corresponding to the partition's topic. + * @param partition Partition number within topic. + * @return The set of racks corresponding to the replicas of the topics partition. + * If the topicId doesn't exist return an empty set; */ - public int numPartitions() { - return numPartitions; + @Override + public Set<String> racksForPartition(Uuid topicId, int partition) { + PartitionMetadata partitionMetadata = TopicPartitionMetadata.get(topicId); + if (partitionMetadata == null) { + return Collections.emptySet(); + } + + return partitionMetadata.racks(partition); } @Override public boolean equals(Object o) { if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (!(o instanceof AssignmentTopicMetadata)) return false; AssignmentTopicMetadata that = (AssignmentTopicMetadata) o; - return numPartitions == that.numPartitions; + return TopicPartitionMetadata.equals(that.TopicPartitionMetadata); } @Override public int hashCode() { - return numPartitions; + return Objects.hash(TopicPartitionMetadata); Review Comment: nit: `topicPartitionMetadata.hashcode()`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/common/RackAwareTopicIdPartition.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.common; + +import org.apache.kafka.common.Uuid; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +public class RackAwareTopicIdPartition { Review Comment: Is this used anywhere? If not, let's remove it from this PR. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ########## @@ -78,7 +78,7 @@ public MemberWithRemainingAssignments(String memberId, int remaining) { /** * @return Map of topic ids to a list of members subscribed to them. */ - private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentSpec) { + private Map<Uuid, List<String>> membersPerTopic(AssignmentTopicDescriber assignmentTopicDescriber, final AssignmentSpec assignmentSpec) { Review Comment: nit: Should `assignmentTopicDescriber` be final as well to be consistent? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionMetadata.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class PartitionMetadata { + + /** + * Partition number mapped to a set of racks where + * its replicas are located. + */ + Map<Integer, Set<String>> partitionsWithRacks; + + //If rack information isn't available pass an empty set. + public PartitionMetadata (Map<Integer, Set<String>> partitionsWithRacks) { + Objects.requireNonNull(partitionsWithRacks); + this.partitionsWithRacks = partitionsWithRacks; + } + + /** + * Returns the number of partitions. + * + * @return number of partitions associated with the topic. + */ + public int numPartitions() { + return partitionsWithRacks.size(); + } + + /** + * Returns the rack information for the replicas of the given partition. + * + * @param partition partition number. + * @return Set of racks associated with the replicas of the given partition. + * If no rack information is available, an empty set is returned. + */ + public Set<String> racks(int partition) { + return partitionsWithRacks.get(partition); + } + + @Override + public boolean equals(Object o) { Review Comment: Same commit for equals, hashCode and toString than for the others. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ########## @@ -87,7 +87,7 @@ private Map<Uuid, List<String>> membersPerTopic(final AssignmentSpec assignmentS for (Uuid topicId : topics) { // Only topics that are present in both the subscribed topics list and the topic metadata should be // considered for assignment. - if (assignmentSpec.topics().containsKey(topicId)) { + if (assignmentTopicDescriber.subscribedTopicIds().contains(topicId)) { Review Comment: Is this the only where place where we use `subscribedTopicIds`? If so, we could also remove it and use `numPartition` to check if a topic exists. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -434,22 +437,36 @@ public void setSubscriptionMetadata( public Map<String, TopicMetadata> computeSubscriptionMetadata( ConsumerGroupMember oldMember, ConsumerGroupMember newMember, - TopicsImage topicsImage + TopicsImage topicsImage, + ClusterImage clusterImage ) { // Copy and update the current subscriptions. Map<String, Integer> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames); maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, newMember); // Create the topic metadata for each subscribed topic. Map<String, TopicMetadata> newSubscriptionMetadata = new HashMap<>(subscribedTopicNames.size()); + subscribedTopicNames.forEach((topicName, count) -> { TopicImage topicImage = topicsImage.getTopic(topicName); if (topicImage != null) { + Map<Integer, Set<String>> partitionRackInfo = new HashMap<>(); Review Comment: nit: I wonder if `partitionRacks` would be a better name. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -434,22 +437,36 @@ public void setSubscriptionMetadata( public Map<String, TopicMetadata> computeSubscriptionMetadata( ConsumerGroupMember oldMember, ConsumerGroupMember newMember, - TopicsImage topicsImage + TopicsImage topicsImage, + ClusterImage clusterImage Review Comment: Would it make sense to directly pass `MetadataImage`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ########## @@ -244,16 +245,18 @@ public TargetAssignmentResult build() throws PartitionAssignorException { }); // Prepare the topic metadata. - Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>(); + Map<Uuid, PartitionMetadata> topicMetadataMap = new HashMap<>(); Review Comment: I was hoping that we could actually avoid this copy. Would it be possible? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java: ########## @@ -40,23 +44,31 @@ public class TopicMetadata { */ private final int numPartitions; + /** + * Map of every partition to a set of its rackIds. + * If the rack information is unavailable, pass an empty set. + */ + private final Map<Integer, Set<String>> partitionRackInfo; Review Comment: I am wondering if we should introduce `PartitionMetadata` instead of using `Set<String>` directly. I am not sure if it is worth it though. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java: ########## @@ -106,16 +127,23 @@ public String toString() { "id=" + id + ", name=" + name + ", numPartitions=" + numPartitions + + ", partitionRackInfo=" + partitionRackInfo + ')'; } public static TopicMetadata fromRecord( ConsumerGroupPartitionMetadataValue.TopicMetadata record ) { + // Converting the data type from a list stored in the record to a map. + Map<Integer, Set<String>> partitionRackInfo = new HashMap<>(record.partitionRackInfo().size()); + for (ConsumerGroupPartitionMetadataValue.PartitionRackInfo info : record.partitionRackInfo()) { + partitionRackInfo.put(info.partition(), new HashSet<>(info.racks())); Review Comment: Let's make the set immutable here. You can use `Collections. unmodifiableSet` to do this. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java: ########## @@ -40,23 +44,31 @@ public class TopicMetadata { */ private final int numPartitions; Review Comment: Should we move this one as it can get it from `partitionRackInfo.size()`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -434,22 +437,36 @@ public void setSubscriptionMetadata( public Map<String, TopicMetadata> computeSubscriptionMetadata( ConsumerGroupMember oldMember, ConsumerGroupMember newMember, - TopicsImage topicsImage + TopicsImage topicsImage, + ClusterImage clusterImage ) { // Copy and update the current subscriptions. Map<String, Integer> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames); maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, newMember); // Create the topic metadata for each subscribed topic. Map<String, TopicMetadata> newSubscriptionMetadata = new HashMap<>(subscribedTopicNames.size()); + subscribedTopicNames.forEach((topicName, count) -> { TopicImage topicImage = topicsImage.getTopic(topicName); if (topicImage != null) { + Map<Integer, Set<String>> partitionRackInfo = new HashMap<>(); + + topicImage.partitions().forEach((partition, partitionRegistration) -> { + Set<String> racks = new HashSet<>(); + for (int replica : Objects.requireNonNull(partitionRegistration.replicas)) { Review Comment: Can `replicas` be null in the image? `requireNonNull` will throw an exception if `replicas` is null. It would be better to handle it gracefully if it can really happen. ########## group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json: ########## @@ -29,7 +29,27 @@ { "name": "TopicName", "versions": "0+", "type": "string", "about": "The topic name." }, { "name": "NumPartitions", "versions": "0+", "type": "int32", - "about": "The number of partitions of the topic." } + "about": "The number of partitions of the topic." }, + { Review Comment: Please, let's try to respect the existing format for the fields. ########## group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json: ########## @@ -29,7 +29,27 @@ { "name": "TopicName", "versions": "0+", "type": "string", "about": "The topic name." }, { "name": "NumPartitions", "versions": "0+", "type": "int32", - "about": "The number of partitions of the topic." } + "about": "The number of partitions of the topic." }, + { + "name": "PartitionRackInfo", Review Comment: nit: `PartitionMetadata` to be consistent with `TopicMetadata`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionMetadata.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public class PartitionMetadata { + + /** + * Partition number mapped to a set of racks where + * its replicas are located. + */ + Map<Integer, Set<String>> partitionsWithRacks; + + //If rack information isn't available pass an empty set. + public PartitionMetadata (Map<Integer, Set<String>> partitionsWithRacks) { Review Comment: nit: Unnecessary empty space before the opening parenthesis. ########## group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json: ########## @@ -29,7 +29,27 @@ { "name": "TopicName", "versions": "0+", "type": "string", "about": "The topic name." }, { "name": "NumPartitions", "versions": "0+", "type": "int32", Review Comment: Do we still need this field? I suppose that it depends on whether you plan to store partition metadata for partitions without racks or not. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -434,22 +437,36 @@ public void setSubscriptionMetadata( public Map<String, TopicMetadata> computeSubscriptionMetadata( ConsumerGroupMember oldMember, ConsumerGroupMember newMember, - TopicsImage topicsImage + TopicsImage topicsImage, + ClusterImage clusterImage ) { // Copy and update the current subscriptions. Map<String, Integer> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames); maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, newMember); // Create the topic metadata for each subscribed topic. Map<String, TopicMetadata> newSubscriptionMetadata = new HashMap<>(subscribedTopicNames.size()); + subscribedTopicNames.forEach((topicName, count) -> { TopicImage topicImage = topicsImage.getTopic(topicName); if (topicImage != null) { + Map<Integer, Set<String>> partitionRackInfo = new HashMap<>(); + + topicImage.partitions().forEach((partition, partitionRegistration) -> { + Set<String> racks = new HashSet<>(); + for (int replica : Objects.requireNonNull(partitionRegistration.replicas)) { + Optional<String> rackOptional = clusterImage.broker(replica).rack(); + // Only add rack if it is available for the broker/replica. + rackOptional.ifPresent(racks::add); + } + partitionRackInfo.put(partition, racks); Review Comment: I am thinking about a possible optimization. We could actually keep `numPartitions` and put partition's metadata only if racks is non-empty. That would save memory in clusters without rack awareness. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org