This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4b5a16bf6ff KAFKA-18757: Create full-function SimpleAssignor to match 
KIP-932 description (#18864)
4b5a16bf6ff is described below

commit 4b5a16bf6ffcb4322d7c5b35a484ff914397909c
Author: Abhinav Dixit <[email protected]>
AuthorDate: Wed Feb 26 16:32:23 2025 +0530

    KAFKA-18757: Create full-function SimpleAssignor to match KIP-932 
description (#18864)
    
    ### About
    Before this change, the `SimpleAssignor` in Apache Kafka (AK) assigned
    all subscribed topic partitions to every member of the share group. This
    does not match the description given in
    
[KIP-932](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255070434#KIP932:QueuesforKafka-TheSimpleAssignor).
    The KIP defines the rules below for computing the assignment. The
    implementation of step 3 in this change differs from the KIP, for the
    reasons
    
[described](https://github.com/apache/kafka/pull/18864#issuecomment-2659266502)
    in the pull request discussion:
    
    1. The assignor hashes the member IDs of the members and maps the
    partitions assigned to the members based on the hash. This gives
    approximately even balance.
    2. If any partitions were not assigned any members by (1) and do not
    have members already assigned in the current assignment, members are
    assigned round-robin until each partition has at least one member
    assigned to it.
    3. The current assignment and the new assignment are combined. (The
    original rule in the KIP was: if any partitions were assigned members by
    (1) and also have members in the current assignment assigned by (2), the
    members assigned by (2) are removed.)
    
    ### Tests
    The added code has been verified with unit tests and with the existing
    integration tests.
    
    Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal 
<[email protected]>, TaiJuWu <[email protected]>
---
 .../server/ShareGroupHeartbeatRequestTest.scala    |  27 +-
 .../coordinator/group/assignor/SimpleAssignor.java | 242 +++++++++-
 .../group/assignor/SimpleAssignorTest.java         | 512 +++++++++++++++++++--
 3 files changed, 711 insertions(+), 70 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
index 07c7b959ab8..448b6897ede 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.test.ClusterInstance
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, 
assertNotNull, assertNull, assertTrue}
 import org.junit.jupiter.api.{Tag, Timeout}
 
+import java.util
 import scala.jdk.CollectionConverters._
 
 @Timeout(120)
@@ -216,18 +217,12 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       assertNotEquals(memberId1, memberId2)
 
       // Create the topic.
-      val topicId = TestUtils.createTopicWithAdminRaw(
+      TestUtils.createTopicWithAdminRaw(
         admin = admin,
         topic = "foo",
         numPartitions = 3
       )
 
-      // This is the expected assignment.
-      val expectedAssignment = new ShareGroupHeartbeatResponseData.Assignment()
-        .setTopicPartitions(List(new 
ShareGroupHeartbeatResponseData.TopicPartitions()
-          .setTopicId(topicId)
-          .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
-
       // Prepare the next heartbeat for member 1.
       shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
         new ShareGroupHeartbeatRequestData()
@@ -241,10 +236,10 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       shareGroupHeartbeatResponse = null
       TestUtils.waitUntilTrue(() => {
         shareGroupHeartbeatResponse = 
connectAndReceive(shareGroupHeartbeatRequest)
-        shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
-          shareGroupHeartbeatResponse.data.assignment == expectedAssignment
+        shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && 
shareGroupHeartbeatResponse.data.assignment != null
       }, msg = s"Could not get partitions assigned. Last response 
$shareGroupHeartbeatResponse.")
 
+      val topicPartitionsAssignedToMember1 = 
shareGroupHeartbeatResponse.data.assignment.topicPartitions()
       // Verify the response.
       assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
 
@@ -261,13 +256,23 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       shareGroupHeartbeatResponse = null
       TestUtils.waitUntilTrue(() => {
         shareGroupHeartbeatResponse = 
connectAndReceive(shareGroupHeartbeatRequest)
-        shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
-          shareGroupHeartbeatResponse.data.assignment == expectedAssignment
+        shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && 
shareGroupHeartbeatResponse.data.assignment !=  null
       }, msg = s"Could not get partitions assigned. Last response 
$shareGroupHeartbeatResponse.")
 
+      val topicPartitionsAssignedToMember2 = 
shareGroupHeartbeatResponse.data.assignment.topicPartitions()
       // Verify the response.
       assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
 
+      val partitionsAssigned: util.Set[Integer] = new util.HashSet[Integer]()
+      topicPartitionsAssignedToMember1.forEach(topicPartition => {
+        partitionsAssigned.addAll(topicPartition.partitions())
+      })
+      topicPartitionsAssignedToMember2.forEach(topicPartition => {
+        partitionsAssigned.addAll(topicPartition.partitions())
+      })
+      // Verify all the 3 topic partitions for "foo" have been assigned to at 
least 1 member.
+      assertEquals(util.Set.of(0, 1, 2), partitionsAssigned)
+
       // Verify the assignments are not changed for member 1.
       // Prepare another heartbeat for member 1 with latest received epoch 3 
for member 1.
       shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
index 4cc66468ae7..781b64dd604 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
@@ -25,19 +25,23 @@ import 
org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorExceptio
 import 
org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
 import 
org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
 import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
+import org.apache.kafka.server.common.TopicIdPartition;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
 import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
 
 /**
- * A simple partition assignor that assigns each member all partitions of the 
subscribed topics.
+ * A simple partition assignor that assigns partitions of the subscribed 
topics based on the rules defined in KIP-932 to different members.
  */
 public class SimpleAssignor implements ShareGroupPartitionAssignor {
 
@@ -54,7 +58,7 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
         SubscribedTopicDescriber subscribedTopicDescriber
     ) throws PartitionAssignorException {
         if (groupSpec.memberIds().isEmpty())
-            return new GroupAssignment(Collections.emptyMap());
+            return new GroupAssignment(Map.of());
 
         if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
             return assignHomogenous(groupSpec, subscribedTopicDescriber);
@@ -67,42 +71,240 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
         GroupSpec groupSpec,
         SubscribedTopicDescriber subscribedTopicDescriber
     ) {
-        Set<Uuid> subscribeTopicIds = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
+        Set<Uuid> subscribedTopicIds = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
             .subscribedTopicIds();
-        if (subscribeTopicIds.isEmpty())
-            return new GroupAssignment(Collections.emptyMap());
+        if (subscribedTopicIds.isEmpty())
+            return new GroupAssignment(Map.of());
 
-        Map<Uuid, Set<Integer>> targetPartitions = computeTargetPartitions(
-            subscribeTopicIds, subscribedTopicDescriber);
+        // Subscribed topic partitions for the share group.
+        List<TopicIdPartition> targetPartitions = computeTargetPartitions(
+            subscribedTopicIds, subscribedTopicDescriber);
 
-        return new 
GroupAssignment(groupSpec.memberIds().stream().collect(Collectors.toMap(
-            Function.identity(), memberId -> new 
MemberAssignmentImpl(targetPartitions))));
+        // The current assignment from topic partition to members.
+        Map<TopicIdPartition, List<String>> currentAssignment = 
currentAssignment(groupSpec);
+        return newAssignmentHomogeneous(groupSpec, subscribedTopicIds, 
targetPartitions, currentAssignment);
     }
 
     private GroupAssignment assignHeterogeneous(
         GroupSpec groupSpec,
         SubscribedTopicDescriber subscribedTopicDescriber
     ) {
-        Map<String, MemberAssignment> members = new HashMap<>();
+        Map<String, List<TopicIdPartition>> memberToPartitionsSubscription = 
new HashMap<>();
         for (String memberId : groupSpec.memberIds()) {
             MemberSubscription spec = groupSpec.memberSubscription(memberId);
             if (spec.subscribedTopicIds().isEmpty())
                 continue;
 
-            Map<Uuid, Set<Integer>> targetPartitions = computeTargetPartitions(
+            // Subscribed topic partitions for the share group member.
+            List<TopicIdPartition> targetPartitions = computeTargetPartitions(
                 spec.subscribedTopicIds(), subscribedTopicDescriber);
+            memberToPartitionsSubscription.put(memberId, targetPartitions);
+        }
+
+        // The current assignment from topic partition to members.
+        Map<TopicIdPartition, List<String>> currentAssignment = 
currentAssignment(groupSpec);
+        return newAssignmentHeterogeneous(groupSpec, 
memberToPartitionsSubscription, currentAssignment);
+    }
+
+    /**
+     * Get the current assignment by topic partitions.
+     * @param groupSpec - The group metadata specifications.
+     * @return the current assignment for subscribed topic partitions to 
memberIds.
+     */
+    private Map<TopicIdPartition, List<String>> currentAssignment(GroupSpec 
groupSpec) {
+        Map<TopicIdPartition, List<String>> assignment = new HashMap<>();
 
-            members.put(memberId, new MemberAssignmentImpl(targetPartitions));
+        for (String member : groupSpec.memberIds()) {
+            Map<Uuid, Set<Integer>> assignedTopicPartitions = 
groupSpec.memberAssignment(member).partitions();
+            assignedTopicPartitions.forEach((topicId, partitions) -> 
partitions.forEach(
+                partition -> assignment.computeIfAbsent(new 
TopicIdPartition(topicId, partition), k -> new ArrayList<>()).add(member)));
         }
+        return assignment;
+    }
+
+    /**
+     * This function computes the new assignment for a homogeneous group.
+     * @param groupSpec - The group metadata specifications.
+     * @param subscribedTopicIds - The set of all the subscribed topic ids for 
the group.
+     * @param targetPartitions - The list of all topic partitions that need 
assignment.
+     * @param currentAssignment - The current assignment for subscribed topic 
partitions to memberIds.
+     * @return the new partition assignment for the members of the group.
+     */
+    private GroupAssignment newAssignmentHomogeneous(
+        GroupSpec groupSpec,
+        Set<Uuid> subscribedTopicIds,
+        List<TopicIdPartition> targetPartitions,
+        Map<TopicIdPartition, List<String>> currentAssignment
+    ) {
+        Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>();
+
+        // Step 1: Hash member IDs to topic partitions.
+        memberHashAssignment(targetPartitions, groupSpec.memberIds(), 
newAssignment);
+
+        // Step 2: Round-robin assignment for unassigned partitions which do 
not have members already assigned in the current assignment.
+        List<TopicIdPartition> unassignedPartitions = targetPartitions.stream()
+            .filter(targetPartition -> 
!newAssignment.containsKey(targetPartition))
+            .filter(targetPartition -> 
!currentAssignment.containsKey(targetPartition))
+            .toList();
+
+        roundRobinAssignment(groupSpec.memberIds(), unassignedPartitions, 
newAssignment);
+
+        // Step 3: We combine current assignment and new assignment.
+        Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>();
+
+        // As per the KIP, we should revoke the assignments from current 
assignment for partitions that were assigned by step 1
+        // in the new assignment and have members in current assignment by 
step 2. But we haven't implemented it to avoid the
+        // complexity in both the implementation and the run time complexity. 
This step was mentioned in the KIP to reduce
+        // the burden of certain members of the share groups. This can be 
achieved with the help of limiting the max
+        // no. of partitions assignment for every member(KAFKA-18788). Hence, 
the potential problem of burdening
+        // the share consumers will be addressed in a future PR.
+
+        newAssignment.forEach((targetPartition, members) -> 
members.forEach(member ->
+            finalAssignment.computeIfAbsent(member, k -> new 
HashSet<>()).add(targetPartition)));
+        // When combining current assignment, we need to only consider the 
topics in current assignment that are also being
+        // subscribed in the new assignment as well.
+        currentAssignment.forEach((targetPartition, members) -> {
+            if (subscribedTopicIds.contains(targetPartition.topicId()))
+                members.forEach(member -> {
+                    if (groupSpec.memberIds().contains(member) && 
!newAssignment.containsKey(targetPartition))
+                        finalAssignment.computeIfAbsent(member, k -> new 
HashSet<>()).add(targetPartition);
+                });
+        });
+
+        return groupAssignment(finalAssignment, groupSpec.memberIds());
+    }
+
+    /**
+     * This function computes the new assignment for a heterogeneous group.
+     * @param groupSpec - The group metadata specifications.
+     * @param memberToPartitionsSubscription - The member to subscribed topic 
partitions map.
+     * @param currentAssignment - The current assignment for subscribed topic 
partitions to memberIds.
+     * @return the new partition assignment for the members of the group.
+     */
+    private GroupAssignment newAssignmentHeterogeneous(
+        GroupSpec groupSpec,
+        Map<String, List<TopicIdPartition>> memberToPartitionsSubscription,
+        Map<TopicIdPartition, List<String>> currentAssignment
+    ) {
+
+        // Exhaustive set of all subscribed topic partitions.
+        Set<TopicIdPartition> targetPartitions = new LinkedHashSet<>();
+        
memberToPartitionsSubscription.values().forEach(targetPartitions::addAll);
+
+        // Create a map for topic to members subscription.
+        Map<Uuid, Set<String>> topicToMemberSubscription = new HashMap<>();
+        memberToPartitionsSubscription.forEach((member, partitions) ->
+            partitions.forEach(partition -> 
topicToMemberSubscription.computeIfAbsent(partition.topicId(), k -> new 
LinkedHashSet<>()).add(member)));
+
+        Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>();
+
+        // Step 1: Hash member IDs to partitions.
+        memberToPartitionsSubscription.forEach((member, partitions) ->
+            memberHashAssignment(partitions, List.of(member), newAssignment));
+
+        // Step 2: Round-robin assignment for unassigned partitions which do 
not have members already assigned in the current assignment.
+        Set<TopicIdPartition> assignedPartitions = new 
LinkedHashSet<>(newAssignment.keySet());
+        Map<Uuid, List<TopicIdPartition>> unassignedPartitions = new 
HashMap<>();
+        targetPartitions.forEach(targetPartition -> {
+            if (!assignedPartitions.contains(targetPartition) && 
!currentAssignment.containsKey(targetPartition))
+                
unassignedPartitions.computeIfAbsent(targetPartition.topicId(), k -> new 
ArrayList<>()).add(targetPartition);
+        });
+
+        unassignedPartitions.keySet().forEach(unassignedTopic ->
+            
roundRobinAssignment(topicToMemberSubscription.get(unassignedTopic), 
unassignedPartitions.get(unassignedTopic), newAssignment));
+
+        // Step 3: We combine current assignment and new assignment.
+        Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>();
+        // As per the KIP, we should revoke the assignments from current 
assignment for partitions that were assigned by step 1
+        // in the new assignment and have members in current assignment by 
step 2. But we haven't implemented it to avoid the
+        // complexity in both the implementation and the run time complexity. 
This step was mentioned in the KIP to reduce
+        // the burden of certain members of the share groups. This can be 
achieved with the help of limiting the max
+        // no. of partitions assignment for every member(KAFKA-18788). Hence, 
the potential problem of burdening
+        // the share consumers will be addressed in a future PR.
+
+        newAssignment.forEach((targetPartition, members) -> 
members.forEach(member ->
+            finalAssignment.computeIfAbsent(member, k -> new 
HashSet<>()).add(targetPartition)));
+        // When combining current assignment, we need to only consider the 
member topic subscription in current assignment
+        // which is being subscribed in the new assignment as well.
+        currentAssignment.forEach((topicIdPartition, members) -> 
members.forEach(member -> {
+            if 
(topicToMemberSubscription.getOrDefault(topicIdPartition.topicId(), 
Collections.emptySet()).contains(member)
+                && !newAssignment.containsKey(topicIdPartition))
+                finalAssignment.computeIfAbsent(member, k -> new 
HashSet<>()).add(topicIdPartition);
+        }));
+
+        return groupAssignment(finalAssignment, groupSpec.memberIds());
+    }
+
+    private GroupAssignment groupAssignment(
+        Map<String, Set<TopicIdPartition>> assignmentByMember,
+        Collection<String> allGroupMembers
+    ) {
+        Map<String, MemberAssignment> members = new HashMap<>();
+        for (Map.Entry<String, Set<TopicIdPartition>> entry : 
assignmentByMember.entrySet()) {
+            Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
+            entry.getValue().forEach(targetPartition -> 
targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new 
HashSet<>()).add(targetPartition.partitionId()));
+            members.put(entry.getKey(), new 
MemberAssignmentImpl(targetPartitions));
+        }
+        allGroupMembers.forEach(member -> {
+            if (!members.containsKey(member))
+                members.put(member, new MemberAssignmentImpl(new HashMap<>()));
+        });
+
         return new GroupAssignment(members);
     }
 
-    private Map<Uuid, Set<Integer>> computeTargetPartitions(
-        Set<Uuid> subscribeTopicIds,
+    /**
+     * This function updates assignment by hashing the member IDs of the 
members and maps the partitions assigned to the
+     * members based on the hash. This gives approximately even balance.
+     * @param unassignedPartitions - the subscribed topic partitions which 
needs assignment.
+     * @param memberIds - the member ids to which the topic partitions need to 
be assigned.
+     * @param assignment - the existing assignment by topic partition. We need 
to pass it as a parameter because this
+     *                   function would be called multiple times for 
heterogeneous assignment.
+     */
+    // Visible for testing
+    void memberHashAssignment(
+        List<TopicIdPartition> unassignedPartitions,
+        Collection<String> memberIds,
+        Map<TopicIdPartition, List<String>> assignment
+    ) {
+        if (!unassignedPartitions.isEmpty())
+            for (String memberId : memberIds) {
+                int topicPartitionIndex = Math.abs(memberId.hashCode() % 
unassignedPartitions.size());
+                TopicIdPartition topicPartition = 
unassignedPartitions.get(topicPartitionIndex);
+                assignment.computeIfAbsent(topicPartition, k -> new 
ArrayList<>()).add(memberId);
+            }
+    }
+
+    /**
+     * This functions assigns topic partitions to members by round-robin 
approach and updates the existing assignment.
+     * @param memberIds - the member ids to which the topic partitions need to 
be assigned, should be non-empty.
+     * @param unassignedPartitions - the subscribed topic partitions which 
needs assignment.
+     * @param assignment - the existing assignment by topic partition.
+     */
+    // Visible for testing
+    void roundRobinAssignment(
+        Collection<String> memberIds,
+        List<TopicIdPartition> unassignedPartitions,
+        Map<TopicIdPartition, List<String>> assignment
+    ) {
+        // We iterate through the target partitions and assign a memberId to 
them. In case we run out of members (members < targetPartitions),
+        // we again start from the starting index of memberIds.
+        Iterator<String> memberIdIterator = memberIds.iterator();
+        for (TopicIdPartition targetPartition : unassignedPartitions) {
+            if (!memberIdIterator.hasNext()) {
+                memberIdIterator = memberIds.iterator();
+            }
+            String memberId = memberIdIterator.next();
+            assignment.computeIfAbsent(targetPartition, k -> new 
ArrayList<>()).add(memberId);
+        }
+    }
+
+    private List<TopicIdPartition> computeTargetPartitions(
+        Set<Uuid> subscribedTopicIds,
         SubscribedTopicDescriber subscribedTopicDescriber
     ) {
-        Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
-        subscribeTopicIds.forEach(topicId -> {
+        List<TopicIdPartition> targetPartitions = new ArrayList<>();
+        subscribedTopicIds.forEach(topicId -> {
             int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId);
             if (numPartitions == -1) {
                 throw new PartitionAssignorException(
@@ -111,11 +313,9 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
                 );
             }
 
-            Set<Integer> partitions = new HashSet<>();
             for (int i = 0; i < numPartitions; i++) {
-                partitions.add(i);
+                targetPartitions.add(new TopicIdPartition(topicId, i));
             }
-            targetPartitions.put(topicId, partitions);
         });
         return targetPartitions;
     }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
index 71b64dcb817..a0b86d6fa13 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
@@ -19,21 +19,24 @@ package org.apache.kafka.coordinator.group.assignor;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
 import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
+import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
 import 
org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
 import 
org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
 import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
 import org.apache.kafka.coordinator.group.modern.TopicMetadata;
+import org.apache.kafka.server.common.TopicIdPartition;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.TreeMap;
 
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
@@ -41,16 +44,21 @@ import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.H
 import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class SimpleAssignorTest {
 
     private static final Uuid TOPIC_1_UUID = Uuid.randomUuid();
     private static final Uuid TOPIC_2_UUID = Uuid.randomUuid();
     private static final Uuid TOPIC_3_UUID = Uuid.randomUuid();
+    private static final Uuid TOPIC_4_UUID = Uuid.randomUuid();
     private static final String TOPIC_1_NAME = "topic1";
+    private static final String TOPIC_2_NAME = "topic2";
     private static final String TOPIC_3_NAME = "topic3";
+    private static final String TOPIC_4_NAME = "topic4";
     private static final String MEMBER_A = "A";
     private static final String MEMBER_B = "B";
+    private static final String MEMBER_C = "C";
 
     private final SimpleAssignor assignor = new SimpleAssignor();
 
@@ -62,13 +70,13 @@ public class SimpleAssignorTest {
     @Test
     public void testAssignWithEmptyMembers() {
         SubscribedTopicDescriberImpl subscribedTopicMetadata = new 
SubscribedTopicDescriberImpl(
-            Collections.emptyMap()
+            Map.of()
         );
 
         GroupSpec groupSpec = new GroupSpecImpl(
-            Collections.emptyMap(),
+            Map.of(),
             HOMOGENEOUS,
-            Collections.emptyMap()
+            Map.of()
         );
 
         GroupAssignment groupAssignment = assignor.assign(
@@ -76,13 +84,24 @@ public class SimpleAssignorTest {
             subscribedTopicMetadata
         );
 
-        assertEquals(Collections.emptyMap(), groupAssignment.members());
+        assertEquals(Map.of(), groupAssignment.members());
+
+        groupSpec = new GroupSpecImpl(
+            Map.of(),
+            HETEROGENEOUS,
+            Map.of()
+        );
+        groupAssignment = assignor.assign(
+            groupSpec,
+            subscribedTopicMetadata
+        );
+        assertEquals(Map.of(), groupAssignment.members());
     }
 
     @Test
     public void testAssignWithNoSubscribedTopic() {
         SubscribedTopicDescriberImpl subscribedTopicMetadata = new 
SubscribedTopicDescriberImpl(
-            Collections.singletonMap(
+            Map.of(
                 TOPIC_1_UUID,
                 new TopicMetadata(
                     TOPIC_1_UUID,
@@ -92,12 +111,12 @@ public class SimpleAssignorTest {
             )
         );
 
-        Map<String, MemberSubscriptionAndAssignmentImpl> members = 
Collections.singletonMap(
+        Map<String, MemberSubscriptionAndAssignmentImpl> members = Map.of(
             MEMBER_A,
             new MemberSubscriptionAndAssignmentImpl(
                 Optional.empty(),
                 Optional.empty(),
-                Collections.emptySet(),
+                Set.of(),
                 Assignment.EMPTY
             )
         );
@@ -105,7 +124,7 @@ public class SimpleAssignorTest {
         GroupSpec groupSpec = new GroupSpecImpl(
             members,
             HOMOGENEOUS,
-            Collections.emptyMap()
+            Map.of()
         );
 
         GroupAssignment groupAssignment = assignor.assign(
@@ -113,13 +132,13 @@ public class SimpleAssignorTest {
             subscribedTopicMetadata
         );
 
-        assertEquals(Collections.emptyMap(), groupAssignment.members());
+        assertEquals(Map.of(), groupAssignment.members());
     }
 
     @Test
     public void testAssignWithSubscribedToNonExistentTopic() {
         SubscribedTopicDescriberImpl subscribedTopicMetadata = new 
SubscribedTopicDescriberImpl(
-            Collections.singletonMap(
+            Map.of(
                 TOPIC_1_UUID,
                 new TopicMetadata(
                     TOPIC_1_UUID,
@@ -129,7 +148,7 @@ public class SimpleAssignorTest {
             )
         );
 
-        Map<String, MemberSubscriptionAndAssignmentImpl> members = 
Collections.singletonMap(
+        Map<String, MemberSubscriptionAndAssignmentImpl> members = Map.of(
             MEMBER_A,
             new MemberSubscriptionAndAssignmentImpl(
                 Optional.empty(),
@@ -142,7 +161,7 @@ public class SimpleAssignorTest {
         GroupSpec groupSpec = new GroupSpecImpl(
             members,
             HOMOGENEOUS,
-            Collections.emptyMap()
+            Map.of()
         );
 
         assertThrows(PartitionAssignorException.class,
@@ -163,26 +182,30 @@ public class SimpleAssignorTest {
             2
         ));
 
-        Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
+        Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
HashMap<>();
+
+        Set<Uuid> topicsSubscription = new LinkedHashSet<>();
+        topicsSubscription.add(TOPIC_1_UUID);
+        topicsSubscription.add(TOPIC_3_UUID);
 
         members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
             Optional.empty(),
             Optional.empty(),
-            Set.of(TOPIC_1_UUID, TOPIC_3_UUID),
+            topicsSubscription,
             Assignment.EMPTY
         ));
 
         members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
             Optional.empty(),
             Optional.empty(),
-            Set.of(TOPIC_1_UUID, TOPIC_3_UUID),
+            topicsSubscription,
             Assignment.EMPTY
         ));
 
         GroupSpec groupSpec = new GroupSpecImpl(
             members,
             HOMOGENEOUS,
-            Collections.emptyMap()
+            Map.of()
         );
         SubscribedTopicDescriberImpl subscribedTopicMetadata = new 
SubscribedTopicDescriberImpl(topicMetadata);
 
@@ -191,16 +214,22 @@ public class SimpleAssignorTest {
             subscribedTopicMetadata
         );
 
+        // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
+        // Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment.
+        // Step 2 -> T1:2, T3:1 -> MEMBER_A and T3:0 -> MEMBER_B by 
round-robin assignment.
+        // Step 3 -> no new assignment gets added by current assignment since 
it is empty.
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new 
HashMap<>();
         expectedAssignment.put(MEMBER_A, mkAssignment(
-            mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
-            mkTopicAssignment(TOPIC_3_UUID, 0, 1)
+            mkTopicAssignment(TOPIC_1_UUID, 0, 2),
+            mkTopicAssignment(TOPIC_3_UUID, 1)
         ));
         expectedAssignment.put(MEMBER_B, mkAssignment(
-            mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
-            mkTopicAssignment(TOPIC_3_UUID, 0, 1)
+            mkTopicAssignment(TOPIC_1_UUID, 1),
+            mkTopicAssignment(TOPIC_3_UUID, 0)
         ));
 
+        // T1: 3 partitions + T3: 2 partitions = 5 partitions
+        assertEveryPartitionGetsAssignment(5, computedAssignment);
         assertAssignment(expectedAssignment, computedAssignment);
     }
 
@@ -224,11 +253,15 @@ public class SimpleAssignorTest {
             2
         ));
 
-        Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
+        Set<Uuid> memberATopicsSubscription = new LinkedHashSet<>();
+        memberATopicsSubscription.add(TOPIC_1_UUID);
+        memberATopicsSubscription.add(TOPIC_2_UUID);
+
+        Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
HashMap<>();
         members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
             Optional.empty(),
             Optional.empty(),
-            Set.of(TOPIC_1_UUID, TOPIC_2_UUID),
+            memberATopicsSubscription,
             Assignment.EMPTY
         ));
 
@@ -239,18 +272,20 @@ public class SimpleAssignorTest {
             Assignment.EMPTY
         ));
 
-        String memberC = "C";
-        members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
+        Set<Uuid> memberCTopicsSubscription = new LinkedHashSet<>();
+        memberCTopicsSubscription.add(TOPIC_2_UUID);
+        memberCTopicsSubscription.add(TOPIC_3_UUID);
+        members.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl(
             Optional.empty(),
             Optional.empty(),
-            Set.of(TOPIC_2_UUID, TOPIC_3_UUID),
+            memberCTopicsSubscription,
             Assignment.EMPTY
         ));
 
         GroupSpec groupSpec = new GroupSpecImpl(
             members,
             HETEROGENEOUS,
-            Collections.emptyMap()
+            Map.of()
         );
         SubscribedTopicDescriberImpl subscribedTopicMetadata = new 
SubscribedTopicDescriberImpl(topicMetadata);
 
@@ -259,19 +294,24 @@ public class SimpleAssignorTest {
             subscribedTopicMetadata
         );
 
+        // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of 
MEMBER_C is 67.
+        // Step 1 -> T2:2 -> member_A, T3:0 -> member_B, T2:2 -> member_C by 
hash assignment.
+        // Step 2 -> T1:0, T1:1, T1:2, T2:0 -> member_A, T3:1 -> member_B, 
T2:1 -> member_C by round-robin assignment.
+        // Step 3 -> no new assignment gets added by current assignment since 
it is empty.
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new 
HashMap<>();
         expectedAssignment.put(MEMBER_A, mkAssignment(
             mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
-            mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2)
+            mkTopicAssignment(TOPIC_2_UUID, 0, 2)
         ));
         expectedAssignment.put(MEMBER_B, mkAssignment(
             mkTopicAssignment(TOPIC_3_UUID, 0, 1)
         ));
-        expectedAssignment.put(memberC, mkAssignment(
-            mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2),
-            mkTopicAssignment(TOPIC_3_UUID, 0, 1)
+        expectedAssignment.put(MEMBER_C, mkAssignment(
+            mkTopicAssignment(TOPIC_2_UUID, 1, 2)
         ));
 
+        // T1: 3 partitions + T2: 3 partitions + T3: 2 partitions = 8 
partitions
+        assertEveryPartitionGetsAssignment(8, computedAssignment);
         assertAssignment(expectedAssignment, computedAssignment);
     }
 
@@ -290,25 +330,28 @@ public class SimpleAssignorTest {
             2
         ));
 
-        Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
TreeMap<>();
+        Set<Uuid> memberATopicsSubscription = new LinkedHashSet<>();
+        memberATopicsSubscription.add(TOPIC_1_UUID);
+        memberATopicsSubscription.add(TOPIC_2_UUID);
+        Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
HashMap<>();
         members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
             Optional.empty(),
             Optional.empty(),
-            Set.of(TOPIC_1_UUID, TOPIC_2_UUID),
+            memberATopicsSubscription,
             Assignment.EMPTY
         ));
 
         members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
             Optional.empty(),
             Optional.empty(),
-            Collections.emptySet(),
+            Set.of(),
             Assignment.EMPTY
         ));
 
         GroupSpec groupSpec = new GroupSpecImpl(
             members,
             HETEROGENEOUS,
-            Collections.emptyMap()
+            Map.of()
         );
         SubscribedTopicDescriberImpl subscribedTopicMetadata = new 
SubscribedTopicDescriberImpl(topicMetadata);
 
@@ -320,12 +363,378 @@ public class SimpleAssignorTest {
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new 
HashMap<>();
         expectedAssignment.put(MEMBER_A, mkAssignment(
             mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
-            mkTopicAssignment(TOPIC_2_UUID, 0, 1)
-        ));
+            mkTopicAssignment(TOPIC_2_UUID, 0, 1)));
+        expectedAssignment.put(MEMBER_B, mkAssignment());
 
+        // T1: 3 partitions + T2: 2 partitions = 5 partitions
+        assertEveryPartitionGetsAssignment(5, computedAssignment);
         assertAssignment(expectedAssignment, computedAssignment);
     }
 
+    @Test
+    public void testMemberHashAssignment() {
+        // hashcode for "member1" is 948881623.
+        String member1 = "member1";
+        // hashcode for "member2" is 948881624.
+        String member2 = "member2";
+        // hashcode for "member3" is 948881625.
+        String member3 = "member3";
+        // hashcode for "member4" is 948881626.
+        String member4 = "member4";
+        // hashcode for "AaAaAaAa" is -540425984 to test with negative 
hashcode.
+        String member5 = "AaAaAaAa";
+        List<String> members = List.of(member1, member2, member3, member4, 
member5);
+
+        TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
+        TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
+        TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
+        List<TopicIdPartition> partitions = List.of(partition1, partition2, 
partition3);
+
+        Map<TopicIdPartition, List<String>> computedAssignment = new 
HashMap<>();
+        assignor.memberHashAssignment(partitions, members, computedAssignment);
+
+        Map<TopicIdPartition, List<String>> expectedAssignment = new 
HashMap<>();
+        expectedAssignment.put(partition1, List.of(member3));
+        expectedAssignment.put(partition2, List.of(member1, member4));
+        expectedAssignment.put(partition3, List.of(member2, member5));
+        assertAssignment(expectedAssignment, computedAssignment);
+    }
+
+    @Test
+    public void testRoundRobinAssignment() {
+        String member1 = "member1";
+        String member2 = "member2";
+        List<String> members = List.of(member1, member2);
+        TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
+        TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
+        TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
+        TopicIdPartition partition4 = new TopicIdPartition(TOPIC_4_UUID, 0);
+        List<TopicIdPartition> unassignedPartitions = List.of(partition2, 
partition3, partition4);
+
+        Map<TopicIdPartition, List<String>> assignment = new HashMap<>();
+        assignment.put(partition1, List.of(member1));
+
+        assignor.roundRobinAssignment(members, unassignedPartitions, 
assignment);
+        Map<TopicIdPartition, List<String>> expectedAssignment = Map.of(
+            partition1, List.of(member1),
+            partition2, List.of(member1),
+            partition3, List.of(member2),
+            partition4, List.of(member1)
+        );
+
+        assertAssignment(expectedAssignment, assignment);
+    }
+
+    @Test
+    public void testAssignWithCurrentAssignmentHomogeneous() {
+        // Current assignment setup - Two members A, B subscribing to T1 and 
T2.
+        Map<Uuid, TopicMetadata> topicMetadata1 = new HashMap<>();
+        topicMetadata1.put(TOPIC_1_UUID, new TopicMetadata(
+            TOPIC_1_UUID,
+            TOPIC_1_NAME,
+            3
+        ));
+        topicMetadata1.put(TOPIC_2_UUID, new TopicMetadata(
+            TOPIC_2_UUID,
+            TOPIC_2_NAME,
+            2
+        ));
+
+        Map<String, MemberSubscriptionAndAssignmentImpl> members1 = new 
HashMap<>();
+
+        Set<Uuid> topicsSubscription1 = new LinkedHashSet<>();
+        topicsSubscription1.add(TOPIC_1_UUID);
+        topicsSubscription1.add(TOPIC_2_UUID);
+
+        members1.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
+            Optional.empty(),
+            Optional.empty(),
+            topicsSubscription1,
+            Assignment.EMPTY
+        ));
+
+        members1.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
+            Optional.empty(),
+            Optional.empty(),
+            topicsSubscription1,
+            Assignment.EMPTY
+        ));
+
+        GroupSpec groupSpec1 = new GroupSpecImpl(
+            members1,
+            HOMOGENEOUS,
+            Map.of()
+        );
+        SubscribedTopicDescriberImpl subscribedTopicMetadata1 = new 
SubscribedTopicDescriberImpl(topicMetadata1);
+
+        GroupAssignment computedAssignment1 = assignor.assign(
+            groupSpec1,
+            subscribedTopicMetadata1
+        );
+
+        // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
+        // Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment.
+        // Step 2 -> T1:2, T2:1 -> MEMBER_A and T2:0 -> MEMBER_B by 
round-robin assignment.
+        // Step 3 -> no new assignment gets added by current assignment since 
it is empty.
+        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment1 = new 
HashMap<>();
+        expectedAssignment1.put(MEMBER_A, mkAssignment(
+            mkTopicAssignment(TOPIC_1_UUID, 0, 2),
+            mkTopicAssignment(TOPIC_2_UUID, 1)
+        ));
+        expectedAssignment1.put(MEMBER_B, mkAssignment(
+            mkTopicAssignment(TOPIC_1_UUID, 1),
+            mkTopicAssignment(TOPIC_2_UUID, 0)
+        ));
+
+        // T1: 3 partitions + T2: 2 partitions = 5 partitions
+        assertEveryPartitionGetsAssignment(5, computedAssignment1);
+        assertAssignment(expectedAssignment1, computedAssignment1);
+
+        // New assignment setup - Three members A, B, C subscribing to T2 and 
T3.
+        Map<Uuid, TopicMetadata> topicMetadata2 = new HashMap<>();
+        topicMetadata2.put(TOPIC_2_UUID, new TopicMetadata(
+            TOPIC_2_UUID,
+            TOPIC_2_NAME,
+            2
+        ));
+        topicMetadata2.put(TOPIC_3_UUID, new TopicMetadata(
+            TOPIC_3_UUID,
+            TOPIC_3_NAME,
+            3
+        ));
+
+        Map<String, MemberSubscriptionAndAssignmentImpl> members2 = new 
HashMap<>();
+
+        Set<Uuid> topicsSubscription2 = new LinkedHashSet<>();
+        topicsSubscription2.add(TOPIC_2_UUID);
+        topicsSubscription2.add(TOPIC_3_UUID);
+
+        members2.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
+            Optional.empty(),
+            Optional.empty(),
+            topicsSubscription2,
+            // Utilizing the assignment from current assignment
+            new Assignment(mkAssignment(
+                mkTopicAssignment(TOPIC_1_UUID, 0, 2),
+                mkTopicAssignment(TOPIC_2_UUID, 1)))
+        ));
+
+        members2.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
+            Optional.empty(),
+            Optional.empty(),
+            topicsSubscription2,
+            new Assignment(mkAssignment(
+                mkTopicAssignment(TOPIC_1_UUID, 1),
+                mkTopicAssignment(TOPIC_2_UUID, 0)))
+        ));
+
+        members2.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl(
+            Optional.empty(),
+            Optional.empty(),
+            topicsSubscription2,
+            Assignment.EMPTY
+        ));
+
+        GroupSpec groupSpec2 = new GroupSpecImpl(
+            members2,
+            HOMOGENEOUS,
+            Map.of()
+        );
+        SubscribedTopicDescriberImpl subscribedTopicMetadata2 = new 
SubscribedTopicDescriberImpl(topicMetadata2);
+
+        GroupAssignment computedAssignment2 = assignor.assign(
+            groupSpec2,
+            subscribedTopicMetadata2
+        );
+
+        // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of 
MEMBER_C is 67.
+        // Step 1 -> T2:0 -> MEMBER_A, T2:1 -> MEMBER_B, T3:0 -> MEMBER_C by 
hash assignment
+        // Step 2 -> T3:1 -> MEMBER_A, T3:2 -> MEMBER_B by round-robin 
assignment
+        // Step 3 -> no new addition by current assignment since T2:0 and T2:1 
were already a part of new assignment.
+        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment2 = new 
HashMap<>();
+        expectedAssignment2.put(MEMBER_A, mkAssignment(
+            mkTopicAssignment(TOPIC_2_UUID, 0),
+            mkTopicAssignment(TOPIC_3_UUID, 1)
+        ));
+        expectedAssignment2.put(MEMBER_B, mkAssignment(
+            mkTopicAssignment(TOPIC_2_UUID, 1),
+            mkTopicAssignment(TOPIC_3_UUID, 2)
+        ));
+        expectedAssignment2.put(MEMBER_C, mkAssignment(
+            mkTopicAssignment(TOPIC_3_UUID, 0)
+        ));
+
+        // T2: 2 partitions + T3: 3 partitions = 5 partitions
+        assertEveryPartitionGetsAssignment(5, computedAssignment2);
+        assertAssignment(expectedAssignment2, computedAssignment2);
+    }
+
+    @Test
+    public void testAssignWithCurrentAssignmentHeterogeneous() {
+        // Current assignment setup - 3 members A - {T1, T2}, B - {T3}, C - 
{T2, T3}.
+        Map<Uuid, TopicMetadata> topicMetadata1 = new HashMap<>();
+        topicMetadata1.put(TOPIC_1_UUID, new TopicMetadata(
+            TOPIC_1_UUID,
+            TOPIC_1_NAME,
+            3
+        ));
+
+        topicMetadata1.put(TOPIC_2_UUID, new TopicMetadata(
+            TOPIC_2_UUID,
+            TOPIC_2_NAME,
+            3
+        ));
+        topicMetadata1.put(TOPIC_3_UUID, new TopicMetadata(
+            TOPIC_3_UUID,
+            TOPIC_3_NAME,
+            2
+        ));
+
+        Set<Uuid> memberATopicsSubscription1 = new LinkedHashSet<>();
+        memberATopicsSubscription1.add(TOPIC_1_UUID);
+        memberATopicsSubscription1.add(TOPIC_2_UUID);
+
+        Map<String, MemberSubscriptionAndAssignmentImpl> members1 = new 
HashMap<>();
+        members1.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
+            Optional.empty(),
+            Optional.empty(),
+            memberATopicsSubscription1,
+            Assignment.EMPTY
+        ));
+
+        members1.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
+            Optional.empty(),
+            Optional.empty(),
+            Set.of(TOPIC_3_UUID),
+            Assignment.EMPTY
+        ));
+
+        Set<Uuid> memberCTopicsSubscription1 = new LinkedHashSet<>();
+        memberCTopicsSubscription1.add(TOPIC_2_UUID);
+        memberCTopicsSubscription1.add(TOPIC_3_UUID);
+        members1.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl(
+            Optional.empty(),
+            Optional.empty(),
+            memberCTopicsSubscription1,
+            Assignment.EMPTY
+        ));
+
+        GroupSpec groupSpec1 = new GroupSpecImpl(
+            members1,
+            HETEROGENEOUS,
+            Map.of()
+        );
+        SubscribedTopicDescriberImpl subscribedTopicMetadata1 = new 
SubscribedTopicDescriberImpl(topicMetadata1);
+
+        GroupAssignment computedAssignment1 = assignor.assign(
+            groupSpec1,
+            subscribedTopicMetadata1
+        );
+
+        // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of 
MEMBER_C is 67.
+        // Step 1 -> T2:2 -> member_A, T3:0 -> member_B, T2:2 -> member_C by 
hash assignment.
+        // Step 2 -> T1:0, T1:1, T1:2, T2:0 -> member_A, T3:1 -> member_B, 
T2:1 -> member_C by round-robin assignment.
+        // Step 3 -> no new assignment gets added by current assignment since 
it is empty.
+        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment1 = new 
HashMap<>();
+        expectedAssignment1.put(MEMBER_A, mkAssignment(
+            mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
+            mkTopicAssignment(TOPIC_2_UUID, 0, 2)
+        ));
+        expectedAssignment1.put(MEMBER_B, mkAssignment(
+            mkTopicAssignment(TOPIC_3_UUID, 0, 1)
+        ));
+        expectedAssignment1.put(MEMBER_C, mkAssignment(
+            mkTopicAssignment(TOPIC_2_UUID, 1, 2)
+        ));
+
+        // T1: 3 partitions + T2: 3 partitions + T3: 2 partitions = 8 
partitions
+        assertEveryPartitionGetsAssignment(8, computedAssignment1);
+        assertAssignment(expectedAssignment1, computedAssignment1);
+
+        // New assignment setup - 2 members A - {T1, T2, T3}, B - {T3, T4}.
+
+        Map<Uuid, TopicMetadata> topicMetadata2 = new HashMap<>();
+        topicMetadata2.put(TOPIC_1_UUID, new TopicMetadata(
+            TOPIC_1_UUID,
+            TOPIC_1_NAME,
+            3
+        ));
+        topicMetadata2.put(TOPIC_2_UUID, new TopicMetadata(
+            TOPIC_2_UUID,
+            TOPIC_2_NAME,
+            3
+        ));
+        topicMetadata2.put(TOPIC_3_UUID, new TopicMetadata(
+            TOPIC_3_UUID,
+            TOPIC_3_NAME,
+            2
+        ));
+        topicMetadata2.put(TOPIC_4_UUID, new TopicMetadata(
+            TOPIC_4_UUID,
+            TOPIC_4_NAME,
+            1
+        ));
+
+        Map<String, MemberSubscriptionAndAssignmentImpl> members2 = new 
HashMap<>();
+
+        Set<Uuid> memberATopicsSubscription2 = new LinkedHashSet<>();
+        memberATopicsSubscription2.add(TOPIC_1_UUID);
+        memberATopicsSubscription2.add(TOPIC_2_UUID);
+        memberATopicsSubscription2.add(TOPIC_3_UUID);
+
+        Set<Uuid> memberBTopicsSubscription2 = new LinkedHashSet<>();
+        memberBTopicsSubscription2.add(TOPIC_3_UUID);
+        memberBTopicsSubscription2.add(TOPIC_4_UUID);
+
+        members2.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
+            Optional.empty(),
+            Optional.empty(),
+            memberATopicsSubscription2,
+            new Assignment(mkAssignment(
+                mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
+                mkTopicAssignment(TOPIC_2_UUID, 0, 2)))
+        ));
+
+        members2.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
+            Optional.empty(),
+            Optional.empty(),
+            memberBTopicsSubscription2,
+            new Assignment(mkAssignment(
+                mkTopicAssignment(TOPIC_3_UUID, 0, 1)))
+        ));
+
+        GroupSpec groupSpec2 = new GroupSpecImpl(
+            members2,
+            HETEROGENEOUS,
+            Map.of()
+        );
+
+        SubscribedTopicDescriberImpl subscribedTopicMetadata2 = new 
SubscribedTopicDescriberImpl(topicMetadata2);
+
+        GroupAssignment computedAssignment2 = assignor.assign(
+            groupSpec2,
+            subscribedTopicMetadata2
+        );
+
+        // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
+        // Step 1 -> T1:1 -> member_A, T3:0 -> member_B by hash assignment.
+        // Step 2 -> T2:1 -> member_A, T4:0 -> member_B by round-robin 
assignment.
+        // Step 3 -> T1:0, T1:2, T2:0, T2:2 -> member_A, T3:1 -> member_B by 
current assignment.
+        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment2 = new 
HashMap<>();
+        expectedAssignment2.put(MEMBER_A, mkAssignment(
+            mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
+            mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2)
+        ));
+        expectedAssignment2.put(MEMBER_B, mkAssignment(
+            mkTopicAssignment(TOPIC_3_UUID, 0, 1),
+            mkTopicAssignment(TOPIC_4_UUID, 0)
+        ));
+
+        // T1: 3 partitions + T2: 3 partitions + T3: 2 partitions + T4: 1 
partition = 9 partitions
+        assertEveryPartitionGetsAssignment(9, computedAssignment2);
+        assertAssignment(expectedAssignment2, computedAssignment2);
+    }
+
     private void assertAssignment(
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment,
         GroupAssignment computedGroupAssignment
@@ -336,4 +745,31 @@ public class SimpleAssignorTest {
             assertEquals(expectedAssignment.get(memberId), 
computedAssignmentForMember);
         }
     }
+
+    private void assertAssignment(
+        Map<TopicIdPartition, List<String>> expectedAssignment,
+        Map<TopicIdPartition, List<String>> computedAssignment
+    ) {
+        assertEquals(expectedAssignment.size(), computedAssignment.size());
+        expectedAssignment.forEach((topicIdPartition, members) -> {
+            List<String> computedMembers = 
computedAssignment.getOrDefault(topicIdPartition, List.of());
+            assertEquals(members.size(), computedMembers.size());
+            members.forEach(member -> 
assertTrue(computedMembers.contains(member)));
+        });
+    }
+
+    private void assertEveryPartitionGetsAssignment(
+        int expectedPartitions,
+        GroupAssignment computedGroupAssignment
+    ) {
+        Map<String, MemberAssignment> memberAssignments = 
computedGroupAssignment.members();
+        Set<TopicIdPartition> topicPartitionAssignments = new HashSet<>();
+        memberAssignments.values().forEach(memberAssignment -> {
+            Map<Uuid, Set<Integer>> topicIdPartitions = 
memberAssignment.partitions();
+            topicIdPartitions.forEach((topicId, partitions) ->
+                partitions.forEach(partition -> 
topicPartitionAssignments.add(new TopicIdPartition(topicId, partition)))
+            );
+        });
+        assertEquals(expectedPartitions, topicPartitionAssignments.size());
+    }
 }

Reply via email to