dajac commented on code in PR #13998: URL: https://github.com/apache/kafka/pull/13998#discussion_r1272465513
########## checkstyle/import-control.xml: ########## @@ -48,6 +48,7 @@ <allow pkg="org.apache.kafka.common.utils" /> <allow pkg="org.apache.kafka.common.errors" exact-match="true" /> <allow pkg="org.apache.kafka.common.memory" /> + <allow pkg="org.apache.kafka.coordinator.group.common"/> Review Comment: I suppose that we can remove this, isn't it? ########## group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json: ########## @@ -29,7 +29,20 @@ { "name": "TopicName", "versions": "0+", "type": "string", "about": "The topic name." }, { "name": "NumPartitions", "versions": "0+", "type": "int32", - "about": "The number of partitions of the topic." } + "about": "The number of partitions of the topic." }, + { "name": "PartitionRacks", + "versions": "0+", + "type": "[]PartitionMetadata", + "about": "Partitions mapped to a set of racks.", "fields": [ + { "name": "Partition", + "versions": "0+", + "type": "int32", + "about": "The partition number." }, Review Comment: The format of the new fields does not match the convention. Please, look at existing fields/fields and update the format accordingly. ########## checkstyle/suppressions.xml: ########## @@ -321,6 +321,8 @@ <!-- group coordinator --> <suppress checks="CyclomaticComplexity" + files="(ConsumerGroupMember).java"/> + <suppress checks="ParameterNumber" Review Comment: Do we need this change? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ########## @@ -756,4 +759,24 @@ public void testNewOffsetCommitTombstoneRecord() { Record record = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1); assertEquals(expectedRecord, record); } + + private List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> mkListOfPartitionRacks(int numPartitions){ Review Comment: nit: Should the method be static? A space is missing before `{`. ########## group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json: ########## @@ -29,7 +29,20 @@ { "name": "TopicName", "versions": "0+", "type": "string", "about": "The topic name." }, { "name": "NumPartitions", "versions": "0+", "type": "int32", - "about": "The number of partitions of the topic." } + "about": "The number of partitions of the topic." }, + { "name": "PartitionRacks", Review Comment: nit: `Partitions` instead of `PartitionRacks`? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -5402,6 +5412,14 @@ private Map<Uuid, Set<Integer>> fromTopicPartitions( return assignmentMap; } + private Map<Integer, Set<String>> mkMapOfPartitionRacks(int numPartitions) { Review Comment: We have the same code in RecodHelperTests. Could we reuse it? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SubscribedTopicDescriber.java: ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Set; + +/** + * The subscribed topic describer is used by the {@link PartitionAssignor} + * to obtain topic and partition metadata of subscribed topics. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface SubscribedTopicDescriber { + + /** + * Returns a set of subscribed topic Ids. + * + * @return Set of topicIds corresponding to the subscribed topics. + */ + Set<Uuid> subscribedTopicIds(); Review Comment: As discussed offline, we should try to avoid this one if possible. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ########## @@ -131,13 +131,21 @@ public static Record newGroupSubscriptionMetadataRecord( Map<String, TopicMetadata> newSubscriptionMetadata ) { ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue(); - newSubscriptionMetadata.forEach((topicName, topicMetadata) -> + newSubscriptionMetadata.forEach((topicName, topicMetadata) -> { + ArrayList<ConsumerGroupPartitionMetadataValue.PartitionMetadata> partitionRacks = new ArrayList<>(); + topicMetadata.partitionRacks().forEach((partition, racks) -> { + partitionRacks.add(new ConsumerGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(partition) + .setRacks(new ArrayList<>(racks)) Review Comment: Do we set racks all the time or only when the set is non-empty? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java: ########## @@ -40,23 +45,31 @@ public class TopicMetadata { */ private final int numPartitions; + /** + * Map of every partition to a set of its rackIds. + * If the rack information is unavailable, pass an empty set. Review Comment: This comment is not inline with what is written [here](https://github.com/apache/kafka/pull/13998/files#diff-74fddde890b14d5626cb56e3eda5feb9d09ae0a4f469901517fed7eb1b11f34fR461), no? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ########## @@ -417,19 +418,25 @@ public void testUpdateSubscriptionMetadata() { consumerGroup.computeSubscriptionMetadata( null, null, - image.topics() + image.topics(), + image.cluster() ) ); // Compute while taking into account member 1. assertEquals( mkMap( - mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)) + mkEntry("foo", + new TopicMetadata(fooTopicId, "foo", 1, mkMap( + mkEntry(0, new HashSet<>())) Review Comment: nit: `Collection.emptySet`? There are similar cases in this file. ########## reviewers.py: ########## @@ -28,7 +28,7 @@ def prompt_for_user(): while True: try: - user_input = input("\nName or email (case insensitive): ") + user_input = input("\nName or email (case insensitive): ") Review Comment: Changes in this file should be reverted. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ########## @@ -244,16 +244,21 @@ public TargetAssignmentResult build() throws PartitionAssignorException { }); // Prepare the topic metadata. - Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>(); + Map<Uuid, PartitionMetadata> topicMetadataMap = new HashMap<>(); subscriptionMetadata.forEach((topicName, topicMetadata) -> - topics.put(topicMetadata.id(), new AssignmentTopicMetadata(topicMetadata.numPartitions())) + topicMetadataMap.put( + topicMetadata.id(), + new PartitionMetadata(topicMetadata.partitionRacks()) + ) ); // Compute the assignment. - GroupAssignment newGroupAssignment = assignor.assign(new AssignmentSpec( - Collections.unmodifiableMap(memberSpecs), - Collections.unmodifiableMap(topics) - )); + GroupAssignment newGroupAssignment = assignor.assign( + new AssignmentSpec( + Collections.unmodifiableMap(memberSpecs) + ), Review Comment: nit: `new AssignmentSpec(Collections.unmodifiableMap(memberSpecs)),` as it fits on one line. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ########## @@ -131,13 +131,21 @@ public static Record newGroupSubscriptionMetadataRecord( Map<String, TopicMetadata> newSubscriptionMetadata ) { ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue(); - newSubscriptionMetadata.forEach((topicName, topicMetadata) -> + newSubscriptionMetadata.forEach((topicName, topicMetadata) -> { + ArrayList<ConsumerGroupPartitionMetadataValue.PartitionMetadata> partitionRacks = new ArrayList<>(); Review Comment: nit: You can change the type of the variable from `ArrayList` to `List`. I would also consider renaming the variable as this is about partition metadata and not racks. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ########## @@ -756,4 +759,24 @@ public void testNewOffsetCommitTombstoneRecord() { Record record = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1); assertEquals(expectedRecord, record); } + + private List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> mkListOfPartitionRacks(int numPartitions){ + List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> partitionRacks = new ArrayList<>(numPartitions); + for (int i = 0; i < numPartitions; i++) { + partitionRacks.add( + new ConsumerGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(i) + .setRacks(new ArrayList<>()) + ); + } + return partitionRacks; + } + + private Map<Integer, Set<String>> mkMapOfPartitionRacks(int numPartitions) { + Map<Integer, Set<String>> partitionRacks = new HashMap<>(numPartitions); + for(int i = 0; i < numPartitions ; i++) { + partitionRacks.put(i, Collections.emptySet()); + } + return partitionRacks; Review Comment: nit: The indentation of this code seems to be off. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ########## @@ -756,4 +759,24 @@ public void testNewOffsetCommitTombstoneRecord() { Record record = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1); assertEquals(expectedRecord, record); } + + private List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> mkListOfPartitionRacks(int numPartitions){ + List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> partitionRacks = new ArrayList<>(numPartitions); + for (int i = 0; i < numPartitions; i++) { + partitionRacks.add( + new ConsumerGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(i) + .setRacks(new ArrayList<>()) Review Comment: nit: `Collections.emptyList()`? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java: ########## @@ -756,4 +759,24 @@ public void testNewOffsetCommitTombstoneRecord() { Record record = RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1); assertEquals(expectedRecord, record); } + + private List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> mkListOfPartitionRacks(int numPartitions){ + List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> partitionRacks = new ArrayList<>(numPartitions); + for (int i = 0; i < numPartitions; i++) { + partitionRacks.add( + new ConsumerGroupPartitionMetadataValue.PartitionMetadata() + .setPartition(i) + .setRacks(new ArrayList<>()) + ); + } + return partitionRacks; + } + + private Map<Integer, Set<String>> mkMapOfPartitionRacks(int numPartitions) { Review Comment: nit: This could be static as well. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionMetadata.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; Review Comment: This one should go to the `consumer` package, next to the `TopicMetadata` class, no? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java: ########## @@ -40,23 +45,31 @@ public class TopicMetadata { */ private final int numPartitions; + /** + * Map of every partition to a set of its rackIds. + * If the rack information is unavailable, pass an empty set. + */ + private final Map<Integer, Set<String>> partitionRacks; Review Comment: I am confused about `PartitionMetadata` class. Is it meant to be used here? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -434,22 +436,38 @@ public void setSubscriptionMetadata( public Map<String, TopicMetadata> computeSubscriptionMetadata( ConsumerGroupMember oldMember, ConsumerGroupMember newMember, - TopicsImage topicsImage + TopicsImage topicsImage, + ClusterImage clusterImage ) { // Copy and update the current subscriptions. Map<String, Integer> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames); maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, newMember); // Create the topic metadata for each subscribed topic. Map<String, TopicMetadata> newSubscriptionMetadata = new HashMap<>(subscribedTopicNames.size()); + subscribedTopicNames.forEach((topicName, count) -> { TopicImage topicImage = topicsImage.getTopic(topicName); if (topicImage != null) { + Map<Integer, Set<String>> partitionRacks = new HashMap<>(); + + topicImage.partitions().forEach((partition, partitionRegistration) -> { + Set<String> racks = new HashSet<>(); + for (int replica : Objects.requireNonNull(partitionRegistration.replicas)) { + Optional<String> rackOptional = clusterImage.broker(replica).rack(); + // Only add rack if it is available for the broker/replica. + rackOptional.ifPresent(racks::add); + } + // If no racks are available for any replica of this partition, store an empty map. + if (!racks.isEmpty()) + partitionRacks.put(partition, racks); + }); + newSubscriptionMetadata.put(topicName, new TopicMetadata( topicImage.id(), topicImage.name(), - topicImage.partitions().size() - )); + topicImage.partitions().size(), + partitionRacks)); Review Comment: nit: Could we put the closing `))` back on the new line as it was? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionMetadata.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * The PartitionMetadata object is used by the group coordinators assignor. + * + * It provides methods for accessing the partition metadata, + * including the number of partitions associated with the topic and the rack information + * for the replicas of a given partition. + */ +public class PartitionMetadata { + + /** + * Partition number mapped to a set of racks where + * its replicas are located. + */ + private final Map<Integer, Set<String>> partitionsWithRacks; + + /** + * If rack information isn't available pass an empty set. + */ + public PartitionMetadata(Map<Integer, Set<String>> partitionsWithRacks) { + Objects.requireNonNull(partitionsWithRacks); + this.partitionsWithRacks = partitionsWithRacks; + } + + /** + * Returns the number of partitions. + * + * @return Number of partitions associated with the topic. + */ + public int numPartitions() { + return partitionsWithRacks.size(); + } + + /** + * Returns the rack information for the replicas of the given partition. + * + * @param partition partition Id. + * @return Set of racks associated with the replicas of the given partition. + * If no rack information is available, an empty set is returned. + */ + public Set<String> racks(int partition) { + return partitionsWithRacks.get(partition); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PartitionMetadata that = (PartitionMetadata) o; + return Objects.equals(partitionsWithRacks, that.partitionsWithRacks); + } + + @Override + public int hashCode() { + return partitionsWithRacks.hashCode(); + } + + @Override + public String toString() { + return "PartitionMetadata{" + Review Comment: nit: `{ .. }` -> `( ... )`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java: ########## @@ -244,16 +244,21 @@ public TargetAssignmentResult build() throws PartitionAssignorException { }); // Prepare the topic metadata. - Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>(); + Map<Uuid, PartitionMetadata> topicMetadataMap = new HashMap<>(); subscriptionMetadata.forEach((topicName, topicMetadata) -> - topics.put(topicMetadata.id(), new AssignmentTopicMetadata(topicMetadata.numPartitions())) + topicMetadataMap.put( + topicMetadata.id(), + new PartitionMetadata(topicMetadata.partitionRacks()) Review Comment: I am confused by the usage of PartitionMetadata only here. I think that this should be used in subscription metadata as well or not at all. It does not make sense to only use it here, right? Our goal is to avoid having to copy anything. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -434,22 +437,36 @@ public void setSubscriptionMetadata( public Map<String, TopicMetadata> computeSubscriptionMetadata( ConsumerGroupMember oldMember, ConsumerGroupMember newMember, - TopicsImage topicsImage + TopicsImage topicsImage, + ClusterImage clusterImage ) { // Copy and update the current subscriptions. Map<String, Integer> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames); maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, newMember); // Create the topic metadata for each subscribed topic. Map<String, TopicMetadata> newSubscriptionMetadata = new HashMap<>(subscribedTopicNames.size()); + subscribedTopicNames.forEach((topicName, count) -> { TopicImage topicImage = topicsImage.getTopic(topicName); if (topicImage != null) { + Map<Integer, Set<String>> partitionRackInfo = new HashMap<>(); + + topicImage.partitions().forEach((partition, partitionRegistration) -> { + Set<String> racks = new HashSet<>(); + for (int replica : Objects.requireNonNull(partitionRegistration.replicas)) { Review Comment: Let's remove it if it cannot by null. We usually use `Objects.requireNonNull` in constructors not really within the code like this. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionMetadata.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * The PartitionMetadata object is used by the group coordinators assignor. + * + * It provides methods for accessing the partition metadata, + * including the number of partitions associated with the topic and the rack information + * for the replicas of a given partition. + */ +public class PartitionMetadata { + + /** + * Partition number mapped to a set of racks where + * its replicas are located. + */ + private final Map<Integer, Set<String>> partitionsWithRacks; + + /** + * If rack information isn't available pass an empty set. + */ + public PartitionMetadata(Map<Integer, Set<String>> partitionsWithRacks) { + Objects.requireNonNull(partitionsWithRacks); + this.partitionsWithRacks = partitionsWithRacks; + } + + /** + * Returns the number of partitions. + * + * @return Number of partitions associated with the topic. + */ + public int numPartitions() { + return partitionsWithRacks.size(); Review Comment: My understanding is that the map won't have anything for partitions without racks so this must be wrong, isn't it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org