This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bb97d63d418 KAFKA-17578: Remove partitionRacks from TopicMetadata 
(#17233)
bb97d63d418 is described below

commit bb97d63d418fe047cf5a59b16c7004e5011402da
Author: PoAn Yang <[email protected]>
AuthorDate: Wed Sep 25 15:48:48 2024 +0800

    KAFKA-17578: Remove partitionRacks from TopicMetadata (#17233)
    
    The ModernGroup#subscribedTopicMetadata takes too much memory due to 
partitionRacks. This is not being used at the moment as the consumer protocol 
does not support rack aware assignments.
    
    A heap dump from a group with 500 members, 2K subscribed topic partitions 
shows 654,400 bytes used for partitionRacks. The rest of the ConsumerGroup 
object holds 822,860 bytes.
    
    Reviewers: David Jacot <[email protected]>
---
 .../group/GroupCoordinatorRecordHelpers.java       |  22 ---
 .../coordinator/group/modern/ModernGroup.java      |  20 +--
 .../group/modern/SubscribedTopicDescriberImpl.java |   3 +-
 .../coordinator/group/modern/TopicMetadata.java    |  60 ++------
 .../ConsumerGroupPartitionMetadataValue.json       |   2 +-
 .../group/GroupCoordinatorRecordHelpersTest.java   |  46 ++-----
 .../group/GroupMetadataManagerTest.java            | 153 ++++++++++-----------
 .../OptimizedUniformAssignmentBuilderTest.java     |  49 +++----
 .../group/assignor/RangeAssignorTest.java          |  72 ++++------
 .../group/assignor/SimpleAssignorTest.java         |  27 ++--
 .../UniformHeterogeneousAssignmentBuilderTest.java |  64 +++------
 .../group/modern/SubscribedTopicMetadataTest.java  |  35 +----
 .../group/modern/TargetAssignmentBuilderTest.java  |  35 +++--
 .../group/modern/TopicMetadataTest.java            |  21 ++-
 .../modern/consumer/ConsumerGroupBuilder.java      |   4 +-
 .../group/modern/consumer/ConsumerGroupTest.java   |  45 +++---
 .../group/modern/share/ShareGroupBuilder.java      |   4 +-
 .../group/modern/share/ShareGroupTest.java         |  45 +++---
 .../kafka/jmh/assignor/AssignorBenchmarkUtils.java |   8 +-
 .../jmh/assignor/ServerSideAssignorBenchmark.java  |   6 +-
 .../assignor/TargetAssignmentBuilderBenchmark.java |   3 +-
 21 files changed, 238 insertions(+), 486 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
index 3665fb05224..67364470b0f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
@@ -139,21 +139,10 @@ public class GroupCoordinatorRecordHelpers {
     ) {
         ConsumerGroupPartitionMetadataValue value = new 
ConsumerGroupPartitionMetadataValue();
         newSubscriptionMetadata.forEach((topicName, topicMetadata) -> {
-            List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> 
partitionMetadata = new ArrayList<>();
-            // If the partition rack information map is empty, store an empty 
list in the record.
-            if (!topicMetadata.partitionRacks().isEmpty()) {
-                topicMetadata.partitionRacks().forEach((partition, racks) ->
-                    partitionMetadata.add(new 
ConsumerGroupPartitionMetadataValue.PartitionMetadata()
-                        .setPartition(partition)
-                        .setRacks(new ArrayList<>(racks))
-                    )
-                );
-            }
             value.topics().add(new 
ConsumerGroupPartitionMetadataValue.TopicMetadata()
                 .setTopicId(topicMetadata.id())
                 .setTopicName(topicMetadata.name())
                 .setNumPartitions(topicMetadata.numPartitions())
-                .setPartitionMetadata(partitionMetadata)
             );
         });
 
@@ -657,21 +646,10 @@ public class GroupCoordinatorRecordHelpers {
     ) {
         ShareGroupPartitionMetadataValue value = new 
ShareGroupPartitionMetadataValue();
         newSubscriptionMetadata.forEach((topicName, topicMetadata) -> {
-            List<ShareGroupPartitionMetadataValue.PartitionMetadata> 
partitionMetadata = new ArrayList<>();
-            // If the partition rack information map is empty, store an empty 
list in the record.
-            if (!topicMetadata.partitionRacks().isEmpty()) {
-                topicMetadata.partitionRacks().forEach((partition, racks) ->
-                    partitionMetadata.add(new 
ShareGroupPartitionMetadataValue.PartitionMetadata()
-                        .setPartition(partition)
-                        .setRacks(new ArrayList<>(racks))
-                    )
-                );
-            }
             value.topics().add(new 
ShareGroupPartitionMetadataValue.TopicMetadata()
                 .setTopicId(topicMetadata.id())
                 .setTopicName(topicMetadata.name())
                 .setNumPartitions(topicMetadata.numPartitions())
-                .setPartitionMetadata(partitionMetadata)
             );
         });
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
index 508b1c7d131..823792c198e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
@@ -35,7 +35,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 
 import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
@@ -389,26 +388,11 @@ public abstract class ModernGroup<T extends 
ModernGroupMember> implements Group
         subscribedTopicNames.forEach((topicName, count) -> {
             TopicImage topicImage = topicsImage.getTopic(topicName);
             if (topicImage != null) {
-                Map<Integer, Set<String>> partitionRacks = new HashMap<>();
-                topicImage.partitions().forEach((partition, 
partitionRegistration) -> {
-                    Set<String> racks = new HashSet<>();
-                    for (int replica : partitionRegistration.replicas) {
-                        Optional<String> rackOptional = 
clusterImage.broker(replica).rack();
-                        // Only add the rack if it is available for the 
broker/replica.
-                        rackOptional.ifPresent(racks::add);
-                    }
-                    // If rack information is unavailable for all replicas of 
this partition,
-                    // no corresponding entry will be stored for it in the map.
-                    if (!racks.isEmpty())
-                        partitionRacks.put(partition, racks);
-                });
-
                 newSubscriptionMetadata.put(topicName, new TopicMetadata(
                     topicImage.id(),
                     topicImage.name(),
-                    topicImage.partitions().size(),
-                    partitionRacks)
-                );
+                    topicImage.partitions().size()
+                ));
             }
         });
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java
index 81a03f97a82..7871b04d722 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java
@@ -72,8 +72,7 @@ public class SubscribedTopicDescriberImpl implements 
SubscribedTopicDescriber {
      */
     @Override
     public Set<String> racksForPartition(Uuid topicId, int partition) {
-        TopicMetadata topic = this.topicMetadata.get(topicId);
-        return topic == null ? Collections.emptySet() : 
topic.partitionRacks().getOrDefault(partition, Collections.emptySet());
+        return Collections.emptySet();
     }
 
     @Override
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java
index 2eba447203b..ace670cc0e4 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java
@@ -20,12 +20,7 @@ import org.apache.kafka.common.Uuid;
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 
 /**
  * Immutable topic metadata.
@@ -46,17 +41,10 @@ public class TopicMetadata {
      */
     private final int numPartitions;
 
-    /**
-     * Map of every partition Id to a set of its rack Ids, if they exist.
-     * If rack information is unavailable for all partitions, this is an empty 
map.
-     */
-    private final Map<Integer, Set<String>> partitionRacks;
-
     public TopicMetadata(
         Uuid id,
         String name,
-        int numPartitions,
-        Map<Integer, Set<String>> partitionRacks
+        int numPartitions
     ) {
         this.id = Objects.requireNonNull(id);
         if (Uuid.ZERO_UUID.equals(id)) {
@@ -70,7 +58,6 @@ public class TopicMetadata {
         if (numPartitions < 0) {
             throw new IllegalArgumentException("Number of partitions cannot be 
negative.");
         }
-        this.partitionRacks = Objects.requireNonNull(partitionRacks);
     }
 
     /**
@@ -94,14 +81,6 @@ public class TopicMetadata {
         return this.numPartitions;
     }
 
-    /**
-     * @return Every partition mapped to the set of corresponding available 
rack Ids of its replicas.
-     *         An empty map is returned if rack information is unavailable for 
all partitions.
-     */
-    public Map<Integer, Set<String>> partitionRacks() {
-        return this.partitionRacks;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -111,8 +90,7 @@ public class TopicMetadata {
 
         if (!id.equals(that.id)) return false;
         if (!name.equals(that.name)) return false;
-        if (numPartitions != that.numPartitions) return false;
-        return partitionRacks.equals(that.partitionRacks);
+        return numPartitions == that.numPartitions;
     }
 
     @Override
@@ -120,7 +98,6 @@ public class TopicMetadata {
         int result = id.hashCode();
         result = 31 * result + name.hashCode();
         result = 31 * result + numPartitions;
-        result = 31 * result + partitionRacks.hashCode();
         return result;
     }
 
@@ -130,45 +107,26 @@ public class TopicMetadata {
             "id=" + id +
             ", name=" + name +
             ", numPartitions=" + numPartitions +
-            ", partitionRacks=" + partitionRacks +
             ')';
     }
 
     public static TopicMetadata fromRecord(
         ConsumerGroupPartitionMetadataValue.TopicMetadata record
     ) {
-        // Converting the data type from a list stored in the record to a map 
for the topic metadata.
-        Map<Integer, Set<String>> partitionRacks = new HashMap<>();
-        for (ConsumerGroupPartitionMetadataValue.PartitionMetadata 
partitionMetadata : record.partitionMetadata()) {
-            partitionRacks.put(
-                partitionMetadata.partition(),
-                Collections.unmodifiableSet(new 
HashSet<>(partitionMetadata.racks()))
-            );
-        }
-
         return new TopicMetadata(
             record.topicId(),
             record.topicName(),
-            record.numPartitions(),
-            partitionRacks);
+            record.numPartitions()
+        );
     }
 
     public static TopicMetadata fromRecord(
-            ShareGroupPartitionMetadataValue.TopicMetadata record
+        ShareGroupPartitionMetadataValue.TopicMetadata record
     ) {
-        // Converting the data type from a list stored in the record to a map 
for the topic metadata.
-        Map<Integer, Set<String>> partitionRacks = new HashMap<>();
-        for (ShareGroupPartitionMetadataValue.PartitionMetadata 
partitionMetadata : record.partitionMetadata()) {
-            partitionRacks.put(
-                    partitionMetadata.partition(),
-                    Collections.unmodifiableSet(new 
HashSet<>(partitionMetadata.racks()))
-            );
-        }
-
         return new TopicMetadata(
-                record.topicId(),
-                record.topicName(),
-                record.numPartitions(),
-                partitionRacks);
+            record.topicId(),
+            record.topicName(),
+            record.numPartitions()
+        );
     }
 }
diff --git 
a/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json
 
b/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json
index 4a17d6f685d..89be8cfa056 100644
--- 
a/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json
@@ -29,7 +29,7 @@
       { "name": "NumPartitions", "versions": "0+", "type": "int32",
         "about": "The number of partitions of the topic." },
       { "name": "PartitionMetadata", "versions": "0+", "type": 
"[]PartitionMetadata",
-        "about": "Partitions mapped to a set of racks. If the rack information 
is unavailable for all the partitions, an empty list is stored", "fields": [
+        "about": "Deprecated: this field is not used after 4.0. Partitions 
mapped to a set of racks. If the rack information is unavailable for all the 
partitions, an empty list is stored", "fields": [
           { "name": "Partition", "versions": "0+", "type": "int32",
             "about": "The partition number." },
           { "name": "Racks", "versions": "0+", "type": "[]string",
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
index f71798a0759..38e2b9d68cf 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
@@ -165,14 +165,12 @@ public class GroupCoordinatorRecordHelpersTest {
         subscriptionMetadata.put("foo", new TopicMetadata(
             fooTopicId,
             "foo",
-            10,
-            mkMapOfPartitionRacks(10)
+            10
         ));
         subscriptionMetadata.put("bar", new TopicMetadata(
             barTopicId,
             "bar",
-            20,
-            mkMapOfPartitionRacks(20)
+            20
         ));
 
         CoordinatorRecord expectedRecord = new CoordinatorRecord(
@@ -187,13 +185,11 @@ public class GroupCoordinatorRecordHelpersTest {
                         new ConsumerGroupPartitionMetadataValue.TopicMetadata()
                             .setTopicId(fooTopicId)
                             .setTopicName("foo")
-                            .setNumPartitions(10)
-                            .setPartitionMetadata(mkListOfPartitionRacks(10)),
+                            .setNumPartitions(10),
                         new ConsumerGroupPartitionMetadataValue.TopicMetadata()
                             .setTopicId(barTopicId)
                             .setTopicName("bar")
-                            .setNumPartitions(20)
-                            
.setPartitionMetadata(mkListOfPartitionRacks(20)))),
+                            .setNumPartitions(20))),
                 (short) 0));
 
         assertRecordEquals(expectedRecord, 
newConsumerGroupSubscriptionMetadataRecord(
@@ -226,14 +222,12 @@ public class GroupCoordinatorRecordHelpersTest {
         subscriptionMetadata.put("foo", new TopicMetadata(
             fooTopicId,
             "foo",
-            10,
-            Collections.emptyMap()
+            10
         ));
         subscriptionMetadata.put("bar", new TopicMetadata(
             barTopicId,
             "bar",
-            20,
-            Collections.emptyMap()
+            20
         ));
 
         CoordinatorRecord expectedRecord = new CoordinatorRecord(
@@ -248,13 +242,11 @@ public class GroupCoordinatorRecordHelpersTest {
                         new ConsumerGroupPartitionMetadataValue.TopicMetadata()
                             .setTopicId(fooTopicId)
                             .setTopicName("foo")
-                            .setNumPartitions(10)
-                            .setPartitionMetadata(Collections.emptyList()),
+                            .setNumPartitions(10),
                         new ConsumerGroupPartitionMetadataValue.TopicMetadata()
                             .setTopicId(barTopicId)
                             .setTopicName("bar")
-                            .setNumPartitions(20)
-                            .setPartitionMetadata(Collections.emptyList()))),
+                            .setNumPartitions(20))),
                 (short) 0));
 
         assertRecordEquals(expectedRecord, 
newConsumerGroupSubscriptionMetadataRecord(
@@ -821,28 +813,6 @@ public class GroupCoordinatorRecordHelpersTest {
         assertEquals(expectedRecord, record);
     }
 
-    /**
-     * Creates a list of values to be added to the record and assigns 
partitions to racks for testing.
-     *
-     * @param numPartitions The number of partitions for the topic.
-     *
-     * For testing purposes, the following criteria are used:
-     *      - Number of replicas for each partition: 2
-     *      - Number of racks available to the cluster: 4
-     */
-    public static List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> 
mkListOfPartitionRacks(int numPartitions) {
-        List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> 
partitionRacks = new ArrayList<>(numPartitions);
-        for (int i = 0; i < numPartitions; i++) {
-            List<String> racks = new ArrayList<>(Arrays.asList("rack" + i % 4, 
"rack" + (i + 1) % 4));
-            partitionRacks.add(
-                new ConsumerGroupPartitionMetadataValue.PartitionMetadata()
-                    .setPartition(i)
-                    .setRacks(racks)
-            );
-        }
-        return partitionRacks;
-    }
-
     /**
      * Creates a map of partitions to racks for testing.
      *
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 4fbe8967ad7..2a56cd912cd 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -125,7 +125,6 @@ import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
 import static 
org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
 import static 
org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT;
 import static 
org.apache.kafka.coordinator.group.GroupMetadataManager.appendGroupMetadataErrorToResponseError;
 import static 
org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupHeartbeatKey;
@@ -501,8 +500,8 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = Arrays.asList(
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember),
             
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {{
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
-                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3));
                 }}),
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
1),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId, mkAssignment(
@@ -600,8 +599,8 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember),
             
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
                 {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
-                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3));
                 }
             }),
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11),
@@ -725,12 +724,6 @@ public class GroupMetadataManagerTest {
 
         List<CoordinatorRecord> expectedRecords = Arrays.asList(
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember3),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
-                {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
-                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
-                }
-            }),
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, mkAssignment(
                 mkTopicAssignment(fooTopicId, 0, 1),
@@ -748,9 +741,9 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember3)
         );
 
-        assertRecordsEquals(expectedRecords.subList(0, 3), 
result.records().subList(0, 3));
-        assertUnorderedListEquals(expectedRecords.subList(3, 6), 
result.records().subList(3, 6));
-        assertRecordsEquals(expectedRecords.subList(6, 8), 
result.records().subList(6, 8));
+        assertRecordsEquals(expectedRecords.subList(0, 2), 
result.records().subList(0, 2));
+        assertUnorderedListEquals(expectedRecords.subList(2, 5), 
result.records().subList(2, 5));
+        assertRecordsEquals(expectedRecords.subList(5, 7), 
result.records().subList(5, 7));
     }
 
     @Test
@@ -837,8 +830,8 @@ public class GroupMetadataManagerTest {
             // Subscription metadata is recomputed because zar is no longer 
there.
             
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
                 {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
-                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3));
                 }
             }),
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11)
@@ -961,12 +954,6 @@ public class GroupMetadataManagerTest {
 
         List<CoordinatorRecord> expectedRecords = Arrays.asList(
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember3),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
-                {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
-                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
-                }
-            }),
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11),
             
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, mkAssignment(
                 mkTopicAssignment(fooTopicId, 0, 1),
@@ -984,9 +971,9 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember3)
         );
 
-        assertRecordsEquals(expectedRecords.subList(0, 3), 
result.records().subList(0, 3));
-        assertUnorderedListEquals(expectedRecords.subList(3, 6), 
result.records().subList(3, 6));
-        assertRecordsEquals(expectedRecords.subList(6, 8), 
result.records().subList(6, 8));
+        assertRecordsEquals(expectedRecords.subList(0, 2), 
result.records().subList(0, 2));
+        assertUnorderedListEquals(expectedRecords.subList(2, 5), 
result.records().subList(2, 5));
+        assertRecordsEquals(expectedRecords.subList(5, 7), 
result.records().subList(5, 7));
     }
 
     @Test
@@ -1052,8 +1039,8 @@ public class GroupMetadataManagerTest {
                 .withAssignmentEpoch(10)
                 .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
                     {
-                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
-                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6));
+                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3));
                     }
                 }))
             .build();
@@ -1225,7 +1212,7 @@ public class GroupMetadataManagerTest {
                 .withAssignmentEpoch(10)
                 .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
                     {
-                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6));
                     }
                 }))
             .build();
@@ -1343,8 +1330,8 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedRejoinedMember),
             
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
                 {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
-                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3));
                 }
             }),
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11),
@@ -1538,8 +1525,8 @@ public class GroupMetadataManagerTest {
             // Subscription metadata is recomputed because zar is no longer 
there.
             
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
                 {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
-                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3));
                 }
             }),
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11)
@@ -2597,7 +2584,7 @@ public class GroupMetadataManagerTest {
                     {
                         // foo only has 3 partitions stored in the metadata 
but foo has
                         // 6 partitions the metadata image.
-                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 3, mkMapOfPartitionRacks(3)));
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 3));
                     }
                 }))
             .build();
@@ -2651,7 +2638,7 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = Arrays.asList(
             
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
                 {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6));
                 }
             }),
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11),
@@ -2708,7 +2695,7 @@ public class GroupMetadataManagerTest {
                     {
                         // foo only has 3 partitions stored in the metadata 
but foo has
                         // 6 partitions the metadata image.
-                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 3, mkMapOfPartitionRacks(3)));
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 3));
                     }
                 }))
             .build();
@@ -2780,7 +2767,7 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = Arrays.asList(
             
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
                 {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6));
                 }
             }),
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11),
@@ -9845,8 +9832,8 @@ public class GroupMetadataManagerTest {
             // The subscription metadata hasn't been updated during the 
conversion, so a new one is computed.
             
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
                 {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 1, mkMapOfPartitionRacks(1)));
-                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1, mkMapOfPartitionRacks(1)));
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 1));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1));
                 }
             }),
 
@@ -10078,8 +10065,8 @@ public class GroupMetadataManagerTest {
             // The subscription metadata hasn't been updated during the 
conversion, so a new one is computed.
             
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
                 {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
-                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1, mkMapOfPartitionRacks(1)));
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1));
                 }
             }),
 
@@ -10241,7 +10228,7 @@ public class GroupMetadataManagerTest {
             // The subscription metadata hasn't been updated during the 
conversion, so a new one is computed.
             
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
                 {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 1, mkMapOfPartitionRacks(1)));
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 1));
                 }
             }),
 
@@ -10345,8 +10332,8 @@ public class GroupMetadataManagerTest {
 
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
             {
-                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
6, mkMapOfPartitionRacks(6)));
-                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 
3, mkMapOfPartitionRacks(3)));
+                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
6));
+                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 
3));
             }
         }));
 
@@ -10632,8 +10619,8 @@ public class GroupMetadataManagerTest {
             // The subscription metadata hasn't been updated during the 
conversion, so a new one is computed.
             
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
                 {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
-                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1, mkMapOfPartitionRacks(1)));
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1));
                 }
             }),
 
@@ -10846,8 +10833,8 @@ public class GroupMetadataManagerTest {
 
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
             {
-                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
6, mkMapOfPartitionRacks(6)));
-                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 
3, mkMapOfPartitionRacks(3)));
+                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
6));
+                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 
3));
             }
         }));
 
@@ -11034,8 +11021,8 @@ public class GroupMetadataManagerTest {
 
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
             {
-                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
6, mkMapOfPartitionRacks(6)));
-                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 
3, mkMapOfPartitionRacks(3)));
+                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
6));
+                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 
3));
             }
         }));
 
@@ -11221,9 +11208,9 @@ public class GroupMetadataManagerTest {
 
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
             {
-                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
6, mkMapOfPartitionRacks(6)));
-                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 
3, mkMapOfPartitionRacks(3)));
-                put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 
1, mkMapOfPartitionRacks(1)));
+                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
6));
+                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 
3));
+                put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 
1));
             }
         }));
 
@@ -11428,7 +11415,7 @@ public class GroupMetadataManagerTest {
                 .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
                     .withSubscriptionMetadata(new HashMap<String, 
TopicMetadata>() {
                         {
-                            put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+                            put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2));
                         }
                     })
                     .withMember(new ConsumerGroupMember.Builder(memberId)
@@ -11514,8 +11501,8 @@ public class GroupMetadataManagerTest {
                 
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember),
                 
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
                     {
-                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
-                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1, mkMapOfPartitionRacks(1)));
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2));
+                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1));
                     }
                 }),
                 
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
@@ -11577,7 +11564,7 @@ public class GroupMetadataManagerTest {
             .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
                 .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
                     {
-                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2));
                     }
                 })
                 .withMember(new ConsumerGroupMember.Builder(memberId)
@@ -11633,7 +11620,7 @@ public class GroupMetadataManagerTest {
             .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
                 .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
                     {
-                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2));
                     }
                 })
                 .withMember(new ConsumerGroupMember.Builder(memberId)
@@ -11684,8 +11671,8 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember),
             
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
                 {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
-                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1, mkMapOfPartitionRacks(1)));
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1));
                 }
             }),
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11),
@@ -11728,7 +11715,7 @@ public class GroupMetadataManagerTest {
             .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
                 .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
                     {
-                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2));
                     }
                 })
                 .withMember(new ConsumerGroupMember.Builder(memberId)
@@ -11850,9 +11837,9 @@ public class GroupMetadataManagerTest {
             .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
                 .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
                     {
-                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
-                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1, mkMapOfPartitionRacks(1)));
-                        put(zarTopicName, new TopicMetadata(zarTopicId, 
zarTopicName, 1, mkMapOfPartitionRacks(1)));
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2));
+                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1));
+                        put(zarTopicName, new TopicMetadata(zarTopicId, 
zarTopicName, 1));
                     }
                 })
                 .withMember(new ConsumerGroupMember.Builder(memberId1)
@@ -12080,8 +12067,8 @@ public class GroupMetadataManagerTest {
             .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
                 .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
                     {
-                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
-                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1, mkMapOfPartitionRacks(1)));
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2));
+                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1));
                     }
                 })
                 .withMember(new ConsumerGroupMember.Builder(memberId1)
@@ -12182,9 +12169,9 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember1),
             
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
                 {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
-                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1, mkMapOfPartitionRacks(1)));
-                    put(zarTopicName, new TopicMetadata(zarTopicId, 
zarTopicName, 1, mkMapOfPartitionRacks(1)));
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1));
+                    put(zarTopicName, new TopicMetadata(zarTopicId, 
zarTopicName, 1));
                 }
             }),
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11),
@@ -12322,8 +12309,8 @@ public class GroupMetadataManagerTest {
             .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
                 .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
                     {
-                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
-                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1, mkMapOfPartitionRacks(1)));
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2));
+                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1));
                     }
                 })
                 .withMember(new ConsumerGroupMember.Builder(memberId1)
@@ -12424,9 +12411,9 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember1),
             
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
                 {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
-                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1, mkMapOfPartitionRacks(1)));
-                    put(zarTopicName, new TopicMetadata(zarTopicId, 
zarTopicName, 1, mkMapOfPartitionRacks(1)));
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 1));
+                    put(zarTopicName, new TopicMetadata(zarTopicId, 
zarTopicName, 1));
                 }
             }),
             GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
11),
@@ -13422,8 +13409,8 @@ public class GroupMetadataManagerTest {
         
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
 10);
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
             {
-                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
2, mkMapOfPartitionRacks(2)));
-                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 
1, mkMapOfPartitionRacks(1)));
+                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
2));
+                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 
1));
             }
         }));
 
@@ -13594,8 +13581,8 @@ public class GroupMetadataManagerTest {
         
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
 10);
         
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
             {
-                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
2, mkMapOfPartitionRacks(2)));
-                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 
1, mkMapOfPartitionRacks(1)));
+                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
2));
+                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 
1));
             }
         }));
 
@@ -13627,7 +13614,7 @@ public class GroupMetadataManagerTest {
             // Update the subscription metadata.
             
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
                 {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 2));
                 }
             }),
             // Bump the group epoch.
@@ -13988,8 +13975,8 @@ public class GroupMetadataManagerTest {
             
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId, 
expectedMember),
             
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, 
new HashMap<String, TopicMetadata>() {
                 {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
-                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3));
                 }
             }),
             GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 1),
@@ -14082,8 +14069,8 @@ public class GroupMetadataManagerTest {
                 // Subscription metadata is recomputed because zar is no 
longer there.
                 
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, 
new HashMap<String, TopicMetadata>() {
                     {
-                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
-                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6));
+                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3));
                     }
                 }),
                 
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11)
@@ -14348,7 +14335,7 @@ public class GroupMetadataManagerTest {
 
         Map<String, TopicMetadata> metadata = Collections.singletonMap(
             "bar",
-            new TopicMetadata(Uuid.randomUuid(), "bar", 10, 
Collections.emptyMap())
+            new TopicMetadata(Uuid.randomUuid(), "bar", 10)
         );
 
         // The group is created if it does not exist.
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
index 295794c36ad..a0d524ab839 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
@@ -44,7 +44,6 @@ import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTarg
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
 import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -70,8 +69,7 @@ public class OptimizedUniformAssignmentBuilderTest {
                 new TopicMetadata(
                     topic1Uuid,
                     topic1Name,
-                    3,
-                    mkMapOfPartitionRacks(3)
+                    3
                 )
             )
         );
@@ -108,8 +106,7 @@ public class OptimizedUniformAssignmentBuilderTest {
                 new TopicMetadata(
                     topic1Uuid,
                     topic1Name,
-                    3,
-                    mkMapOfPartitionRacks(3)
+                    3
                 )
             )
         );
@@ -140,14 +137,12 @@ public class OptimizedUniformAssignmentBuilderTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            3,
-            mkMapOfPartitionRacks(3)
+            3
         ));
         topicMetadata.put(topic3Uuid, new TopicMetadata(
             topic3Uuid,
             topic3Name,
-            2,
-            mkMapOfPartitionRacks(2)
+            2
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -197,8 +192,7 @@ public class OptimizedUniformAssignmentBuilderTest {
         topicMetadata.put(topic3Uuid, new TopicMetadata(
             topic3Uuid,
             topic3Name,
-            2,
-            mkMapOfPartitionRacks(2)
+            2
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -260,8 +254,7 @@ public class OptimizedUniformAssignmentBuilderTest {
             topicMetadata.put(topicId, new TopicMetadata(
                 topicId,
                 "topic-" + i,
-                3,
-                mkMapOfPartitionRacks(3)
+                3
             ));
         }
 
@@ -296,14 +289,12 @@ public class OptimizedUniformAssignmentBuilderTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            3,
-            mkMapOfPartitionRacks(3)
+            3
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            3,
-            mkMapOfPartitionRacks(3)
+            3
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -361,14 +352,12 @@ public class OptimizedUniformAssignmentBuilderTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            6,
-            mkMapOfPartitionRacks(6)
+            6
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            5,
-            mkMapOfPartitionRacks(5)
+            5
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -425,14 +414,12 @@ public class OptimizedUniformAssignmentBuilderTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            3,
-            mkMapOfPartitionRacks(3)
+            3
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            3,
-            mkMapOfPartitionRacks(3)
+            3
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -499,14 +486,12 @@ public class OptimizedUniformAssignmentBuilderTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            3,
-            mkMapOfPartitionRacks(3)
+            3
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            3,
-            mkMapOfPartitionRacks(3)
+            3
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -565,14 +550,12 @@ public class OptimizedUniformAssignmentBuilderTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            2,
-            mkMapOfPartitionRacks(2)
+            2
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            2,
-            mkMapOfPartitionRacks(2)
+            2
         ));
 
         // Initial subscriptions were [T1, T2]
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java
index 64b946f906e..4c4321cfb2e 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java
@@ -68,8 +68,7 @@ public class RangeAssignorTest {
                 new TopicMetadata(
                     topic1Uuid,
                     topic1Name,
-                    3,
-                    Collections.emptyMap()
+                    3
                 )
             )
         );
@@ -111,8 +110,7 @@ public class RangeAssignorTest {
                 new TopicMetadata(
                     topic1Uuid,
                     topic1Name,
-                    3,
-                    Collections.emptyMap()
+                    3
                 )
             )
         );
@@ -143,14 +141,12 @@ public class RangeAssignorTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            3,
-            Collections.emptyMap()
+            3
         ));
         topicMetadata.put(topic3Uuid, new TopicMetadata(
             topic3Uuid,
             topic3Name,
-            2,
-            Collections.emptyMap()
+            2
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -200,20 +196,17 @@ public class RangeAssignorTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            3,
-            Collections.emptyMap()
+            3
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            3,
-            Collections.emptyMap()
+            3
         ));
         topicMetadata.put(topic3Uuid, new TopicMetadata(
             topic3Uuid,
             topic3Name,
-            2,
-            Collections.emptyMap()
+            2
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -273,14 +266,12 @@ public class RangeAssignorTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            3,
-            Collections.emptyMap()
+            3
         ));
         topicMetadata.put(topic3Uuid, new TopicMetadata(
             topic3Uuid,
             topic3Name,
-            2,
-            Collections.emptyMap()
+            2
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -343,8 +334,7 @@ public class RangeAssignorTest {
                 new TopicMetadata(
                     topic1Uuid,
                     topic1Name,
-                    3,
-                    Collections.emptyMap()
+                    3
                 )
             )
         );
@@ -414,8 +404,7 @@ public class RangeAssignorTest {
                 new TopicMetadata(
                     topic1Uuid,
                     topic1Name,
-                    5,
-                    Collections.emptyMap()
+                    5
                 )
             )
         );
@@ -497,14 +486,12 @@ public class RangeAssignorTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            2,
-            Collections.emptyMap()
+            2
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            2,
-            Collections.emptyMap()
+            2
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -571,14 +558,12 @@ public class RangeAssignorTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            4,
-            Collections.emptyMap()
+            4
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            4,
-            Collections.emptyMap()
+            4
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -634,14 +619,12 @@ public class RangeAssignorTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            3,
-            Collections.emptyMap()
+            3
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            3,
-            Collections.emptyMap()
+            3
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -710,14 +693,12 @@ public class RangeAssignorTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            4,
-            Collections.emptyMap()
+            4
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            3,
-            Collections.emptyMap()
+            3
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -784,14 +765,12 @@ public class RangeAssignorTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            3,
-            Collections.emptyMap()
+            3
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            3,
-            Collections.emptyMap()
+            3
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -835,20 +814,17 @@ public class RangeAssignorTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            3,
-            Collections.emptyMap()
+            3
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            3,
-            Collections.emptyMap()
+            3
         ));
         topicMetadata.put(topic3Uuid, new TopicMetadata(
             topic3Uuid,
             topic3Name,
-            2,
-            Collections.emptyMap()
+            2
         ));
 
         // Let initial subscriptions be A -> T1, T2 // B -> T2 // C -> T2, T3
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
index 0553db6a487..4d0fe8cdf7b 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
@@ -88,8 +88,7 @@ public class SimpleAssignorTest {
                 new TopicMetadata(
                     TOPIC_1_UUID,
                     TOPIC_1_NAME,
-                    3,
-                    Collections.emptyMap()
+                    3
                 )
             )
         );
@@ -126,8 +125,7 @@ public class SimpleAssignorTest {
                 new TopicMetadata(
                     TOPIC_1_UUID,
                     TOPIC_1_NAME,
-                    3,
-                    Collections.emptyMap()
+                    3
                 )
             )
         );
@@ -158,14 +156,12 @@ public class SimpleAssignorTest {
         topicMetadata.put(TOPIC_1_UUID, new TopicMetadata(
             TOPIC_1_UUID,
             TOPIC_1_NAME,
-            3,
-            Collections.emptyMap()
+            3
         ));
         topicMetadata.put(TOPIC_3_UUID, new TopicMetadata(
             TOPIC_3_UUID,
             TOPIC_3_NAME,
-            2,
-            Collections.emptyMap()
+            2
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -215,21 +211,18 @@ public class SimpleAssignorTest {
         topicMetadata.put(TOPIC_1_UUID, new TopicMetadata(
             TOPIC_1_UUID,
             TOPIC_1_NAME,
-            3,
-            Collections.emptyMap()
+            3
         ));
 
         topicMetadata.put(TOPIC_2_UUID, new TopicMetadata(
             TOPIC_2_UUID,
             "topic2",
-            3,
-            Collections.emptyMap()
+            3
         ));
         topicMetadata.put(TOPIC_3_UUID, new TopicMetadata(
             TOPIC_3_UUID,
             TOPIC_3_NAME,
-            2,
-            Collections.emptyMap()
+            2
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -289,15 +282,13 @@ public class SimpleAssignorTest {
         topicMetadata.put(TOPIC_1_UUID, new TopicMetadata(
             TOPIC_1_UUID,
             TOPIC_1_NAME,
-            3,
-            Collections.emptyMap()
+            3
         ));
 
         topicMetadata.put(TOPIC_2_UUID, new TopicMetadata(
             TOPIC_2_UUID,
             "topic2",
-            2,
-            Collections.emptyMap()
+            2
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java
index 0ced6b2f63d..93be24076f1 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java
@@ -40,7 +40,6 @@ import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssign
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
 import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -67,8 +66,7 @@ public class UniformHeterogeneousAssignmentBuilderTest {
                 new TopicMetadata(
                     topic1Uuid,
                     topic1Name,
-                    3,
-                    mkMapOfPartitionRacks(3)
+                    3
                 )
             )
         );
@@ -109,8 +107,7 @@ public class UniformHeterogeneousAssignmentBuilderTest {
                 new TopicMetadata(
                     topic1Uuid,
                     topic1Name,
-                    3,
-                    mkMapOfPartitionRacks(3)
+                    3
                 )
             )
         );
@@ -146,14 +143,12 @@ public class UniformHeterogeneousAssignmentBuilderTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            3,
-            mkMapOfPartitionRacks(3)
+            3
         ));
         topicMetadata.put(topic3Uuid, new TopicMetadata(
             topic3Uuid,
             topic3Name,
-            6,
-            mkMapOfPartitionRacks(6)
+            6
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -202,14 +197,12 @@ public class UniformHeterogeneousAssignmentBuilderTest {
         topicMetadata.put(topic3Uuid, new TopicMetadata(
             topic3Uuid,
             topic3Name,
-            1,
-            mkMapOfPartitionRacks(1)
+            1
         ));
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            2,
-            mkMapOfPartitionRacks(2)
+            2
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -268,20 +261,17 @@ public class UniformHeterogeneousAssignmentBuilderTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            6,
-            mkMapOfPartitionRacks(6)
+            6
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            4,
-            mkMapOfPartitionRacks(4)
+            4
         ));
         topicMetadata.put(topic3Uuid, new TopicMetadata(
             topic3Uuid,
             topic3Name,
-            4,
-            mkMapOfPartitionRacks(4)
+            4
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -350,26 +340,22 @@ public class UniformHeterogeneousAssignmentBuilderTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            6,
-            mkMapOfPartitionRacks(6)
+            6
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            5,
-            mkMapOfPartitionRacks(5)
+            5
         ));
         topicMetadata.put(topic3Uuid, new TopicMetadata(
             topic3Uuid,
             topic3Name,
-            3,
-            mkMapOfPartitionRacks(3)
+            3
         ));
         topicMetadata.put(topic4Uuid, new TopicMetadata(
             topic4Uuid,
             topic4Name,
-            3,
-            mkMapOfPartitionRacks(3)
+            3
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -426,14 +412,12 @@ public class UniformHeterogeneousAssignmentBuilderTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            6,
-            mkMapOfPartitionRacks(6)
+            6
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            7,
-            mkMapOfPartitionRacks(7)
+            7
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -499,20 +483,17 @@ public class UniformHeterogeneousAssignmentBuilderTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            3,
-            mkMapOfPartitionRacks(3)
+            3
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            8,
-            mkMapOfPartitionRacks(4)
+            8
         ));
         topicMetadata.put(topic3Uuid, new TopicMetadata(
             topic3Uuid,
             topic3Name,
-            3,
-            mkMapOfPartitionRacks(3)
+            3
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
@@ -568,14 +549,12 @@ public class UniformHeterogeneousAssignmentBuilderTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            3,
-            mkMapOfPartitionRacks(3)
+            3
         ));
         topicMetadata.put(topic2Uuid, new TopicMetadata(
             topic2Uuid,
             topic2Name,
-            5,
-            mkMapOfPartitionRacks(5)
+            5
         ));
 
         // Initial subscriptions were [T1, T2]
@@ -630,8 +609,7 @@ public class UniformHeterogeneousAssignmentBuilderTest {
         topicMetadata.put(topic1Uuid, new TopicMetadata(
             topic1Uuid,
             topic1Name,
-            3,
-            mkMapOfPartitionRacks(3)
+            3
         ));
 
         Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java
index c90aaf02bcb..0842cee241d 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java
@@ -21,12 +21,9 @@ import org.apache.kafka.common.Uuid;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -42,10 +39,9 @@ public class SubscribedTopicMetadataTest {
         for (int i = 0; i < 5; i++) {
             Uuid topicId = Uuid.randomUuid();
             String topicName = "topic" + i;
-            Map<Integer, Set<String>> partitionRacks = 
mkMapOfPartitionRacks(5);
             topicMetadataMap.put(
                 topicId,
-                new TopicMetadata(topicId, topicName, 5, partitionRacks)
+                new TopicMetadata(topicId, topicName, 5)
             );
         }
         subscribedTopicMetadata = new 
SubscribedTopicDescriberImpl(topicMetadataMap);
@@ -68,44 +64,19 @@ public class SubscribedTopicMetadataTest {
         // Test -1 is returned when the topic Id doesn't exist.
         assertEquals(-1, subscribedTopicMetadata.numPartitions(topicId));
 
-        topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3, 
Collections.emptyMap()));
+        topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3));
 
         // Test that the correct number of partitions are returned for a given 
topic Id.
         assertEquals(3, subscribedTopicMetadata.numPartitions(topicId));
     }
 
-    @Test
-    public void testRacksForPartition() {
-        Uuid topicId = Uuid.randomUuid();
-
-        // Test that an empty set is returned for a non-existent topic Id.
-        assertEquals(Collections.emptySet(), 
subscribedTopicMetadata.racksForPartition(topicId, 0));
-
-        // Add topic Id with partition racks included.
-        Map<Integer, Set<String>> partitionRacks = mkMapOfPartitionRacks(3);
-        topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3, 
partitionRacks));
-
-        // Test that an empty set is returned for a non-existent partition Id.
-        assertEquals(Collections.emptySet(), 
subscribedTopicMetadata.racksForPartition(topicId, 4));
-
-        // Test that a correct set of racks is returned for the given topic Id 
and partition Id.
-        assertEquals(partitionRacks.get(2), 
subscribedTopicMetadata.racksForPartition(topicId, 2));
-
-        // Add another topic Id without partition racks.
-        topicId = Uuid.randomUuid();
-        topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3, 
Collections.emptyMap()));
-
-        // Test that an empty set is returned when the partition rack info is 
absent.
-        assertEquals(Collections.emptySet(), 
subscribedTopicMetadata.racksForPartition(topicId, 1));
-    }
-
     @Test
     public void testEquals() {
         assertEquals(new SubscribedTopicDescriberImpl(topicMetadataMap), 
subscribedTopicMetadata);
 
         Map<Uuid, TopicMetadata> topicMetadataMap2 = new HashMap<>();
         Uuid topicId = Uuid.randomUuid();
-        topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic", 
5, Collections.emptyMap()));
+        topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic", 
5));
         assertNotEquals(new SubscribedTopicDescriberImpl(topicMetadataMap2), 
subscribedTopicMetadata);
     }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
index b4f25e00f2e..093145650b9 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
@@ -43,7 +43,6 @@ import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
 import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
 import static 
org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder.createMemberSubscriptionAndAssignment;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -102,15 +101,13 @@ public class TargetAssignmentBuilderTest {
 
         public Uuid addTopicMetadata(
             String topicName,
-            int numPartitions,
-            Map<Integer, Set<String>> partitionRacks
+            int numPartitions
         ) {
             Uuid topicId = Uuid.randomUuid();
             subscriptionMetadata.put(topicName, new TopicMetadata(
                 topicId,
                 topicName,
-                numPartitions,
-                partitionRacks
+                numPartitions
             ));
             topicsImageBuilder = topicsImageBuilder.addTopic(topicId, 
topicName, numPartitions);
 
@@ -314,8 +311,8 @@ public class TargetAssignmentBuilderTest {
             20
         );
 
-        Uuid fooTopicId = context.addTopicMetadata("foo", 6, 
Collections.emptyMap());
-        Uuid barTopicId = context.addTopicMetadata("bar", 6, 
Collections.emptyMap());
+        Uuid fooTopicId = context.addTopicMetadata("foo", 6);
+        Uuid barTopicId = context.addTopicMetadata("bar", 6);
 
         context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), 
mkAssignment(
             mkTopicAssignment(fooTopicId, 1, 2, 3),
@@ -364,8 +361,8 @@ public class TargetAssignmentBuilderTest {
             20
         );
 
-        Uuid fooTopicId = context.addTopicMetadata("foo", 6, 
Collections.emptyMap());
-        Uuid barTopicId = context.addTopicMetadata("bar", 6, 
Collections.emptyMap());
+        Uuid fooTopicId = context.addTopicMetadata("foo", 6);
+        Uuid barTopicId = context.addTopicMetadata("bar", 6);
 
         context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), 
mkAssignment(
             mkTopicAssignment(fooTopicId, 1, 2, 3),
@@ -427,8 +424,8 @@ public class TargetAssignmentBuilderTest {
             20
         );
 
-        Uuid fooTopicId = context.addTopicMetadata("foo", 6, 
Collections.emptyMap());
-        Uuid barTopicId = context.addTopicMetadata("bar", 6, 
Collections.emptyMap());
+        Uuid fooTopicId = context.addTopicMetadata("foo", 6);
+        Uuid barTopicId = context.addTopicMetadata("bar", 6);
 
         context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), 
mkAssignment(
             mkTopicAssignment(fooTopicId, 1, 2, 3),
@@ -505,8 +502,8 @@ public class TargetAssignmentBuilderTest {
             20
         );
 
-        Uuid fooTopicId = context.addTopicMetadata("foo", 6, 
Collections.emptyMap());
-        Uuid barTopicId = context.addTopicMetadata("bar", 6, 
Collections.emptyMap());
+        Uuid fooTopicId = context.addTopicMetadata("foo", 6);
+        Uuid barTopicId = context.addTopicMetadata("bar", 6);
 
         context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), 
mkAssignment(
             mkTopicAssignment(fooTopicId, 1, 2, 3),
@@ -592,8 +589,8 @@ public class TargetAssignmentBuilderTest {
             20
         );
 
-        Uuid fooTopicId = context.addTopicMetadata("foo", 6, 
mkMapOfPartitionRacks(6));
-        Uuid barTopicId = context.addTopicMetadata("bar", 6, 
mkMapOfPartitionRacks(6));
+        Uuid fooTopicId = context.addTopicMetadata("foo", 6);
+        Uuid barTopicId = context.addTopicMetadata("bar", 6);
 
         context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), 
mkAssignment(
             mkTopicAssignment(fooTopicId, 1, 2),
@@ -670,8 +667,8 @@ public class TargetAssignmentBuilderTest {
             20
         );
 
-        Uuid fooTopicId = context.addTopicMetadata("foo", 6, 
Collections.emptyMap());
-        Uuid barTopicId = context.addTopicMetadata("bar", 6, 
Collections.emptyMap());
+        Uuid fooTopicId = context.addTopicMetadata("foo", 6);
+        Uuid barTopicId = context.addTopicMetadata("bar", 6);
 
         context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"), 
mkAssignment(
             mkTopicAssignment(fooTopicId, 1, 2),
@@ -740,8 +737,8 @@ public class TargetAssignmentBuilderTest {
             20
         );
 
-        Uuid fooTopicId = context.addTopicMetadata("foo", 6, 
Collections.emptyMap());
-        Uuid barTopicId = context.addTopicMetadata("bar", 6, 
Collections.emptyMap());
+        Uuid fooTopicId = context.addTopicMetadata("foo", 6);
+        Uuid barTopicId = context.addTopicMetadata("bar", 6);
 
         context.addGroupMember("member-1", "instance-member-1", 
Arrays.asList("foo", "bar", "zar"), mkAssignment(
             mkTopicAssignment(fooTopicId, 1, 2),
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicMetadataTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicMetadataTest.java
index fbec17e1d5c..00894138957 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicMetadataTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicMetadataTest.java
@@ -21,11 +21,9 @@ import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetada
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkListOfPartitionRacks;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -35,29 +33,27 @@ public class TopicMetadataTest {
     @Test
     public void testAttributes() {
         Uuid topicId = Uuid.randomUuid();
-        Map<Integer, Set<String>> partitionRacks = mkMapOfPartitionRacks(15);
-        TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15, 
partitionRacks);
+        TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15);
 
         assertEquals(topicId, topicMetadata.id());
         assertEquals("foo", topicMetadata.name());
         assertEquals(15, topicMetadata.numPartitions());
-        assertEquals(partitionRacks, topicMetadata.partitionRacks());
     }
 
     @Test
     public void testTopicIdAndNameCannotBeNull() {
-        assertThrows(NullPointerException.class, () -> new 
TopicMetadata(Uuid.randomUuid(), null, 15, Collections.emptyMap()));
-        assertThrows(NullPointerException.class, () -> new TopicMetadata(null, 
"foo", 15, Collections.emptyMap()));
+        assertThrows(NullPointerException.class, () -> new 
TopicMetadata(Uuid.randomUuid(), null, 15));
+        assertThrows(NullPointerException.class, () -> new TopicMetadata(null, 
"foo", 15));
     }
 
     @Test
     public void testEquals() {
         Uuid topicId = Uuid.randomUuid();
         Map<Integer, Set<String>> partitionRacks = mkMapOfPartitionRacks(15);
-        TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15, 
partitionRacks);
+        TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15);
 
-        assertEquals(new TopicMetadata(topicId, "foo", 15, partitionRacks), 
topicMetadata);
-        assertNotEquals(new TopicMetadata(topicId, "foo", 5, 
mkMapOfPartitionRacks(5)), topicMetadata);
+        assertEquals(new TopicMetadata(topicId, "foo", 15), topicMetadata);
+        assertNotEquals(new TopicMetadata(topicId, "foo", 5), topicMetadata);
     }
 
     @Test
@@ -68,11 +64,10 @@ public class TopicMetadataTest {
         ConsumerGroupPartitionMetadataValue.TopicMetadata record = new 
ConsumerGroupPartitionMetadataValue.TopicMetadata()
             .setTopicId(topicId)
             .setTopicName(topicName)
-            .setNumPartitions(15)
-            .setPartitionMetadata(mkListOfPartitionRacks(15));
+            .setNumPartitions(15);
 
         assertEquals(
-            new TopicMetadata(topicId, topicName, 15, 
mkMapOfPartitionRacks(15)),
+            new TopicMetadata(topicId, topicName, 15),
             TopicMetadata.fromRecord(record)
         );
     }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
index fa1e67f28c0..4c044323d06 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
@@ -25,7 +25,6 @@ import org.apache.kafka.image.TopicImage;
 import org.apache.kafka.image.TopicsImage;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -83,8 +82,7 @@ public class ConsumerGroupBuilder {
                         subscriptionMetadata.put(topicName, new TopicMetadata(
                             topicImage.id(),
                             topicImage.name(),
-                            topicImage.partitions().size(),
-                            Collections.emptyMap()
+                            topicImage.partitions().size()
                         ));
                     }
                 })
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 15d5e71bc37..d6eee7bbd9b 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -58,7 +58,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
 import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
 import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -655,7 +654,7 @@ public class ConsumerGroupTest {
         // Compute while taking into account member 1.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
             ),
             consumerGroup.computeSubscriptionMetadata(
                 consumerGroup.computeSubscribedTopicNames(null, member1),
@@ -670,7 +669,7 @@ public class ConsumerGroupTest {
         // It should return foo now.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
             ),
             consumerGroup.computeSubscriptionMetadata(
                 consumerGroup.computeSubscribedTopicNames(null, null),
@@ -692,8 +691,8 @@ public class ConsumerGroupTest {
         // Compute while taking into account member 2.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1))),
-                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
             ),
             consumerGroup.computeSubscriptionMetadata(
                 consumerGroup.computeSubscribedTopicNames(null, member2),
@@ -708,8 +707,8 @@ public class ConsumerGroupTest {
         // It should return foo and bar.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1))),
-                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
             ),
             consumerGroup.computeSubscriptionMetadata(
                 consumerGroup.computeSubscribedTopicNames(null, null),
@@ -721,7 +720,7 @@ public class ConsumerGroupTest {
         // Compute while taking into account removal of member 2.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
             ),
             consumerGroup.computeSubscriptionMetadata(
                 consumerGroup.computeSubscribedTopicNames(member2, null),
@@ -733,7 +732,7 @@ public class ConsumerGroupTest {
         // Removing member1 results in returning bar.
         assertEquals(
             mkMap(
-                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2)))
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
             ),
             consumerGroup.computeSubscriptionMetadata(
                 consumerGroup.computeSubscribedTopicNames(member1, null),
@@ -745,9 +744,9 @@ public class ConsumerGroupTest {
         // Compute while taking into account member 3.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1))),
-                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2))),
-                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, 
mkMapOfPartitionRacks(3)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
             ),
             consumerGroup.computeSubscriptionMetadata(
                 consumerGroup.computeSubscribedTopicNames(null, member3),
@@ -762,9 +761,9 @@ public class ConsumerGroupTest {
         // It should return foo, bar and zar.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1))),
-                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2))),
-                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, 
mkMapOfPartitionRacks(3)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
             ),
             consumerGroup.computeSubscriptionMetadata(
                 consumerGroup.computeSubscribedTopicNames(null, null),
@@ -786,7 +785,7 @@ public class ConsumerGroupTest {
         // Compute while taking into account removal of member 2 and member 3.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
             ),
             consumerGroup.computeSubscriptionMetadata(
                 consumerGroup.computeSubscribedTopicNames(new 
HashSet<>(Arrays.asList(member2, member3))),
@@ -798,8 +797,8 @@ public class ConsumerGroupTest {
         // Compute while taking into account removal of member 1.
         assertEquals(
             mkMap(
-                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2))),
-                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, 
mkMapOfPartitionRacks(3)))
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
             ),
             consumerGroup.computeSubscriptionMetadata(
                 
consumerGroup.computeSubscribedTopicNames(Collections.singleton(member1)),
@@ -811,9 +810,9 @@ public class ConsumerGroupTest {
         // It should return foo, bar and zar.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1))),
-                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2))),
-                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, 
mkMapOfPartitionRacks(3)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
             ),
             consumerGroup.computeSubscriptionMetadata(
                 
consumerGroup.computeSubscribedTopicNames(Collections.emptySet()),
@@ -1220,8 +1219,8 @@ public class ConsumerGroupTest {
 
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1))),
-                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
             ),
             consumerGroup.computeSubscriptionMetadata(
                 consumerGroup.computeSubscribedTopicNames(null, null),
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
index c8fe2fcf538..2ae29961434 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
@@ -25,7 +25,6 @@ import org.apache.kafka.image.TopicImage;
 import org.apache.kafka.image.TopicsImage;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -83,8 +82,7 @@ public class ShareGroupBuilder {
                         subscriptionMetadata.put(topicName, new TopicMetadata(
                             topicImage.id(),
                             topicImage.name(),
-                            topicImage.partitions().size(),
-                            Collections.emptyMap()
+                            topicImage.partitions().size()
                         ));
                     }
                 })
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
index e53a9dac910..8dc7167d898 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
@@ -43,7 +43,6 @@ import java.util.HashSet;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
 import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
 import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -181,7 +180,7 @@ public class ShareGroupTest {
         // Compute while taking into account member 1.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
             ),
             shareGroup.computeSubscriptionMetadata(
                 shareGroup.computeSubscribedTopicNames(null, member1),
@@ -196,7 +195,7 @@ public class ShareGroupTest {
         // It should return foo now.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
             ),
             shareGroup.computeSubscriptionMetadata(
                 shareGroup.computeSubscribedTopicNames(null, null),
@@ -218,8 +217,8 @@ public class ShareGroupTest {
         // Compute while taking into account member 2.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1))),
-                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
             ),
             shareGroup.computeSubscriptionMetadata(
                 shareGroup.computeSubscribedTopicNames(null, member2),
@@ -234,8 +233,8 @@ public class ShareGroupTest {
         // It should return foo and bar.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1))),
-                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
             ),
             shareGroup.computeSubscriptionMetadata(
                 shareGroup.computeSubscribedTopicNames(null, null),
@@ -247,7 +246,7 @@ public class ShareGroupTest {
         // Compute while taking into account removal of member 2.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
             ),
             shareGroup.computeSubscriptionMetadata(
                 shareGroup.computeSubscribedTopicNames(member2, null),
@@ -259,7 +258,7 @@ public class ShareGroupTest {
         // Removing member1 results in returning bar.
         assertEquals(
             mkMap(
-                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2)))
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
             ),
             shareGroup.computeSubscriptionMetadata(
                 shareGroup.computeSubscribedTopicNames(member1, null),
@@ -271,9 +270,9 @@ public class ShareGroupTest {
         // Compute while taking into account member 3.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1))),
-                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2))),
-                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, 
mkMapOfPartitionRacks(3)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
             ),
             shareGroup.computeSubscriptionMetadata(
                 shareGroup.computeSubscribedTopicNames(null, member3),
@@ -288,9 +287,9 @@ public class ShareGroupTest {
         // It should return foo, bar and zar.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1))),
-                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2))),
-                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, 
mkMapOfPartitionRacks(3)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
             ),
             shareGroup.computeSubscriptionMetadata(
                 shareGroup.computeSubscribedTopicNames(null, null),
@@ -312,7 +311,7 @@ public class ShareGroupTest {
         // Compute while taking into account removal of member 2 and member 3.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
             ),
             shareGroup.computeSubscriptionMetadata(
                 shareGroup.computeSubscribedTopicNames(new 
HashSet<>(Arrays.asList(member2, member3))),
@@ -324,8 +323,8 @@ public class ShareGroupTest {
         // Compute while taking into account removal of member 1.
         assertEquals(
             mkMap(
-                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2))),
-                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, 
mkMapOfPartitionRacks(3)))
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
             ),
             shareGroup.computeSubscriptionMetadata(
                 
shareGroup.computeSubscribedTopicNames(Collections.singleton(member1)),
@@ -337,9 +336,9 @@ public class ShareGroupTest {
         // It should return foo, bar and zar.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1))),
-                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2))),
-                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, 
mkMapOfPartitionRacks(3)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+                mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
             ),
             shareGroup.computeSubscriptionMetadata(
                 shareGroup.computeSubscribedTopicNames(Collections.emptySet()),
@@ -644,8 +643,8 @@ public class ShareGroupTest {
 
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, 
mkMapOfPartitionRacks(1))),
-                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, 
mkMapOfPartitionRacks(2)))
+                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+                mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
             ),
             shareGroup.computeSubscriptionMetadata(
                 shareGroup.computeSubscribedTopicNames(null, null),
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
index b9ce9bab9af..e3dceb19c2a 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
@@ -93,14 +93,11 @@ public class AssignorBenchmarkUtils {
      *
      * @param topicNames                The names of the topics.
      * @param partitionsPerTopic        The number of partitions per topic.
-     * @param getTopicPartitionRacks    A function to get the racks map for 
each topic. May return
-     *                                  an empty map if no rack info is 
desired.
      * @return The subscription metadata map.
      */
     public static Map<String, TopicMetadata> createSubscriptionMetadata(
         List<String> topicNames,
-        int partitionsPerTopic,
-        Function<String, Map<Integer, Set<String>>> getTopicPartitionRacks
+        int partitionsPerTopic
     ) {
         Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
 
@@ -110,8 +107,7 @@ public class AssignorBenchmarkUtils {
             TopicMetadata metadata = new TopicMetadata(
                 topicId,
                 topicName,
-                partitionsPerTopic,
-                getTopicPartitionRacks.apply(topicName)
+                partitionsPerTopic
             );
             subscriptionMetadata.put(topicName, metadata);
         }
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
index 82f11350381..a22673c3f59 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
@@ -148,13 +148,9 @@ public class ServerSideAssignorBenchmark {
         allTopicNames = AssignorBenchmarkUtils.createTopicNames(topicCount);
 
         int partitionsPerTopic = (memberCount * partitionsToMemberRatio) / 
topicCount;
-        Map<Integer, Set<String>> partitionRacks = isRackAware ?
-            mkMapOfPartitionRacks(partitionsPerTopic) :
-            Collections.emptyMap();
         subscriptionMetadata = 
AssignorBenchmarkUtils.createSubscriptionMetadata(
             allTopicNames,
-            partitionsPerTopic,
-            topicName -> partitionRacks
+            partitionsPerTopic
         );
 
         topicsImage = 
AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
index adf40671ebc..c9ddb54363b 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
@@ -122,8 +122,7 @@ public class TargetAssignmentBuilderBenchmark {
         int partitionsPerTopic = (memberCount * partitionsToMemberRatio) / 
topicCount;
         subscriptionMetadata = 
AssignorBenchmarkUtils.createSubscriptionMetadata(
             allTopicNames,
-            partitionsPerTopic,
-            topicName -> Collections.emptyMap()
+            partitionsPerTopic
         );
 
         topicsImage = 
AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);

Reply via email to