This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bb97d63d418 KAFKA-17578: Remove partitionRacks from TopicMetadata
(#17233)
bb97d63d418 is described below
commit bb97d63d418fe047cf5a59b16c7004e5011402da
Author: PoAn Yang <[email protected]>
AuthorDate: Wed Sep 25 15:48:48 2024 +0800
KAFKA-17578: Remove partitionRacks from TopicMetadata (#17233)
ModernGroup#subscribedTopicMetadata takes too much memory due to
partitionRacks. The rack information is not used at the moment, as the
consumer protocol does not support rack-aware assignments.
A heap dump from a group with 500 members and 2K subscribed topic partitions
shows 654,400 bytes used for partitionRacks, while the rest of the
ConsumerGroup object holds 822,860 bytes.
Reviewers: David Jacot <[email protected]>
---
.../group/GroupCoordinatorRecordHelpers.java | 22 ---
.../coordinator/group/modern/ModernGroup.java | 20 +--
.../group/modern/SubscribedTopicDescriberImpl.java | 3 +-
.../coordinator/group/modern/TopicMetadata.java | 60 ++------
.../ConsumerGroupPartitionMetadataValue.json | 2 +-
.../group/GroupCoordinatorRecordHelpersTest.java | 46 ++-----
.../group/GroupMetadataManagerTest.java | 153 ++++++++++-----------
.../OptimizedUniformAssignmentBuilderTest.java | 49 +++----
.../group/assignor/RangeAssignorTest.java | 72 ++++------
.../group/assignor/SimpleAssignorTest.java | 27 ++--
.../UniformHeterogeneousAssignmentBuilderTest.java | 64 +++------
.../group/modern/SubscribedTopicMetadataTest.java | 35 +----
.../group/modern/TargetAssignmentBuilderTest.java | 35 +++--
.../group/modern/TopicMetadataTest.java | 21 ++-
.../modern/consumer/ConsumerGroupBuilder.java | 4 +-
.../group/modern/consumer/ConsumerGroupTest.java | 45 +++---
.../group/modern/share/ShareGroupBuilder.java | 4 +-
.../group/modern/share/ShareGroupTest.java | 45 +++---
.../kafka/jmh/assignor/AssignorBenchmarkUtils.java | 8 +-
.../jmh/assignor/ServerSideAssignorBenchmark.java | 6 +-
.../assignor/TargetAssignmentBuilderBenchmark.java | 3 +-
21 files changed, 238 insertions(+), 486 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
index 3665fb05224..67364470b0f 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
@@ -139,21 +139,10 @@ public class GroupCoordinatorRecordHelpers {
) {
ConsumerGroupPartitionMetadataValue value = new
ConsumerGroupPartitionMetadataValue();
newSubscriptionMetadata.forEach((topicName, topicMetadata) -> {
- List<ConsumerGroupPartitionMetadataValue.PartitionMetadata>
partitionMetadata = new ArrayList<>();
- // If the partition rack information map is empty, store an empty
list in the record.
- if (!topicMetadata.partitionRacks().isEmpty()) {
- topicMetadata.partitionRacks().forEach((partition, racks) ->
- partitionMetadata.add(new
ConsumerGroupPartitionMetadataValue.PartitionMetadata()
- .setPartition(partition)
- .setRacks(new ArrayList<>(racks))
- )
- );
- }
value.topics().add(new
ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicMetadata.id())
.setTopicName(topicMetadata.name())
.setNumPartitions(topicMetadata.numPartitions())
- .setPartitionMetadata(partitionMetadata)
);
});
@@ -657,21 +646,10 @@ public class GroupCoordinatorRecordHelpers {
) {
ShareGroupPartitionMetadataValue value = new
ShareGroupPartitionMetadataValue();
newSubscriptionMetadata.forEach((topicName, topicMetadata) -> {
- List<ShareGroupPartitionMetadataValue.PartitionMetadata>
partitionMetadata = new ArrayList<>();
- // If the partition rack information map is empty, store an empty
list in the record.
- if (!topicMetadata.partitionRacks().isEmpty()) {
- topicMetadata.partitionRacks().forEach((partition, racks) ->
- partitionMetadata.add(new
ShareGroupPartitionMetadataValue.PartitionMetadata()
- .setPartition(partition)
- .setRacks(new ArrayList<>(racks))
- )
- );
- }
value.topics().add(new
ShareGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicMetadata.id())
.setTopicName(topicMetadata.name())
.setNumPartitions(topicMetadata.numPartitions())
- .setPartitionMetadata(partitionMetadata)
);
});
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
index 508b1c7d131..823792c198e 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java
@@ -35,7 +35,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
@@ -389,26 +388,11 @@ public abstract class ModernGroup<T extends
ModernGroupMember> implements Group
subscribedTopicNames.forEach((topicName, count) -> {
TopicImage topicImage = topicsImage.getTopic(topicName);
if (topicImage != null) {
- Map<Integer, Set<String>> partitionRacks = new HashMap<>();
- topicImage.partitions().forEach((partition,
partitionRegistration) -> {
- Set<String> racks = new HashSet<>();
- for (int replica : partitionRegistration.replicas) {
- Optional<String> rackOptional =
clusterImage.broker(replica).rack();
- // Only add the rack if it is available for the
broker/replica.
- rackOptional.ifPresent(racks::add);
- }
- // If rack information is unavailable for all replicas of
this partition,
- // no corresponding entry will be stored for it in the map.
- if (!racks.isEmpty())
- partitionRacks.put(partition, racks);
- });
-
newSubscriptionMetadata.put(topicName, new TopicMetadata(
topicImage.id(),
topicImage.name(),
- topicImage.partitions().size(),
- partitionRacks)
- );
+ topicImage.partitions().size()
+ ));
}
});
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java
index 81a03f97a82..7871b04d722 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java
@@ -72,8 +72,7 @@ public class SubscribedTopicDescriberImpl implements
SubscribedTopicDescriber {
*/
@Override
public Set<String> racksForPartition(Uuid topicId, int partition) {
- TopicMetadata topic = this.topicMetadata.get(topicId);
- return topic == null ? Collections.emptySet() :
topic.partitionRacks().getOrDefault(partition, Collections.emptySet());
+ return Collections.emptySet();
}
@Override
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java
index 2eba447203b..ace670cc0e4 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TopicMetadata.java
@@ -20,12 +20,7 @@ import org.apache.kafka.common.Uuid;
import
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import
org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
import java.util.Objects;
-import java.util.Set;
/**
* Immutable topic metadata.
@@ -46,17 +41,10 @@ public class TopicMetadata {
*/
private final int numPartitions;
- /**
- * Map of every partition Id to a set of its rack Ids, if they exist.
- * If rack information is unavailable for all partitions, this is an empty
map.
- */
- private final Map<Integer, Set<String>> partitionRacks;
-
public TopicMetadata(
Uuid id,
String name,
- int numPartitions,
- Map<Integer, Set<String>> partitionRacks
+ int numPartitions
) {
this.id = Objects.requireNonNull(id);
if (Uuid.ZERO_UUID.equals(id)) {
@@ -70,7 +58,6 @@ public class TopicMetadata {
if (numPartitions < 0) {
throw new IllegalArgumentException("Number of partitions cannot be
negative.");
}
- this.partitionRacks = Objects.requireNonNull(partitionRacks);
}
/**
@@ -94,14 +81,6 @@ public class TopicMetadata {
return this.numPartitions;
}
- /**
- * @return Every partition mapped to the set of corresponding available
rack Ids of its replicas.
- * An empty map is returned if rack information is unavailable for
all partitions.
- */
- public Map<Integer, Set<String>> partitionRacks() {
- return this.partitionRacks;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -111,8 +90,7 @@ public class TopicMetadata {
if (!id.equals(that.id)) return false;
if (!name.equals(that.name)) return false;
- if (numPartitions != that.numPartitions) return false;
- return partitionRacks.equals(that.partitionRacks);
+ return numPartitions == that.numPartitions;
}
@Override
@@ -120,7 +98,6 @@ public class TopicMetadata {
int result = id.hashCode();
result = 31 * result + name.hashCode();
result = 31 * result + numPartitions;
- result = 31 * result + partitionRacks.hashCode();
return result;
}
@@ -130,45 +107,26 @@ public class TopicMetadata {
"id=" + id +
", name=" + name +
", numPartitions=" + numPartitions +
- ", partitionRacks=" + partitionRacks +
')';
}
public static TopicMetadata fromRecord(
ConsumerGroupPartitionMetadataValue.TopicMetadata record
) {
- // Converting the data type from a list stored in the record to a map
for the topic metadata.
- Map<Integer, Set<String>> partitionRacks = new HashMap<>();
- for (ConsumerGroupPartitionMetadataValue.PartitionMetadata
partitionMetadata : record.partitionMetadata()) {
- partitionRacks.put(
- partitionMetadata.partition(),
- Collections.unmodifiableSet(new
HashSet<>(partitionMetadata.racks()))
- );
- }
-
return new TopicMetadata(
record.topicId(),
record.topicName(),
- record.numPartitions(),
- partitionRacks);
+ record.numPartitions()
+ );
}
public static TopicMetadata fromRecord(
- ShareGroupPartitionMetadataValue.TopicMetadata record
+ ShareGroupPartitionMetadataValue.TopicMetadata record
) {
- // Converting the data type from a list stored in the record to a map
for the topic metadata.
- Map<Integer, Set<String>> partitionRacks = new HashMap<>();
- for (ShareGroupPartitionMetadataValue.PartitionMetadata
partitionMetadata : record.partitionMetadata()) {
- partitionRacks.put(
- partitionMetadata.partition(),
- Collections.unmodifiableSet(new
HashSet<>(partitionMetadata.racks()))
- );
- }
-
return new TopicMetadata(
- record.topicId(),
- record.topicName(),
- record.numPartitions(),
- partitionRacks);
+ record.topicId(),
+ record.topicName(),
+ record.numPartitions()
+ );
}
}
diff --git
a/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json
b/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json
index 4a17d6f685d..89be8cfa056 100644
---
a/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json
+++
b/group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json
@@ -29,7 +29,7 @@
{ "name": "NumPartitions", "versions": "0+", "type": "int32",
"about": "The number of partitions of the topic." },
{ "name": "PartitionMetadata", "versions": "0+", "type":
"[]PartitionMetadata",
- "about": "Partitions mapped to a set of racks. If the rack information
is unavailable for all the partitions, an empty list is stored", "fields": [
+ "about": "Deprecated: this field is not used after 4.0. Partitions
mapped to a set of racks. If the rack information is unavailable for all the
partitions, an empty list is stored", "fields": [
{ "name": "Partition", "versions": "0+", "type": "int32",
"about": "The partition number." },
{ "name": "Racks", "versions": "0+", "type": "[]string",
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
index f71798a0759..38e2b9d68cf 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java
@@ -165,14 +165,12 @@ public class GroupCoordinatorRecordHelpersTest {
subscriptionMetadata.put("foo", new TopicMetadata(
fooTopicId,
"foo",
- 10,
- mkMapOfPartitionRacks(10)
+ 10
));
subscriptionMetadata.put("bar", new TopicMetadata(
barTopicId,
"bar",
- 20,
- mkMapOfPartitionRacks(20)
+ 20
));
CoordinatorRecord expectedRecord = new CoordinatorRecord(
@@ -187,13 +185,11 @@ public class GroupCoordinatorRecordHelpersTest {
new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(fooTopicId)
.setTopicName("foo")
- .setNumPartitions(10)
- .setPartitionMetadata(mkListOfPartitionRacks(10)),
+ .setNumPartitions(10),
new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(barTopicId)
.setTopicName("bar")
- .setNumPartitions(20)
-
.setPartitionMetadata(mkListOfPartitionRacks(20)))),
+ .setNumPartitions(20))),
(short) 0));
assertRecordEquals(expectedRecord,
newConsumerGroupSubscriptionMetadataRecord(
@@ -226,14 +222,12 @@ public class GroupCoordinatorRecordHelpersTest {
subscriptionMetadata.put("foo", new TopicMetadata(
fooTopicId,
"foo",
- 10,
- Collections.emptyMap()
+ 10
));
subscriptionMetadata.put("bar", new TopicMetadata(
barTopicId,
"bar",
- 20,
- Collections.emptyMap()
+ 20
));
CoordinatorRecord expectedRecord = new CoordinatorRecord(
@@ -248,13 +242,11 @@ public class GroupCoordinatorRecordHelpersTest {
new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(fooTopicId)
.setTopicName("foo")
- .setNumPartitions(10)
- .setPartitionMetadata(Collections.emptyList()),
+ .setNumPartitions(10),
new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(barTopicId)
.setTopicName("bar")
- .setNumPartitions(20)
- .setPartitionMetadata(Collections.emptyList()))),
+ .setNumPartitions(20))),
(short) 0));
assertRecordEquals(expectedRecord,
newConsumerGroupSubscriptionMetadataRecord(
@@ -821,28 +813,6 @@ public class GroupCoordinatorRecordHelpersTest {
assertEquals(expectedRecord, record);
}
- /**
- * Creates a list of values to be added to the record and assigns
partitions to racks for testing.
- *
- * @param numPartitions The number of partitions for the topic.
- *
- * For testing purposes, the following criteria are used:
- * - Number of replicas for each partition: 2
- * - Number of racks available to the cluster: 4
- */
- public static List<ConsumerGroupPartitionMetadataValue.PartitionMetadata>
mkListOfPartitionRacks(int numPartitions) {
- List<ConsumerGroupPartitionMetadataValue.PartitionMetadata>
partitionRacks = new ArrayList<>(numPartitions);
- for (int i = 0; i < numPartitions; i++) {
- List<String> racks = new ArrayList<>(Arrays.asList("rack" + i % 4,
"rack" + (i + 1) % 4));
- partitionRacks.add(
- new ConsumerGroupPartitionMetadataValue.PartitionMetadata()
- .setPartition(i)
- .setRacks(racks)
- );
- }
- return partitionRacks;
- }
-
/**
* Creates a map of partitions to racks for testing.
*
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 4fbe8967ad7..2a56cd912cd 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -125,7 +125,6 @@ import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static
org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static
org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT;
import static
org.apache.kafka.coordinator.group.GroupMetadataManager.appendGroupMetadataErrorToResponseError;
import static
org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupHeartbeatKey;
@@ -501,8 +500,8 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = Arrays.asList(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3));
}}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
1),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
@@ -600,8 +599,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11),
@@ -725,12 +724,6 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = Arrays.asList(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember3),
-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
- {
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
- }
- }),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1),
@@ -748,9 +741,9 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember3)
);
- assertRecordsEquals(expectedRecords.subList(0, 3),
result.records().subList(0, 3));
- assertUnorderedListEquals(expectedRecords.subList(3, 6),
result.records().subList(3, 6));
- assertRecordsEquals(expectedRecords.subList(6, 8),
result.records().subList(6, 8));
+ assertRecordsEquals(expectedRecords.subList(0, 2),
result.records().subList(0, 2));
+ assertUnorderedListEquals(expectedRecords.subList(2, 5),
result.records().subList(2, 5));
+ assertRecordsEquals(expectedRecords.subList(5, 7),
result.records().subList(5, 7));
}
@Test
@@ -837,8 +830,8 @@ public class GroupMetadataManagerTest {
// Subscription metadata is recomputed because zar is no longer
there.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11)
@@ -961,12 +954,6 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = Arrays.asList(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember3),
-
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
- {
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
- }
- }),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1),
@@ -984,9 +971,9 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember3)
);
- assertRecordsEquals(expectedRecords.subList(0, 3),
result.records().subList(0, 3));
- assertUnorderedListEquals(expectedRecords.subList(3, 6),
result.records().subList(3, 6));
- assertRecordsEquals(expectedRecords.subList(6, 8),
result.records().subList(6, 8));
+ assertRecordsEquals(expectedRecords.subList(0, 2),
result.records().subList(0, 2));
+ assertUnorderedListEquals(expectedRecords.subList(2, 5),
result.records().subList(2, 5));
+ assertRecordsEquals(expectedRecords.subList(5, 7),
result.records().subList(5, 7));
}
@Test
@@ -1052,8 +1039,8 @@ public class GroupMetadataManagerTest {
.withAssignmentEpoch(10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>()
{
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3));
}
}))
.build();
@@ -1225,7 +1212,7 @@ public class GroupMetadataManagerTest {
.withAssignmentEpoch(10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>()
{
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6));
}
}))
.build();
@@ -1343,8 +1330,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedRejoinedMember),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11),
@@ -1538,8 +1525,8 @@ public class GroupMetadataManagerTest {
// Subscription metadata is recomputed because zar is no longer
there.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11)
@@ -2597,7 +2584,7 @@ public class GroupMetadataManagerTest {
{
// foo only has 3 partitions stored in the metadata
but foo has
// 6 partitions the metadata image.
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 3, mkMapOfPartitionRacks(3)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 3));
}
}))
.build();
@@ -2651,7 +2638,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = Arrays.asList(
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11),
@@ -2708,7 +2695,7 @@ public class GroupMetadataManagerTest {
{
// foo only has 3 partitions stored in the metadata
but foo has
// 6 partitions the metadata image.
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 3, mkMapOfPartitionRacks(3)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 3));
}
}))
.build();
@@ -2780,7 +2767,7 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = Arrays.asList(
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11),
@@ -9845,8 +9832,8 @@ public class GroupMetadataManagerTest {
// The subscription metadata hasn't been updated during the
conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 1, mkMapOfPartitionRacks(1)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1, mkMapOfPartitionRacks(1)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 1));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1));
}
}),
@@ -10078,8 +10065,8 @@ public class GroupMetadataManagerTest {
// The subscription metadata hasn't been updated during the
conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1, mkMapOfPartitionRacks(1)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1));
}
}),
@@ -10241,7 +10228,7 @@ public class GroupMetadataManagerTest {
// The subscription metadata hasn't been updated during the
conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 1, mkMapOfPartitionRacks(1)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 1));
}
}),
@@ -10345,8 +10332,8 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
6, mkMapOfPartitionRacks(6)));
- put(barTopicName, new TopicMetadata(barTopicId, barTopicName,
3, mkMapOfPartitionRacks(3)));
+ put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
6));
+ put(barTopicName, new TopicMetadata(barTopicId, barTopicName,
3));
}
}));
@@ -10632,8 +10619,8 @@ public class GroupMetadataManagerTest {
// The subscription metadata hasn't been updated during the
conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1, mkMapOfPartitionRacks(1)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1));
}
}),
@@ -10846,8 +10833,8 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
6, mkMapOfPartitionRacks(6)));
- put(barTopicName, new TopicMetadata(barTopicId, barTopicName,
3, mkMapOfPartitionRacks(3)));
+ put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
6));
+ put(barTopicName, new TopicMetadata(barTopicId, barTopicName,
3));
}
}));
@@ -11034,8 +11021,8 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
6, mkMapOfPartitionRacks(6)));
- put(barTopicName, new TopicMetadata(barTopicId, barTopicName,
3, mkMapOfPartitionRacks(3)));
+ put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
6));
+ put(barTopicName, new TopicMetadata(barTopicId, barTopicName,
3));
}
}));
@@ -11221,9 +11208,9 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
6, mkMapOfPartitionRacks(6)));
- put(barTopicName, new TopicMetadata(barTopicId, barTopicName,
3, mkMapOfPartitionRacks(3)));
- put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName,
1, mkMapOfPartitionRacks(1)));
+ put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
6));
+ put(barTopicName, new TopicMetadata(barTopicId, barTopicName,
3));
+ put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName,
1));
}
}));
@@ -11428,7 +11415,7 @@ public class GroupMetadataManagerTest {
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withSubscriptionMetadata(new HashMap<String,
TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2));
}
})
.withMember(new ConsumerGroupMember.Builder(memberId)
@@ -11514,8 +11501,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1, mkMapOfPartitionRacks(1)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11),
@@ -11577,7 +11564,7 @@ public class GroupMetadataManagerTest {
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>()
{
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2));
}
})
.withMember(new ConsumerGroupMember.Builder(memberId)
@@ -11633,7 +11620,7 @@ public class GroupMetadataManagerTest {
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>()
{
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2));
}
})
.withMember(new ConsumerGroupMember.Builder(memberId)
@@ -11684,8 +11671,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1, mkMapOfPartitionRacks(1)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11),
@@ -11728,7 +11715,7 @@ public class GroupMetadataManagerTest {
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>()
{
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2));
}
})
.withMember(new ConsumerGroupMember.Builder(memberId)
@@ -11850,9 +11837,9 @@ public class GroupMetadataManagerTest {
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>()
{
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1, mkMapOfPartitionRacks(1)));
- put(zarTopicName, new TopicMetadata(zarTopicId,
zarTopicName, 1, mkMapOfPartitionRacks(1)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1));
+ put(zarTopicName, new TopicMetadata(zarTopicId,
zarTopicName, 1));
}
})
.withMember(new ConsumerGroupMember.Builder(memberId1)
@@ -12080,8 +12067,8 @@ public class GroupMetadataManagerTest {
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>()
{
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1, mkMapOfPartitionRacks(1)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1));
}
})
.withMember(new ConsumerGroupMember.Builder(memberId1)
@@ -12182,9 +12169,9 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1, mkMapOfPartitionRacks(1)));
- put(zarTopicName, new TopicMetadata(zarTopicId,
zarTopicName, 1, mkMapOfPartitionRacks(1)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1));
+ put(zarTopicName, new TopicMetadata(zarTopicId,
zarTopicName, 1));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11),
@@ -12322,8 +12309,8 @@ public class GroupMetadataManagerTest {
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>()
{
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1, mkMapOfPartitionRacks(1)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1));
}
})
.withMember(new ConsumerGroupMember.Builder(memberId1)
@@ -12424,9 +12411,9 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1, mkMapOfPartitionRacks(1)));
- put(zarTopicName, new TopicMetadata(zarTopicId,
zarTopicName, 1, mkMapOfPartitionRacks(1)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 1));
+ put(zarTopicName, new TopicMetadata(zarTopicId,
zarTopicName, 1));
}
}),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11),
@@ -13422,8 +13409,8 @@ public class GroupMetadataManagerTest {
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
10);
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
2, mkMapOfPartitionRacks(2)));
- put(barTopicName, new TopicMetadata(barTopicId, barTopicName,
1, mkMapOfPartitionRacks(1)));
+ put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
2));
+ put(barTopicName, new TopicMetadata(barTopicId, barTopicName,
1));
}
}));
@@ -13594,8 +13581,8 @@ public class GroupMetadataManagerTest {
context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE,
10);
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
2, mkMapOfPartitionRacks(2)));
- put(barTopicName, new TopicMetadata(barTopicId, barTopicName,
1, mkMapOfPartitionRacks(1)));
+ put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName,
2));
+ put(barTopicName, new TopicMetadata(barTopicId, barTopicName,
1));
}
}));
@@ -13627,7 +13614,7 @@ public class GroupMetadataManagerTest {
// Update the subscription metadata.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2, mkMapOfPartitionRacks(2)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 2));
}
}),
// Bump the group epoch.
@@ -13988,8 +13975,8 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId,
expectedMember),
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3));
}
}),
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 1),
@@ -14082,8 +14069,8 @@ public class GroupMetadataManagerTest {
// Subscription metadata is recomputed because zar is no
longer there.
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId,
new HashMap<String, TopicMetadata>() {
{
- put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6, mkMapOfPartitionRacks(6)));
- put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3, mkMapOfPartitionRacks(3)));
+ put(fooTopicName, new TopicMetadata(fooTopicId,
fooTopicName, 6));
+ put(barTopicName, new TopicMetadata(barTopicId,
barTopicName, 3));
}
}),
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11)
@@ -14348,7 +14335,7 @@ public class GroupMetadataManagerTest {
Map<String, TopicMetadata> metadata = Collections.singletonMap(
"bar",
- new TopicMetadata(Uuid.randomUuid(), "bar", 10,
Collections.emptyMap())
+ new TopicMetadata(Uuid.randomUuid(), "bar", 10)
);
// The group is created if it does not exist.
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
index 295794c36ad..a0d524ab839 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
@@ -44,7 +44,6 @@ import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTarg
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -70,8 +69,7 @@ public class OptimizedUniformAssignmentBuilderTest {
new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
)
)
);
@@ -108,8 +106,7 @@ public class OptimizedUniformAssignmentBuilderTest {
new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
)
)
);
@@ -140,14 +137,12 @@ public class OptimizedUniformAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
- 2,
- mkMapOfPartitionRacks(2)
+ 2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -197,8 +192,7 @@ public class OptimizedUniformAssignmentBuilderTest {
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
- 2,
- mkMapOfPartitionRacks(2)
+ 2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -260,8 +254,7 @@ public class OptimizedUniformAssignmentBuilderTest {
topicMetadata.put(topicId, new TopicMetadata(
topicId,
"topic-" + i,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
));
}
@@ -296,14 +289,12 @@ public class OptimizedUniformAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -361,14 +352,12 @@ public class OptimizedUniformAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 6,
- mkMapOfPartitionRacks(6)
+ 6
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 5,
- mkMapOfPartitionRacks(5)
+ 5
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -425,14 +414,12 @@ public class OptimizedUniformAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -499,14 +486,12 @@ public class OptimizedUniformAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -565,14 +550,12 @@ public class OptimizedUniformAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 2,
- mkMapOfPartitionRacks(2)
+ 2
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 2,
- mkMapOfPartitionRacks(2)
+ 2
));
// Initial subscriptions were [T1, T2]
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java
index 64b946f906e..4c4321cfb2e 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java
@@ -68,8 +68,7 @@ public class RangeAssignorTest {
new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- Collections.emptyMap()
+ 3
)
)
);
@@ -111,8 +110,7 @@ public class RangeAssignorTest {
new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- Collections.emptyMap()
+ 3
)
)
);
@@ -143,14 +141,12 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- Collections.emptyMap()
+ 3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
- 2,
- Collections.emptyMap()
+ 2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -200,20 +196,17 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- Collections.emptyMap()
+ 3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 3,
- Collections.emptyMap()
+ 3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
- 2,
- Collections.emptyMap()
+ 2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -273,14 +266,12 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- Collections.emptyMap()
+ 3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
- 2,
- Collections.emptyMap()
+ 2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -343,8 +334,7 @@ public class RangeAssignorTest {
new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- Collections.emptyMap()
+ 3
)
)
);
@@ -414,8 +404,7 @@ public class RangeAssignorTest {
new TopicMetadata(
topic1Uuid,
topic1Name,
- 5,
- Collections.emptyMap()
+ 5
)
)
);
@@ -497,14 +486,12 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 2,
- Collections.emptyMap()
+ 2
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 2,
- Collections.emptyMap()
+ 2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -571,14 +558,12 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 4,
- Collections.emptyMap()
+ 4
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 4,
- Collections.emptyMap()
+ 4
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -634,14 +619,12 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- Collections.emptyMap()
+ 3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 3,
- Collections.emptyMap()
+ 3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -710,14 +693,12 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 4,
- Collections.emptyMap()
+ 4
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 3,
- Collections.emptyMap()
+ 3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -784,14 +765,12 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- Collections.emptyMap()
+ 3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 3,
- Collections.emptyMap()
+ 3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -835,20 +814,17 @@ public class RangeAssignorTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- Collections.emptyMap()
+ 3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 3,
- Collections.emptyMap()
+ 3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
- 2,
- Collections.emptyMap()
+ 2
));
// Let initial subscriptions be A -> T1, T2 // B -> T2 // C -> T2, T3
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
index 0553db6a487..4d0fe8cdf7b 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
@@ -88,8 +88,7 @@ public class SimpleAssignorTest {
new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
- 3,
- Collections.emptyMap()
+ 3
)
)
);
@@ -126,8 +125,7 @@ public class SimpleAssignorTest {
new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
- 3,
- Collections.emptyMap()
+ 3
)
)
);
@@ -158,14 +156,12 @@ public class SimpleAssignorTest {
topicMetadata.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
- 3,
- Collections.emptyMap()
+ 3
));
topicMetadata.put(TOPIC_3_UUID, new TopicMetadata(
TOPIC_3_UUID,
TOPIC_3_NAME,
- 2,
- Collections.emptyMap()
+ 2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -215,21 +211,18 @@ public class SimpleAssignorTest {
topicMetadata.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
- 3,
- Collections.emptyMap()
+ 3
));
topicMetadata.put(TOPIC_2_UUID, new TopicMetadata(
TOPIC_2_UUID,
"topic2",
- 3,
- Collections.emptyMap()
+ 3
));
topicMetadata.put(TOPIC_3_UUID, new TopicMetadata(
TOPIC_3_UUID,
TOPIC_3_NAME,
- 2,
- Collections.emptyMap()
+ 2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -289,15 +282,13 @@ public class SimpleAssignorTest {
topicMetadata.put(TOPIC_1_UUID, new TopicMetadata(
TOPIC_1_UUID,
TOPIC_1_NAME,
- 3,
- Collections.emptyMap()
+ 3
));
topicMetadata.put(TOPIC_2_UUID, new TopicMetadata(
TOPIC_2_UUID,
"topic2",
- 2,
- Collections.emptyMap()
+ 2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java
index 0ced6b2f63d..93be24076f1 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilderTest.java
@@ -40,7 +40,6 @@ import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssign
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -67,8 +66,7 @@ public class UniformHeterogeneousAssignmentBuilderTest {
new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
)
)
);
@@ -109,8 +107,7 @@ public class UniformHeterogeneousAssignmentBuilderTest {
new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
)
)
);
@@ -146,14 +143,12 @@ public class UniformHeterogeneousAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
- 6,
- mkMapOfPartitionRacks(6)
+ 6
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -202,14 +197,12 @@ public class UniformHeterogeneousAssignmentBuilderTest {
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
- 1,
- mkMapOfPartitionRacks(1)
+ 1
));
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 2,
- mkMapOfPartitionRacks(2)
+ 2
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -268,20 +261,17 @@ public class UniformHeterogeneousAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 6,
- mkMapOfPartitionRacks(6)
+ 6
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 4,
- mkMapOfPartitionRacks(4)
+ 4
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
- 4,
- mkMapOfPartitionRacks(4)
+ 4
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -350,26 +340,22 @@ public class UniformHeterogeneousAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 6,
- mkMapOfPartitionRacks(6)
+ 6
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 5,
- mkMapOfPartitionRacks(5)
+ 5
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
));
topicMetadata.put(topic4Uuid, new TopicMetadata(
topic4Uuid,
topic4Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -426,14 +412,12 @@ public class UniformHeterogeneousAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 6,
- mkMapOfPartitionRacks(6)
+ 6
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 7,
- mkMapOfPartitionRacks(7)
+ 7
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -499,20 +483,17 @@ public class UniformHeterogeneousAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 8,
- mkMapOfPartitionRacks(4)
+ 8
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
@@ -568,14 +549,12 @@ public class UniformHeterogeneousAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
- 5,
- mkMapOfPartitionRacks(5)
+ 5
));
// Initial subscriptions were [T1, T2]
@@ -630,8 +609,7 @@ public class UniformHeterogeneousAssignmentBuilderTest {
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
- 3,
- mkMapOfPartitionRacks(3)
+ 3
));
Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java
index c90aaf02bcb..0842cee241d 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java
@@ -21,12 +21,9 @@ import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -42,10 +39,9 @@ public class SubscribedTopicMetadataTest {
for (int i = 0; i < 5; i++) {
Uuid topicId = Uuid.randomUuid();
String topicName = "topic" + i;
- Map<Integer, Set<String>> partitionRacks =
mkMapOfPartitionRacks(5);
topicMetadataMap.put(
topicId,
- new TopicMetadata(topicId, topicName, 5, partitionRacks)
+ new TopicMetadata(topicId, topicName, 5)
);
}
subscribedTopicMetadata = new
SubscribedTopicDescriberImpl(topicMetadataMap);
@@ -68,44 +64,19 @@ public class SubscribedTopicMetadataTest {
// Test -1 is returned when the topic Id doesn't exist.
assertEquals(-1, subscribedTopicMetadata.numPartitions(topicId));
- topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3,
Collections.emptyMap()));
+ topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3));
// Test that the correct number of partitions are returned for a given
topic Id.
assertEquals(3, subscribedTopicMetadata.numPartitions(topicId));
}
- @Test
- public void testRacksForPartition() {
- Uuid topicId = Uuid.randomUuid();
-
- // Test that an empty set is returned for a non-existent topic Id.
- assertEquals(Collections.emptySet(),
subscribedTopicMetadata.racksForPartition(topicId, 0));
-
- // Add topic Id with partition racks included.
- Map<Integer, Set<String>> partitionRacks = mkMapOfPartitionRacks(3);
- topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3,
partitionRacks));
-
- // Test that an empty set is returned for a non-existent partition Id.
- assertEquals(Collections.emptySet(),
subscribedTopicMetadata.racksForPartition(topicId, 4));
-
- // Test that a correct set of racks is returned for the given topic Id
and partition Id.
- assertEquals(partitionRacks.get(2),
subscribedTopicMetadata.racksForPartition(topicId, 2));
-
- // Add another topic Id without partition racks.
- topicId = Uuid.randomUuid();
- topicMetadataMap.put(topicId, new TopicMetadata(topicId, "topic6", 3,
Collections.emptyMap()));
-
- // Test that an empty set is returned when the partition rack info is
absent.
- assertEquals(Collections.emptySet(),
subscribedTopicMetadata.racksForPartition(topicId, 1));
- }
-
@Test
public void testEquals() {
assertEquals(new SubscribedTopicDescriberImpl(topicMetadataMap),
subscribedTopicMetadata);
Map<Uuid, TopicMetadata> topicMetadataMap2 = new HashMap<>();
Uuid topicId = Uuid.randomUuid();
- topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic",
5, Collections.emptyMap()));
+ topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic",
5));
assertNotEquals(new SubscribedTopicDescriberImpl(topicMetadataMap2),
subscribedTopicMetadata);
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
index b4f25e00f2e..093145650b9 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilderTest.java
@@ -43,7 +43,6 @@ import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static
org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder.createMemberSubscriptionAndAssignment;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -102,15 +101,13 @@ public class TargetAssignmentBuilderTest {
public Uuid addTopicMetadata(
String topicName,
- int numPartitions,
- Map<Integer, Set<String>> partitionRacks
+ int numPartitions
) {
Uuid topicId = Uuid.randomUuid();
subscriptionMetadata.put(topicName, new TopicMetadata(
topicId,
topicName,
- numPartitions,
- partitionRacks
+ numPartitions
));
topicsImageBuilder = topicsImageBuilder.addTopic(topicId,
topicName, numPartitions);
@@ -314,8 +311,8 @@ public class TargetAssignmentBuilderTest {
20
);
- Uuid fooTopicId = context.addTopicMetadata("foo", 6,
Collections.emptyMap());
- Uuid barTopicId = context.addTopicMetadata("bar", 6,
Collections.emptyMap());
+ Uuid fooTopicId = context.addTopicMetadata("foo", 6);
+ Uuid barTopicId = context.addTopicMetadata("bar", 6);
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"),
mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
@@ -364,8 +361,8 @@ public class TargetAssignmentBuilderTest {
20
);
- Uuid fooTopicId = context.addTopicMetadata("foo", 6,
Collections.emptyMap());
- Uuid barTopicId = context.addTopicMetadata("bar", 6,
Collections.emptyMap());
+ Uuid fooTopicId = context.addTopicMetadata("foo", 6);
+ Uuid barTopicId = context.addTopicMetadata("bar", 6);
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"),
mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
@@ -427,8 +424,8 @@ public class TargetAssignmentBuilderTest {
20
);
- Uuid fooTopicId = context.addTopicMetadata("foo", 6,
Collections.emptyMap());
- Uuid barTopicId = context.addTopicMetadata("bar", 6,
Collections.emptyMap());
+ Uuid fooTopicId = context.addTopicMetadata("foo", 6);
+ Uuid barTopicId = context.addTopicMetadata("bar", 6);
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"),
mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
@@ -505,8 +502,8 @@ public class TargetAssignmentBuilderTest {
20
);
- Uuid fooTopicId = context.addTopicMetadata("foo", 6,
Collections.emptyMap());
- Uuid barTopicId = context.addTopicMetadata("bar", 6,
Collections.emptyMap());
+ Uuid fooTopicId = context.addTopicMetadata("foo", 6);
+ Uuid barTopicId = context.addTopicMetadata("bar", 6);
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"),
mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
@@ -592,8 +589,8 @@ public class TargetAssignmentBuilderTest {
20
);
- Uuid fooTopicId = context.addTopicMetadata("foo", 6,
mkMapOfPartitionRacks(6));
- Uuid barTopicId = context.addTopicMetadata("bar", 6,
mkMapOfPartitionRacks(6));
+ Uuid fooTopicId = context.addTopicMetadata("foo", 6);
+ Uuid barTopicId = context.addTopicMetadata("bar", 6);
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"),
mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
@@ -670,8 +667,8 @@ public class TargetAssignmentBuilderTest {
20
);
- Uuid fooTopicId = context.addTopicMetadata("foo", 6,
Collections.emptyMap());
- Uuid barTopicId = context.addTopicMetadata("bar", 6,
Collections.emptyMap());
+ Uuid fooTopicId = context.addTopicMetadata("foo", 6);
+ Uuid barTopicId = context.addTopicMetadata("bar", 6);
context.addGroupMember("member-1", Arrays.asList("foo", "bar", "zar"),
mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
@@ -740,8 +737,8 @@ public class TargetAssignmentBuilderTest {
20
);
- Uuid fooTopicId = context.addTopicMetadata("foo", 6,
Collections.emptyMap());
- Uuid barTopicId = context.addTopicMetadata("bar", 6,
Collections.emptyMap());
+ Uuid fooTopicId = context.addTopicMetadata("foo", 6);
+ Uuid barTopicId = context.addTopicMetadata("bar", 6);
context.addGroupMember("member-1", "instance-member-1",
Arrays.asList("foo", "bar", "zar"), mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicMetadataTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicMetadataTest.java
index fbec17e1d5c..00894138957 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicMetadataTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicMetadataTest.java
@@ -21,11 +21,9 @@ import
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetada
import org.junit.jupiter.api.Test;
-import java.util.Collections;
import java.util.Map;
import java.util.Set;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkListOfPartitionRacks;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -35,29 +33,27 @@ public class TopicMetadataTest {
@Test
public void testAttributes() {
Uuid topicId = Uuid.randomUuid();
- Map<Integer, Set<String>> partitionRacks = mkMapOfPartitionRacks(15);
- TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15,
partitionRacks);
+ TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15);
assertEquals(topicId, topicMetadata.id());
assertEquals("foo", topicMetadata.name());
assertEquals(15, topicMetadata.numPartitions());
- assertEquals(partitionRacks, topicMetadata.partitionRacks());
}
@Test
public void testTopicIdAndNameCannotBeNull() {
- assertThrows(NullPointerException.class, () -> new
TopicMetadata(Uuid.randomUuid(), null, 15, Collections.emptyMap()));
- assertThrows(NullPointerException.class, () -> new TopicMetadata(null,
"foo", 15, Collections.emptyMap()));
+ assertThrows(NullPointerException.class, () -> new
TopicMetadata(Uuid.randomUuid(), null, 15));
+ assertThrows(NullPointerException.class, () -> new TopicMetadata(null,
"foo", 15));
}
@Test
public void testEquals() {
Uuid topicId = Uuid.randomUuid();
Map<Integer, Set<String>> partitionRacks = mkMapOfPartitionRacks(15);
- TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15,
partitionRacks);
+ TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15);
- assertEquals(new TopicMetadata(topicId, "foo", 15, partitionRacks),
topicMetadata);
- assertNotEquals(new TopicMetadata(topicId, "foo", 5,
mkMapOfPartitionRacks(5)), topicMetadata);
+ assertEquals(new TopicMetadata(topicId, "foo", 15), topicMetadata);
+ assertNotEquals(new TopicMetadata(topicId, "foo", 5), topicMetadata);
}
@Test
@@ -68,11 +64,10 @@ public class TopicMetadataTest {
ConsumerGroupPartitionMetadataValue.TopicMetadata record = new
ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicId)
.setTopicName(topicName)
- .setNumPartitions(15)
- .setPartitionMetadata(mkListOfPartitionRacks(15));
+ .setNumPartitions(15);
assertEquals(
- new TopicMetadata(topicId, topicName, 15,
mkMapOfPartitionRacks(15)),
+ new TopicMetadata(topicId, topicName, 15),
TopicMetadata.fromRecord(record)
);
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
index fa1e67f28c0..4c044323d06 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java
@@ -25,7 +25,6 @@ import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -83,8 +82,7 @@ public class ConsumerGroupBuilder {
subscriptionMetadata.put(topicName, new TopicMetadata(
topicImage.id(),
topicImage.name(),
- topicImage.partitions().size(),
- Collections.emptyMap()
+ topicImage.partitions().size()
));
}
})
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index 15d5e71bc37..d6eee7bbd9b 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -58,7 +58,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -655,7 +654,7 @@ public class ConsumerGroupTest {
// Compute while taking into account member 1.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(null, member1),
@@ -670,7 +669,7 @@ public class ConsumerGroupTest {
// It should return foo now.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(null, null),
@@ -692,8 +691,8 @@ public class ConsumerGroupTest {
// Compute while taking into account member 2.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1))),
- mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2,
mkMapOfPartitionRacks(2)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+ mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(null, member2),
@@ -708,8 +707,8 @@ public class ConsumerGroupTest {
// It should return foo and bar.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1))),
- mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2,
mkMapOfPartitionRacks(2)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+ mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(null, null),
@@ -721,7 +720,7 @@ public class ConsumerGroupTest {
// Compute while taking into account removal of member 2.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(member2, null),
@@ -733,7 +732,7 @@ public class ConsumerGroupTest {
// Removing member1 results in returning bar.
assertEquals(
mkMap(
- mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2,
mkMapOfPartitionRacks(2)))
+ mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(member1, null),
@@ -745,9 +744,9 @@ public class ConsumerGroupTest {
// Compute while taking into account member 3.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1))),
- mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2,
mkMapOfPartitionRacks(2))),
- mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3,
mkMapOfPartitionRacks(3)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+ mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+ mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(null, member3),
@@ -762,9 +761,9 @@ public class ConsumerGroupTest {
// It should return foo, bar and zar.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1))),
- mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2,
mkMapOfPartitionRacks(2))),
- mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3,
mkMapOfPartitionRacks(3)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+ mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+ mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(null, null),
@@ -786,7 +785,7 @@ public class ConsumerGroupTest {
// Compute while taking into account removal of member 2 and member 3.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(new
HashSet<>(Arrays.asList(member2, member3))),
@@ -798,8 +797,8 @@ public class ConsumerGroupTest {
// Compute while taking into account removal of member 1.
assertEquals(
mkMap(
- mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2,
mkMapOfPartitionRacks(2))),
- mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3,
mkMapOfPartitionRacks(3)))
+ mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+ mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(Collections.singleton(member1)),
@@ -811,9 +810,9 @@ public class ConsumerGroupTest {
// It should return foo, bar and zar.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1))),
- mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2,
mkMapOfPartitionRacks(2))),
- mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3,
mkMapOfPartitionRacks(3)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+ mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+ mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(Collections.emptySet()),
@@ -1220,8 +1219,8 @@ public class ConsumerGroupTest {
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1))),
- mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2,
mkMapOfPartitionRacks(2)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+ mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
),
consumerGroup.computeSubscriptionMetadata(
consumerGroup.computeSubscribedTopicNames(null, null),
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
index c8fe2fcf538..2ae29961434 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupBuilder.java
@@ -25,7 +25,6 @@ import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -83,8 +82,7 @@ public class ShareGroupBuilder {
subscriptionMetadata.put(topicName, new TopicMetadata(
topicImage.id(),
topicImage.name(),
- topicImage.partitions().size(),
- Collections.emptyMap()
+ topicImage.partitions().size()
));
}
})
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
index e53a9dac910..8dc7167d898 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
@@ -43,7 +43,6 @@ import java.util.HashSet;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -181,7 +180,7 @@ public class ShareGroupTest {
// Compute while taking into account member 1.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, member1),
@@ -196,7 +195,7 @@ public class ShareGroupTest {
// It should return foo now.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, null),
@@ -218,8 +217,8 @@ public class ShareGroupTest {
// Compute while taking into account member 2.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1))),
- mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2,
mkMapOfPartitionRacks(2)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+ mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, member2),
@@ -234,8 +233,8 @@ public class ShareGroupTest {
// It should return foo and bar.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1))),
- mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2,
mkMapOfPartitionRacks(2)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+ mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, null),
@@ -247,7 +246,7 @@ public class ShareGroupTest {
// Compute while taking into account removal of member 2.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(member2, null),
@@ -259,7 +258,7 @@ public class ShareGroupTest {
// Removing member1 results in returning bar.
assertEquals(
mkMap(
- mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2,
mkMapOfPartitionRacks(2)))
+ mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(member1, null),
@@ -271,9 +270,9 @@ public class ShareGroupTest {
// Compute while taking into account member 3.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1))),
- mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2,
mkMapOfPartitionRacks(2))),
- mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3,
mkMapOfPartitionRacks(3)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+ mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+ mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, member3),
@@ -288,9 +287,9 @@ public class ShareGroupTest {
// It should return foo, bar and zar.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1))),
- mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2,
mkMapOfPartitionRacks(2))),
- mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3,
mkMapOfPartitionRacks(3)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+ mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+ mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, null),
@@ -312,7 +311,7 @@ public class ShareGroupTest {
// Compute while taking into account removal of member 2 and member 3.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(new
HashSet<>(Arrays.asList(member2, member3))),
@@ -324,8 +323,8 @@ public class ShareGroupTest {
// Compute while taking into account removal of member 1.
assertEquals(
mkMap(
- mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2,
mkMapOfPartitionRacks(2))),
- mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3,
mkMapOfPartitionRacks(3)))
+ mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+ mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(Collections.singleton(member1)),
@@ -337,9 +336,9 @@ public class ShareGroupTest {
// It should return foo, bar and zar.
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1))),
- mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2,
mkMapOfPartitionRacks(2))),
- mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3,
mkMapOfPartitionRacks(3)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+ mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2)),
+ mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(Collections.emptySet()),
@@ -644,8 +643,8 @@ public class ShareGroupTest {
assertEquals(
mkMap(
- mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1,
mkMapOfPartitionRacks(1))),
- mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2,
mkMapOfPartitionRacks(2)))
+ mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1)),
+ mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2))
),
shareGroup.computeSubscriptionMetadata(
shareGroup.computeSubscribedTopicNames(null, null),
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
index b9ce9bab9af..e3dceb19c2a 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java
@@ -93,14 +93,11 @@ public class AssignorBenchmarkUtils {
*
* @param topicNames The names of the topics.
* @param partitionsPerTopic The number of partitions per topic.
- * @param getTopicPartitionRacks A function to get the racks map for
each topic. May return
- * an empty map if no rack info is
desired.
* @return The subscription metadata map.
*/
public static Map<String, TopicMetadata> createSubscriptionMetadata(
List<String> topicNames,
- int partitionsPerTopic,
- Function<String, Map<Integer, Set<String>>> getTopicPartitionRacks
+ int partitionsPerTopic
) {
Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
@@ -110,8 +107,7 @@ public class AssignorBenchmarkUtils {
TopicMetadata metadata = new TopicMetadata(
topicId,
topicName,
- partitionsPerTopic,
- getTopicPartitionRacks.apply(topicName)
+ partitionsPerTopic
);
subscriptionMetadata.put(topicName, metadata);
}
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
index 82f11350381..a22673c3f59 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
@@ -148,13 +148,9 @@ public class ServerSideAssignorBenchmark {
allTopicNames = AssignorBenchmarkUtils.createTopicNames(topicCount);
int partitionsPerTopic = (memberCount * partitionsToMemberRatio) /
topicCount;
- Map<Integer, Set<String>> partitionRacks = isRackAware ?
- mkMapOfPartitionRacks(partitionsPerTopic) :
- Collections.emptyMap();
subscriptionMetadata =
AssignorBenchmarkUtils.createSubscriptionMetadata(
allTopicNames,
- partitionsPerTopic,
- topicName -> partitionRacks
+ partitionsPerTopic
);
topicsImage =
AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
index adf40671ebc..c9ddb54363b 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
@@ -122,8 +122,7 @@ public class TargetAssignmentBuilderBenchmark {
int partitionsPerTopic = (memberCount * partitionsToMemberRatio) /
topicCount;
subscriptionMetadata =
AssignorBenchmarkUtils.createSubscriptionMetadata(
allTopicNames,
- partitionsPerTopic,
- topicName -> Collections.emptyMap()
+ partitionsPerTopic
);
topicsImage =
AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);