This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4b5a16bf6ff KAFKA-18757: Create full-function SimpleAssignor to match
KIP-932 description (#18864)
4b5a16bf6ff is described below
commit 4b5a16bf6ffcb4322d7c5b35a484ff914397909c
Author: Abhinav Dixit <[email protected]>
AuthorDate: Wed Feb 26 16:32:23 2025 +0530
KAFKA-18757: Create full-function SimpleAssignor to match KIP-932
description (#18864)
### About
The current `SimpleAssignor` in Apache Kafka (AK) assigns all subscribed topic
partitions to every member of the share group. This does not match the
description given in
[KIP-932](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255070434#KIP932:QueuesforKafka-TheSimpleAssignor).
The rules by which the assignment should happen, as stated in the KIP, are
listed below. The step 3 implementation in this change differs from the KIP
for the reasons
[described in this comment](https://github.com/apache/kafka/pull/18864#issuecomment-2659266502):
1. The assignor hashes the member IDs of the members and maps the
partitions assigned to the members based on the hash. This gives
approximately even balance.
2. If any partitions were not assigned any members by (1) and do not
have members already assigned in the current assignment, members are
assigned round-robin until each partition has at least one member
assigned to it.
3. The current and new assignments are combined. (Original KIP rule, not
implemented in this change: if any partitions were assigned members by (1) and
also have members in the current assignment assigned by (2), the members
assigned by (2) are removed.)
### Tests
The added code has been verified with unit tests and the existing
integration tests.
Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
<[email protected]>, TaiJuWu <[email protected]>
---
.../server/ShareGroupHeartbeatRequestTest.scala | 27 +-
.../coordinator/group/assignor/SimpleAssignor.java | 242 +++++++++-
.../group/assignor/SimpleAssignorTest.java | 512 +++++++++++++++++++--
3 files changed, 711 insertions(+), 70 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
index 07c7b959ab8..448b6897ede 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.test.ClusterInstance
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals,
assertNotNull, assertNull, assertTrue}
import org.junit.jupiter.api.{Tag, Timeout}
+import java.util
import scala.jdk.CollectionConverters._
@Timeout(120)
@@ -216,18 +217,12 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
assertNotEquals(memberId1, memberId2)
// Create the topic.
- val topicId = TestUtils.createTopicWithAdminRaw(
+ TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = "foo",
numPartitions = 3
)
- // This is the expected assignment.
- val expectedAssignment = new ShareGroupHeartbeatResponseData.Assignment()
- .setTopicPartitions(List(new
ShareGroupHeartbeatResponseData.TopicPartitions()
- .setTopicId(topicId)
- .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
-
// Prepare the next heartbeat for member 1.
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
@@ -241,10 +236,10 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
- shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
- shareGroupHeartbeatResponse.data.assignment == expectedAssignment
+ shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
shareGroupHeartbeatResponse.data.assignment != null
}, msg = s"Could not get partitions assigned. Last response
$shareGroupHeartbeatResponse.")
+ val topicPartitionsAssignedToMember1 =
shareGroupHeartbeatResponse.data.assignment.topicPartitions()
// Verify the response.
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
@@ -261,13 +256,23 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
- shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
- shareGroupHeartbeatResponse.data.assignment == expectedAssignment
+ shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
shareGroupHeartbeatResponse.data.assignment != null
}, msg = s"Could not get partitions assigned. Last response
$shareGroupHeartbeatResponse.")
+ val topicPartitionsAssignedToMember2 =
shareGroupHeartbeatResponse.data.assignment.topicPartitions()
// Verify the response.
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
+ val partitionsAssigned: util.Set[Integer] = new util.HashSet[Integer]()
+ topicPartitionsAssignedToMember1.forEach(topicPartition => {
+ partitionsAssigned.addAll(topicPartition.partitions())
+ })
+ topicPartitionsAssignedToMember2.forEach(topicPartition => {
+ partitionsAssigned.addAll(topicPartition.partitions())
+ })
+ // Verify all the 3 topic partitions for "foo" have been assigned to at
least 1 member.
+ assertEquals(util.Set.of(0, 1, 2), partitionsAssigned)
+
// Verify the assignments are not changed for member 1.
// Prepare another heartbeat for member 1 with latest received epoch 3
for member 1.
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
index 4cc66468ae7..781b64dd604 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
@@ -25,19 +25,23 @@ import
org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorExceptio
import
org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
import
org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
+import org.apache.kafka.server.common.TopicIdPartition;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
/**
- * A simple partition assignor that assigns each member all partitions of the
subscribed topics.
+ * A simple partition assignor that assigns partitions of the subscribed
topics based on the rules defined in KIP-932 to different members.
*/
public class SimpleAssignor implements ShareGroupPartitionAssignor {
@@ -54,7 +58,7 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException {
if (groupSpec.memberIds().isEmpty())
- return new GroupAssignment(Collections.emptyMap());
+ return new GroupAssignment(Map.of());
if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
return assignHomogenous(groupSpec, subscribedTopicDescriber);
@@ -67,42 +71,240 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) {
- Set<Uuid> subscribeTopicIds =
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
+ Set<Uuid> subscribedTopicIds =
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
.subscribedTopicIds();
- if (subscribeTopicIds.isEmpty())
- return new GroupAssignment(Collections.emptyMap());
+ if (subscribedTopicIds.isEmpty())
+ return new GroupAssignment(Map.of());
- Map<Uuid, Set<Integer>> targetPartitions = computeTargetPartitions(
- subscribeTopicIds, subscribedTopicDescriber);
+ // Subscribed topic partitions for the share group.
+ List<TopicIdPartition> targetPartitions = computeTargetPartitions(
+ subscribedTopicIds, subscribedTopicDescriber);
- return new
GroupAssignment(groupSpec.memberIds().stream().collect(Collectors.toMap(
- Function.identity(), memberId -> new
MemberAssignmentImpl(targetPartitions))));
+ // The current assignment from topic partition to members.
+ Map<TopicIdPartition, List<String>> currentAssignment =
currentAssignment(groupSpec);
+ return newAssignmentHomogeneous(groupSpec, subscribedTopicIds,
targetPartitions, currentAssignment);
}
private GroupAssignment assignHeterogeneous(
GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) {
- Map<String, MemberAssignment> members = new HashMap<>();
+ Map<String, List<TopicIdPartition>> memberToPartitionsSubscription =
new HashMap<>();
for (String memberId : groupSpec.memberIds()) {
MemberSubscription spec = groupSpec.memberSubscription(memberId);
if (spec.subscribedTopicIds().isEmpty())
continue;
- Map<Uuid, Set<Integer>> targetPartitions = computeTargetPartitions(
+ // Subscribed topic partitions for the share group member.
+ List<TopicIdPartition> targetPartitions = computeTargetPartitions(
spec.subscribedTopicIds(), subscribedTopicDescriber);
+ memberToPartitionsSubscription.put(memberId, targetPartitions);
+ }
+
+ // The current assignment from topic partition to members.
+ Map<TopicIdPartition, List<String>> currentAssignment =
currentAssignment(groupSpec);
+ return newAssignmentHeterogeneous(groupSpec,
memberToPartitionsSubscription, currentAssignment);
+ }
+
+ /**
+ * Get the current assignment by topic partitions.
+ * @param groupSpec - The group metadata specifications.
+ * @return the current assignment for subscribed topic partitions to
memberIds.
+ */
+ private Map<TopicIdPartition, List<String>> currentAssignment(GroupSpec
groupSpec) {
+ Map<TopicIdPartition, List<String>> assignment = new HashMap<>();
- members.put(memberId, new MemberAssignmentImpl(targetPartitions));
+ for (String member : groupSpec.memberIds()) {
+ Map<Uuid, Set<Integer>> assignedTopicPartitions =
groupSpec.memberAssignment(member).partitions();
+ assignedTopicPartitions.forEach((topicId, partitions) ->
partitions.forEach(
+ partition -> assignment.computeIfAbsent(new
TopicIdPartition(topicId, partition), k -> new ArrayList<>()).add(member)));
}
+ return assignment;
+ }
+
+ /**
+ * This function computes the new assignment for a homogeneous group.
+ * @param groupSpec - The group metadata specifications.
+ * @param subscribedTopicIds - The set of all the subscribed topic ids for
the group.
+ * @param targetPartitions - The list of all topic partitions that need
assignment.
+ * @param currentAssignment - The current assignment for subscribed topic
partitions to memberIds.
+ * @return the new partition assignment for the members of the group.
+ */
+ private GroupAssignment newAssignmentHomogeneous(
+ GroupSpec groupSpec,
+ Set<Uuid> subscribedTopicIds,
+ List<TopicIdPartition> targetPartitions,
+ Map<TopicIdPartition, List<String>> currentAssignment
+ ) {
+ Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>();
+
+ // Step 1: Hash member IDs to topic partitions.
+ memberHashAssignment(targetPartitions, groupSpec.memberIds(),
newAssignment);
+
+ // Step 2: Round-robin assignment for unassigned partitions which do
not have members already assigned in the current assignment.
+ List<TopicIdPartition> unassignedPartitions = targetPartitions.stream()
+ .filter(targetPartition ->
!newAssignment.containsKey(targetPartition))
+ .filter(targetPartition ->
!currentAssignment.containsKey(targetPartition))
+ .toList();
+
+ roundRobinAssignment(groupSpec.memberIds(), unassignedPartitions,
newAssignment);
+
+ // Step 3: We combine current assignment and new assignment.
+ Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>();
+
+ // As per the KIP, we should revoke the assignments from current
assignment for partitions that were assigned by step 1
+ // in the new assignment and have members in current assignment by
step 2. But we haven't implemented it to avoid the
+ // complexity in both the implementation and the run time complexity.
This step was mentioned in the KIP to reduce
+ // the burden of certain members of the share groups. This can be
achieved with the help of limiting the max
+ // no. of partitions assignment for every member(KAFKA-18788). Hence,
the potential problem of burdening
+ // the share consumers will be addressed in a future PR.
+
+ newAssignment.forEach((targetPartition, members) ->
members.forEach(member ->
+ finalAssignment.computeIfAbsent(member, k -> new
HashSet<>()).add(targetPartition)));
+ // When combining current assignment, we need to only consider the
topics in current assignment that are also being
+ // subscribed in the new assignment as well.
+ currentAssignment.forEach((targetPartition, members) -> {
+ if (subscribedTopicIds.contains(targetPartition.topicId()))
+ members.forEach(member -> {
+ if (groupSpec.memberIds().contains(member) &&
!newAssignment.containsKey(targetPartition))
+ finalAssignment.computeIfAbsent(member, k -> new
HashSet<>()).add(targetPartition);
+ });
+ });
+
+ return groupAssignment(finalAssignment, groupSpec.memberIds());
+ }
+
+ /**
+ * This function computes the new assignment for a heterogeneous group.
+ * @param groupSpec - The group metadata specifications.
+ * @param memberToPartitionsSubscription - The member to subscribed topic
partitions map.
+ * @param currentAssignment - The current assignment for subscribed topic
partitions to memberIds.
+ * @return the new partition assignment for the members of the group.
+ */
+ private GroupAssignment newAssignmentHeterogeneous(
+ GroupSpec groupSpec,
+ Map<String, List<TopicIdPartition>> memberToPartitionsSubscription,
+ Map<TopicIdPartition, List<String>> currentAssignment
+ ) {
+
+ // Exhaustive set of all subscribed topic partitions.
+ Set<TopicIdPartition> targetPartitions = new LinkedHashSet<>();
+
memberToPartitionsSubscription.values().forEach(targetPartitions::addAll);
+
+ // Create a map for topic to members subscription.
+ Map<Uuid, Set<String>> topicToMemberSubscription = new HashMap<>();
+ memberToPartitionsSubscription.forEach((member, partitions) ->
+ partitions.forEach(partition ->
topicToMemberSubscription.computeIfAbsent(partition.topicId(), k -> new
LinkedHashSet<>()).add(member)));
+
+ Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>();
+
+ // Step 1: Hash member IDs to partitions.
+ memberToPartitionsSubscription.forEach((member, partitions) ->
+ memberHashAssignment(partitions, List.of(member), newAssignment));
+
+ // Step 2: Round-robin assignment for unassigned partitions which do
not have members already assigned in the current assignment.
+ Set<TopicIdPartition> assignedPartitions = new
LinkedHashSet<>(newAssignment.keySet());
+ Map<Uuid, List<TopicIdPartition>> unassignedPartitions = new
HashMap<>();
+ targetPartitions.forEach(targetPartition -> {
+ if (!assignedPartitions.contains(targetPartition) &&
!currentAssignment.containsKey(targetPartition))
+
unassignedPartitions.computeIfAbsent(targetPartition.topicId(), k -> new
ArrayList<>()).add(targetPartition);
+ });
+
+ unassignedPartitions.keySet().forEach(unassignedTopic ->
+
roundRobinAssignment(topicToMemberSubscription.get(unassignedTopic),
unassignedPartitions.get(unassignedTopic), newAssignment));
+
+ // Step 3: We combine current assignment and new assignment.
+ Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>();
+ // As per the KIP, we should revoke the assignments from current
assignment for partitions that were assigned by step 1
+ // in the new assignment and have members in current assignment by
step 2. But we haven't implemented it to avoid the
+ // complexity in both the implementation and the run time complexity.
This step was mentioned in the KIP to reduce
+ // the burden of certain members of the share groups. This can be
achieved with the help of limiting the max
+ // no. of partitions assignment for every member(KAFKA-18788). Hence,
the potential problem of burdening
+ // the share consumers will be addressed in a future PR.
+
+ newAssignment.forEach((targetPartition, members) ->
members.forEach(member ->
+ finalAssignment.computeIfAbsent(member, k -> new
HashSet<>()).add(targetPartition)));
+ // When combining current assignment, we need to only consider the
member topic subscription in current assignment
+ // which is being subscribed in the new assignment as well.
+ currentAssignment.forEach((topicIdPartition, members) ->
members.forEach(member -> {
+ if
(topicToMemberSubscription.getOrDefault(topicIdPartition.topicId(),
Collections.emptySet()).contains(member)
+ && !newAssignment.containsKey(topicIdPartition))
+ finalAssignment.computeIfAbsent(member, k -> new
HashSet<>()).add(topicIdPartition);
+ }));
+
+ return groupAssignment(finalAssignment, groupSpec.memberIds());
+ }
+
+ private GroupAssignment groupAssignment(
+ Map<String, Set<TopicIdPartition>> assignmentByMember,
+ Collection<String> allGroupMembers
+ ) {
+ Map<String, MemberAssignment> members = new HashMap<>();
+ for (Map.Entry<String, Set<TopicIdPartition>> entry :
assignmentByMember.entrySet()) {
+ Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
+ entry.getValue().forEach(targetPartition ->
targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new
HashSet<>()).add(targetPartition.partitionId()));
+ members.put(entry.getKey(), new
MemberAssignmentImpl(targetPartitions));
+ }
+ allGroupMembers.forEach(member -> {
+ if (!members.containsKey(member))
+ members.put(member, new MemberAssignmentImpl(new HashMap<>()));
+ });
+
return new GroupAssignment(members);
}
- private Map<Uuid, Set<Integer>> computeTargetPartitions(
- Set<Uuid> subscribeTopicIds,
+ /**
+ * This function updates assignment by hashing the member IDs of the
members and maps the partitions assigned to the
+ * members based on the hash. This gives approximately even balance.
+ * @param unassignedPartitions - the subscribed topic partitions which
needs assignment.
+ * @param memberIds - the member ids to which the topic partitions need to
be assigned.
+ * @param assignment - the existing assignment by topic partition. We need
to pass it as a parameter because this
+ * function would be called multiple times for
heterogeneous assignment.
+ */
+ // Visible for testing
+ void memberHashAssignment(
+ List<TopicIdPartition> unassignedPartitions,
+ Collection<String> memberIds,
+ Map<TopicIdPartition, List<String>> assignment
+ ) {
+ if (!unassignedPartitions.isEmpty())
+ for (String memberId : memberIds) {
+ int topicPartitionIndex = Math.abs(memberId.hashCode() %
unassignedPartitions.size());
+ TopicIdPartition topicPartition =
unassignedPartitions.get(topicPartitionIndex);
+ assignment.computeIfAbsent(topicPartition, k -> new
ArrayList<>()).add(memberId);
+ }
+ }
+
+ /**
+ * This functions assigns topic partitions to members by round-robin
approach and updates the existing assignment.
+ * @param memberIds - the member ids to which the topic partitions need to
be assigned, should be non-empty.
+ * @param unassignedPartitions - the subscribed topic partitions which
needs assignment.
+ * @param assignment - the existing assignment by topic partition.
+ */
+ // Visible for testing
+ void roundRobinAssignment(
+ Collection<String> memberIds,
+ List<TopicIdPartition> unassignedPartitions,
+ Map<TopicIdPartition, List<String>> assignment
+ ) {
+ // We iterate through the target partitions and assign a memberId to
them. In case we run out of members (members < targetPartitions),
+ // we again start from the starting index of memberIds.
+ Iterator<String> memberIdIterator = memberIds.iterator();
+ for (TopicIdPartition targetPartition : unassignedPartitions) {
+ if (!memberIdIterator.hasNext()) {
+ memberIdIterator = memberIds.iterator();
+ }
+ String memberId = memberIdIterator.next();
+ assignment.computeIfAbsent(targetPartition, k -> new
ArrayList<>()).add(memberId);
+ }
+ }
+
+ private List<TopicIdPartition> computeTargetPartitions(
+ Set<Uuid> subscribedTopicIds,
SubscribedTopicDescriber subscribedTopicDescriber
) {
- Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
- subscribeTopicIds.forEach(topicId -> {
+ List<TopicIdPartition> targetPartitions = new ArrayList<>();
+ subscribedTopicIds.forEach(topicId -> {
int numPartitions =
subscribedTopicDescriber.numPartitions(topicId);
if (numPartitions == -1) {
throw new PartitionAssignorException(
@@ -111,11 +313,9 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
);
}
- Set<Integer> partitions = new HashSet<>();
for (int i = 0; i < numPartitions; i++) {
- partitions.add(i);
+ targetPartitions.add(new TopicIdPartition(topicId, i));
}
- targetPartitions.put(topicId, partitions);
});
return targetPartitions;
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
index 71b64dcb817..a0b86d6fa13 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
@@ -19,21 +19,24 @@ package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
+import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import
org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import
org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
+import org.apache.kafka.server.common.TopicIdPartition;
import org.junit.jupiter.api.Test;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.TreeMap;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
@@ -41,16 +44,21 @@ import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.H
import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class SimpleAssignorTest {
private static final Uuid TOPIC_1_UUID = Uuid.randomUuid();
private static final Uuid TOPIC_2_UUID = Uuid.randomUuid();
private static final Uuid TOPIC_3_UUID = Uuid.randomUuid();
+ private static final Uuid TOPIC_4_UUID = Uuid.randomUuid();
private static final String TOPIC_1_NAME = "topic1";
+ private static final String TOPIC_2_NAME = "topic2";
private static final String TOPIC_3_NAME = "topic3";
+ private static final String TOPIC_4_NAME = "topic4";
private static final String MEMBER_A = "A";
private static final String MEMBER_B = "B";
+ private static final String MEMBER_C = "C";
private final SimpleAssignor assignor = new SimpleAssignor();
@@ -62,13 +70,13 @@ public class SimpleAssignorTest {
@Test
public void testAssignWithEmptyMembers() {
SubscribedTopicDescriberImpl subscribedTopicMetadata = new
SubscribedTopicDescriberImpl(
- Collections.emptyMap()
+ Map.of()
);
GroupSpec groupSpec = new GroupSpecImpl(
- Collections.emptyMap(),
+ Map.of(),
HOMOGENEOUS,
- Collections.emptyMap()
+ Map.of()
);
GroupAssignment groupAssignment = assignor.assign(
@@ -76,13 +84,24 @@ public class SimpleAssignorTest {
subscribedTopicMetadata
);
- assertEquals(Collections.emptyMap(), groupAssignment.members());
+ assertEquals(Map.of(), groupAssignment.members());
+
+ groupSpec = new GroupSpecImpl(
+ Map.of(),
+ HETEROGENEOUS,
+ Map.of()
+ );
+ groupAssignment = assignor.assign(
+ groupSpec,
+ subscribedTopicMetadata
+ );
+ assertEquals(Map.of(), groupAssignment.members());
}
@Test
public void testAssignWithNoSubscribedTopic() {
SubscribedTopicDescriberImpl subscribedTopicMetadata = new
SubscribedTopicDescriberImpl(
- Collections.singletonMap(
+ Map.of(
TOPIC_1_UUID,
new TopicMetadata(
TOPIC_1_UUID,
@@ -92,12 +111,12 @@ public class SimpleAssignorTest {
)
);
- Map<String, MemberSubscriptionAndAssignmentImpl> members =
Collections.singletonMap(
+ Map<String, MemberSubscriptionAndAssignmentImpl> members = Map.of(
MEMBER_A,
new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
- Collections.emptySet(),
+ Set.of(),
Assignment.EMPTY
)
);
@@ -105,7 +124,7 @@ public class SimpleAssignorTest {
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
- Collections.emptyMap()
+ Map.of()
);
GroupAssignment groupAssignment = assignor.assign(
@@ -113,13 +132,13 @@ public class SimpleAssignorTest {
subscribedTopicMetadata
);
- assertEquals(Collections.emptyMap(), groupAssignment.members());
+ assertEquals(Map.of(), groupAssignment.members());
}
@Test
public void testAssignWithSubscribedToNonExistentTopic() {
SubscribedTopicDescriberImpl subscribedTopicMetadata = new
SubscribedTopicDescriberImpl(
- Collections.singletonMap(
+ Map.of(
TOPIC_1_UUID,
new TopicMetadata(
TOPIC_1_UUID,
@@ -129,7 +148,7 @@ public class SimpleAssignorTest {
)
);
- Map<String, MemberSubscriptionAndAssignmentImpl> members =
Collections.singletonMap(
+ Map<String, MemberSubscriptionAndAssignmentImpl> members = Map.of(
MEMBER_A,
new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
@@ -142,7 +161,7 @@ public class SimpleAssignorTest {
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
- Collections.emptyMap()
+ Map.of()
);
assertThrows(PartitionAssignorException.class,
@@ -163,26 +182,30 @@ public class SimpleAssignorTest {
2
));
- Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
+ Map<String, MemberSubscriptionAndAssignmentImpl> members = new
HashMap<>();
+
+ Set<Uuid> topicsSubscription = new LinkedHashSet<>();
+ topicsSubscription.add(TOPIC_1_UUID);
+ topicsSubscription.add(TOPIC_3_UUID);
members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
- Set.of(TOPIC_1_UUID, TOPIC_3_UUID),
+ topicsSubscription,
Assignment.EMPTY
));
members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
- Set.of(TOPIC_1_UUID, TOPIC_3_UUID),
+ topicsSubscription,
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
- Collections.emptyMap()
+ Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new
SubscribedTopicDescriberImpl(topicMetadata);
@@ -191,16 +214,22 @@ public class SimpleAssignorTest {
subscribedTopicMetadata
);
+ // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
+ // Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment.
+ // Step 2 -> T1:2, T3:1 -> MEMBER_A and T3:0 -> MEMBER_B by
round-robin assignment.
+ // Step 3 -> no new assignment gets added by current assignment since
it is empty.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new
HashMap<>();
expectedAssignment.put(MEMBER_A, mkAssignment(
- mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
- mkTopicAssignment(TOPIC_3_UUID, 0, 1)
+ mkTopicAssignment(TOPIC_1_UUID, 0, 2),
+ mkTopicAssignment(TOPIC_3_UUID, 1)
));
expectedAssignment.put(MEMBER_B, mkAssignment(
- mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
- mkTopicAssignment(TOPIC_3_UUID, 0, 1)
+ mkTopicAssignment(TOPIC_1_UUID, 1),
+ mkTopicAssignment(TOPIC_3_UUID, 0)
));
+ // T1: 3 partitions + T3: 2 partitions = 5 partitions
+ assertEveryPartitionGetsAssignment(5, computedAssignment);
assertAssignment(expectedAssignment, computedAssignment);
}
@@ -224,11 +253,15 @@ public class SimpleAssignorTest {
2
));
- Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
+ Set<Uuid> memberATopicsSubscription = new LinkedHashSet<>();
+ memberATopicsSubscription.add(TOPIC_1_UUID);
+ memberATopicsSubscription.add(TOPIC_2_UUID);
+
+ Map<String, MemberSubscriptionAndAssignmentImpl> members = new
HashMap<>();
members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
- Set.of(TOPIC_1_UUID, TOPIC_2_UUID),
+ memberATopicsSubscription,
Assignment.EMPTY
));
@@ -239,18 +272,20 @@ public class SimpleAssignorTest {
Assignment.EMPTY
));
- String memberC = "C";
- members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
+ Set<Uuid> memberCTopicsSubscription = new LinkedHashSet<>();
+ memberCTopicsSubscription.add(TOPIC_2_UUID);
+ memberCTopicsSubscription.add(TOPIC_3_UUID);
+ members.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
- Set.of(TOPIC_2_UUID, TOPIC_3_UUID),
+ memberCTopicsSubscription,
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
- Collections.emptyMap()
+ Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new
SubscribedTopicDescriberImpl(topicMetadata);
@@ -259,19 +294,24 @@ public class SimpleAssignorTest {
subscribedTopicMetadata
);
+ // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of
MEMBER_C is 67.
+ // Step 1 -> T2:2 -> member_A, T3:0 -> member_B, T2:2 -> member_C by
hash assignment.
+ // Step 2 -> T1:0, T1:1, T1:2, T2:0 -> member_A, T3:1, -> member_B,
T2:1 -> member_C by round-robin assignment.
+ // Step 3 -> no new assignment gets added by current assignment since
it is empty.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new
HashMap<>();
expectedAssignment.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
- mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2)
+ mkTopicAssignment(TOPIC_2_UUID, 0, 2)
));
expectedAssignment.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_3_UUID, 0, 1)
));
- expectedAssignment.put(memberC, mkAssignment(
- mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2),
- mkTopicAssignment(TOPIC_3_UUID, 0, 1)
+ expectedAssignment.put(MEMBER_C, mkAssignment(
+ mkTopicAssignment(TOPIC_2_UUID, 1, 2)
));
+ // T1: 3 partitions + T2: 3 partitions + T3: 2 partitions = 8
partitions
+ assertEveryPartitionGetsAssignment(8, computedAssignment);
assertAssignment(expectedAssignment, computedAssignment);
}
@@ -290,25 +330,28 @@ public class SimpleAssignorTest {
2
));
- Map<String, MemberSubscriptionAndAssignmentImpl> members = new
TreeMap<>();
+ Set<Uuid> memberATopicsSubscription = new LinkedHashSet<>();
+ memberATopicsSubscription.add(TOPIC_1_UUID);
+ memberATopicsSubscription.add(TOPIC_2_UUID);
+ Map<String, MemberSubscriptionAndAssignmentImpl> members = new
HashMap<>();
members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
- Set.of(TOPIC_1_UUID, TOPIC_2_UUID),
+ memberATopicsSubscription,
Assignment.EMPTY
));
members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
- Collections.emptySet(),
+ Set.of(),
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HETEROGENEOUS,
- Collections.emptyMap()
+ Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new
SubscribedTopicDescriberImpl(topicMetadata);
@@ -320,12 +363,378 @@ public class SimpleAssignorTest {
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new
HashMap<>();
expectedAssignment.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
- mkTopicAssignment(TOPIC_2_UUID, 0, 1)
- ));
+ mkTopicAssignment(TOPIC_2_UUID, 0, 1)));
+ expectedAssignment.put(MEMBER_B, mkAssignment());
+ // T1: 3 partitions + T2: 2 partitions = 5 partitions
+ assertEveryPartitionGetsAssignment(5, computedAssignment);
assertAssignment(expectedAssignment, computedAssignment);
}
+ @Test
+ public void testMemberHashAssignment() {
+ // hashcode for "member1" is 948881623.
+ String member1 = "member1";
+ // hashcode for "member2" is 948881624.
+ String member2 = "member2";
+ // hashcode for "member3" is 948881625.
+ String member3 = "member3";
+ // hashcode for "member4" is 948881626.
+ String member4 = "member4";
+ // hashcode for "AaAaAaAa" is -540425984 to test with negative
hashcode.
+ String member5 = "AaAaAaAa";
+ List<String> members = List.of(member1, member2, member3, member4,
member5);
+
+ TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
+ TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
+ TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
+ List<TopicIdPartition> partitions = List.of(partition1, partition2,
partition3);
+
+ Map<TopicIdPartition, List<String>> computedAssignment = new
HashMap<>();
+ assignor.memberHashAssignment(partitions, members, computedAssignment);
+
+ Map<TopicIdPartition, List<String>> expectedAssignment = new
HashMap<>();
+ expectedAssignment.put(partition1, List.of(member3));
+ expectedAssignment.put(partition2, List.of(member1, member4));
+ expectedAssignment.put(partition3, List.of(member2, member5));
+ assertAssignment(expectedAssignment, computedAssignment);
+ }
+
+ @Test
+ public void testRoundRobinAssignment() {
+ String member1 = "member1";
+ String member2 = "member2";
+ List<String> members = List.of(member1, member2);
+ TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
+ TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
+ TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
+ TopicIdPartition partition4 = new TopicIdPartition(TOPIC_4_UUID, 0);
+ List<TopicIdPartition> unassignedPartitions = List.of(partition2,
partition3, partition4);
+
+ Map<TopicIdPartition, List<String>> assignment = new HashMap<>();
+ assignment.put(partition1, List.of(member1));
+
+ assignor.roundRobinAssignment(members, unassignedPartitions,
assignment);
+ Map<TopicIdPartition, List<String>> expectedAssignment = Map.of(
+ partition1, List.of(member1),
+ partition2, List.of(member1),
+ partition3, List.of(member2),
+ partition4, List.of(member1)
+ );
+
+ assertAssignment(expectedAssignment, assignment);
+ }
+
+ @Test
+ public void testAssignWithCurrentAssignmentHomogeneous() {
+ // Current assignment setup - Two members A, B subscribing to T1 and
T2.
+ Map<Uuid, TopicMetadata> topicMetadata1 = new HashMap<>();
+ topicMetadata1.put(TOPIC_1_UUID, new TopicMetadata(
+ TOPIC_1_UUID,
+ TOPIC_1_NAME,
+ 3
+ ));
+ topicMetadata1.put(TOPIC_2_UUID, new TopicMetadata(
+ TOPIC_2_UUID,
+ TOPIC_2_NAME,
+ 2
+ ));
+
+ Map<String, MemberSubscriptionAndAssignmentImpl> members1 = new
HashMap<>();
+
+ Set<Uuid> topicsSubscription1 = new LinkedHashSet<>();
+ topicsSubscription1.add(TOPIC_1_UUID);
+ topicsSubscription1.add(TOPIC_2_UUID);
+
+ members1.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ Optional.empty(),
+ topicsSubscription1,
+ Assignment.EMPTY
+ ));
+
+ members1.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ Optional.empty(),
+ topicsSubscription1,
+ Assignment.EMPTY
+ ));
+
+ GroupSpec groupSpec1 = new GroupSpecImpl(
+ members1,
+ HOMOGENEOUS,
+ Map.of()
+ );
+ SubscribedTopicDescriberImpl subscribedTopicMetadata1 = new
SubscribedTopicDescriberImpl(topicMetadata1);
+
+ GroupAssignment computedAssignment1 = assignor.assign(
+ groupSpec1,
+ subscribedTopicMetadata1
+ );
+
+ // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
+ // Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment.
+ // Step 2 -> T1:2, T2:1 -> MEMBER_A and T2:0 -> MEMBER_B by
round-robin assignment.
+ // Step 3 -> no new assignment gets added by current assignment since
it is empty.
+ Map<String, Map<Uuid, Set<Integer>>> expectedAssignment1 = new
HashMap<>();
+ expectedAssignment1.put(MEMBER_A, mkAssignment(
+ mkTopicAssignment(TOPIC_1_UUID, 0, 2),
+ mkTopicAssignment(TOPIC_2_UUID, 1)
+ ));
+ expectedAssignment1.put(MEMBER_B, mkAssignment(
+ mkTopicAssignment(TOPIC_1_UUID, 1),
+ mkTopicAssignment(TOPIC_2_UUID, 0)
+ ));
+
+ // T1: 3 partitions + T2: 2 partitions = 5 partitions
+ assertEveryPartitionGetsAssignment(5, computedAssignment1);
+ assertAssignment(expectedAssignment1, computedAssignment1);
+
+ // New assignment setup - Three members A, B, C subscribing to T2 and
T3.
+ Map<Uuid, TopicMetadata> topicMetadata2 = new HashMap<>();
+ topicMetadata2.put(TOPIC_2_UUID, new TopicMetadata(
+ TOPIC_2_UUID,
+ TOPIC_2_NAME,
+ 2
+ ));
+ topicMetadata2.put(TOPIC_3_UUID, new TopicMetadata(
+ TOPIC_3_UUID,
+ TOPIC_3_NAME,
+ 3
+ ));
+
+ Map<String, MemberSubscriptionAndAssignmentImpl> members2 = new
HashMap<>();
+
+ Set<Uuid> topicsSubscription2 = new LinkedHashSet<>();
+ topicsSubscription2.add(TOPIC_2_UUID);
+ topicsSubscription2.add(TOPIC_3_UUID);
+
+ members2.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ Optional.empty(),
+ topicsSubscription2,
+ // Utilizing the assignment from current assignment
+ new Assignment(mkAssignment(
+ mkTopicAssignment(TOPIC_1_UUID, 0, 2),
+ mkTopicAssignment(TOPIC_2_UUID, 1)))
+ ));
+
+ members2.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ Optional.empty(),
+ topicsSubscription2,
+ new Assignment(mkAssignment(
+ mkTopicAssignment(TOPIC_1_UUID, 1),
+ mkTopicAssignment(TOPIC_2_UUID, 0)))
+ ));
+
+ members2.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ Optional.empty(),
+ topicsSubscription2,
+ Assignment.EMPTY
+ ));
+
+ GroupSpec groupSpec2 = new GroupSpecImpl(
+ members2,
+ HOMOGENEOUS,
+ Map.of()
+ );
+ SubscribedTopicDescriberImpl subscribedTopicMetadata2 = new
SubscribedTopicDescriberImpl(topicMetadata2);
+
+ GroupAssignment computedAssignment2 = assignor.assign(
+ groupSpec2,
+ subscribedTopicMetadata2
+ );
+
+ // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of
MEMBER_C is 67.
+ // Step 1 -> T2:0 -> MEMBER_A, T2:1 -> MEMBER_B, T3:0 -> MEMBER_C by
hash assignment
+ // Step 2 -> T3:1 -> MEMBER_A, T3:2 -> MEMBER_B by round-robin
assignment
+ // Step 3 -> no new addition by current assignment since T2:0 and T2:1
were already a part of new assignment.
+ Map<String, Map<Uuid, Set<Integer>>> expectedAssignment2 = new
HashMap<>();
+ expectedAssignment2.put(MEMBER_A, mkAssignment(
+ mkTopicAssignment(TOPIC_2_UUID, 0),
+ mkTopicAssignment(TOPIC_3_UUID, 1)
+ ));
+ expectedAssignment2.put(MEMBER_B, mkAssignment(
+ mkTopicAssignment(TOPIC_2_UUID, 1),
+ mkTopicAssignment(TOPIC_3_UUID, 2)
+ ));
+ expectedAssignment2.put(MEMBER_C, mkAssignment(
+ mkTopicAssignment(TOPIC_3_UUID, 0)
+ ));
+
+ // T2: 2 partitions + T3: 3 partitions = 5 partitions
+ assertEveryPartitionGetsAssignment(5, computedAssignment2);
+ assertAssignment(expectedAssignment2, computedAssignment2);
+ }
+
+ @Test
+ public void testAssignWithCurrentAssignmentHeterogeneous() {
+ // Current assignment setup - 3 members A - {T1, T2}, B - {T3}, C -
{T2, T3}.
+ Map<Uuid, TopicMetadata> topicMetadata1 = new HashMap<>();
+ topicMetadata1.put(TOPIC_1_UUID, new TopicMetadata(
+ TOPIC_1_UUID,
+ TOPIC_1_NAME,
+ 3
+ ));
+
+ topicMetadata1.put(TOPIC_2_UUID, new TopicMetadata(
+ TOPIC_2_UUID,
+ TOPIC_2_NAME,
+ 3
+ ));
+ topicMetadata1.put(TOPIC_3_UUID, new TopicMetadata(
+ TOPIC_3_UUID,
+ TOPIC_3_NAME,
+ 2
+ ));
+
+ Set<Uuid> memberATopicsSubscription1 = new LinkedHashSet<>();
+ memberATopicsSubscription1.add(TOPIC_1_UUID);
+ memberATopicsSubscription1.add(TOPIC_2_UUID);
+
+ Map<String, MemberSubscriptionAndAssignmentImpl> members1 = new
HashMap<>();
+ members1.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ Optional.empty(),
+ memberATopicsSubscription1,
+ Assignment.EMPTY
+ ));
+
+ members1.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ Optional.empty(),
+ Set.of(TOPIC_3_UUID),
+ Assignment.EMPTY
+ ));
+
+ Set<Uuid> memberCTopicsSubscription1 = new LinkedHashSet<>();
+ memberCTopicsSubscription1.add(TOPIC_2_UUID);
+ memberCTopicsSubscription1.add(TOPIC_3_UUID);
+ members1.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ Optional.empty(),
+ memberCTopicsSubscription1,
+ Assignment.EMPTY
+ ));
+
+ GroupSpec groupSpec1 = new GroupSpecImpl(
+ members1,
+ HETEROGENEOUS,
+ Map.of()
+ );
+ SubscribedTopicDescriberImpl subscribedTopicMetadata1 = new
SubscribedTopicDescriberImpl(topicMetadata1);
+
+ GroupAssignment computedAssignment1 = assignor.assign(
+ groupSpec1,
+ subscribedTopicMetadata1
+ );
+
+ // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of
MEMBER_C is 67.
+ // Step 1 -> T2:2 -> member_A, T3:0 -> member_B, T2:2 -> member_C by
hash assignment.
+ // Step 2 -> T1:0, T1:1, T1:2, T2:0 -> member_A, T3:1 -> member_B,
T2:1 -> member_C by round-robin assignment.
+ // Step 3 -> no new assignment gets added by current assignment since
it is empty.
+ Map<String, Map<Uuid, Set<Integer>>> expectedAssignment1 = new
HashMap<>();
+ expectedAssignment1.put(MEMBER_A, mkAssignment(
+ mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
+ mkTopicAssignment(TOPIC_2_UUID, 0, 2)
+ ));
+ expectedAssignment1.put(MEMBER_B, mkAssignment(
+ mkTopicAssignment(TOPIC_3_UUID, 0, 1)
+ ));
+ expectedAssignment1.put(MEMBER_C, mkAssignment(
+ mkTopicAssignment(TOPIC_2_UUID, 1, 2)
+ ));
+
+ // T1: 3 partitions + T2: 3 partitions + T3: 2 partitions = 8
partitions
+ assertEveryPartitionGetsAssignment(8, computedAssignment1);
+ assertAssignment(expectedAssignment1, computedAssignment1);
+
+ // New assignment setup - 2 members A - {T1, T2, T3}, B - {T3, T4}.
+
+ Map<Uuid, TopicMetadata> topicMetadata2 = new HashMap<>();
+ topicMetadata2.put(TOPIC_1_UUID, new TopicMetadata(
+ TOPIC_1_UUID,
+ TOPIC_1_NAME,
+ 3
+ ));
+ topicMetadata2.put(TOPIC_2_UUID, new TopicMetadata(
+ TOPIC_2_UUID,
+ TOPIC_2_NAME,
+ 3
+ ));
+ topicMetadata2.put(TOPIC_3_UUID, new TopicMetadata(
+ TOPIC_3_UUID,
+ TOPIC_3_NAME,
+ 2
+ ));
+ topicMetadata2.put(TOPIC_4_UUID, new TopicMetadata(
+ TOPIC_4_UUID,
+ TOPIC_4_NAME,
+ 1
+ ));
+
+ Map<String, MemberSubscriptionAndAssignmentImpl> members2 = new
HashMap<>();
+
+ Set<Uuid> memberATopicsSubscription2 = new LinkedHashSet<>();
+ memberATopicsSubscription2.add(TOPIC_1_UUID);
+ memberATopicsSubscription2.add(TOPIC_2_UUID);
+ memberATopicsSubscription2.add(TOPIC_3_UUID);
+
+ Set<Uuid> memberBTopicsSubscription2 = new LinkedHashSet<>();
+ memberBTopicsSubscription2.add(TOPIC_3_UUID);
+ memberBTopicsSubscription2.add(TOPIC_4_UUID);
+
+ members2.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ Optional.empty(),
+ memberATopicsSubscription2,
+ new Assignment(mkAssignment(
+ mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
+ mkTopicAssignment(TOPIC_2_UUID, 0, 2)))
+ ));
+
+ members2.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ Optional.empty(),
+ memberBTopicsSubscription2,
+ new Assignment(mkAssignment(
+ mkTopicAssignment(TOPIC_3_UUID, 0, 1)))
+ ));
+
+ GroupSpec groupSpec2 = new GroupSpecImpl(
+ members2,
+ HETEROGENEOUS,
+ Map.of()
+ );
+
+ SubscribedTopicDescriberImpl subscribedTopicMetadata2 = new
SubscribedTopicDescriberImpl(topicMetadata2);
+
+ GroupAssignment computedAssignment2 = assignor.assign(
+ groupSpec2,
+ subscribedTopicMetadata2
+ );
+
+ // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
+ // Step 1 -> T1:1 -> member_A, T3:0 -> member_B by hash assignment.
+ // Step 2 -> T2:1 -> member_A, T4:0 -> member_B by round-robin
assignment.
+ // Step 3 -> T1:0, T1:2, T2:0, T2:2 -> member_A, T3:1 -> member_B by
current assignment.
+ Map<String, Map<Uuid, Set<Integer>>> expectedAssignment2 = new
HashMap<>();
+ expectedAssignment2.put(MEMBER_A, mkAssignment(
+ mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
+ mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2)
+ ));
+ expectedAssignment2.put(MEMBER_B, mkAssignment(
+ mkTopicAssignment(TOPIC_3_UUID, 0, 1),
+ mkTopicAssignment(TOPIC_4_UUID, 0)
+ ));
+
+ // T1: 3 partitions + T2: 3 partitions + T3: 2 partitions + T4: 1
partition = 9 partitions
+ assertEveryPartitionGetsAssignment(9, computedAssignment2);
+ assertAssignment(expectedAssignment2, computedAssignment2);
+ }
+
private void assertAssignment(
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment,
GroupAssignment computedGroupAssignment
@@ -336,4 +745,31 @@ public class SimpleAssignorTest {
assertEquals(expectedAssignment.get(memberId),
computedAssignmentForMember);
}
}
+
+ private void assertAssignment(
+ Map<TopicIdPartition, List<String>> expectedAssignment,
+ Map<TopicIdPartition, List<String>> computedAssignment
+ ) {
+ assertEquals(expectedAssignment.size(), computedAssignment.size());
+ expectedAssignment.forEach((topicIdPartition, members) -> {
+ List<String> computedMembers =
computedAssignment.getOrDefault(topicIdPartition, List.of());
+ assertEquals(members.size(), computedMembers.size());
+ members.forEach(member ->
assertTrue(computedMembers.contains(member)));
+ });
+ }
+
+ private void assertEveryPartitionGetsAssignment(
+ int expectedPartitions,
+ GroupAssignment computedGroupAssignment
+ ) {
+ Map<String, MemberAssignment> memberAssignments =
computedGroupAssignment.members();
+ Set<TopicIdPartition> topicPartitionAssignments = new HashSet<>();
+ memberAssignments.values().forEach(memberAssignment -> {
+ Map<Uuid, Set<Integer>> topicIdPartitions =
memberAssignment.partitions();
+ topicIdPartitions.forEach((topicId, partitions) ->
+ partitions.forEach(partition ->
topicPartitionAssignments.add(new TopicIdPartition(topicId, partition)))
+ );
+ });
+ assertEquals(expectedPartitions, topicPartitionAssignments.size());
+ }
}