This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/kip1071 by this push:
new fb2eb8eb8bd Add unit test for StickyTaskAssignor (#18169)
fb2eb8eb8bd is described below
commit fb2eb8eb8bd2d92847069c0d0301e0163a02895c
Author: Alieh Saeedi <[email protected]>
AuthorDate: Fri Dec 13 15:34:15 2024 +0100
Add unit test for StickyTaskAssignor (#18169)
This PR formulates a unit test that verifies the fix merged with PR #18051.
---
.../group/taskassignor/StickyTaskAssignorTest.java | 30 ++++++++++++++++++++++
1 file changed, 30 insertions(+)
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java
index 740ad6bf90a..3eadffce88f 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java
@@ -403,6 +403,36 @@ public class StickyTaskAssignorTest {
assertEquals(Collections.singleton(0),
testMember3.activeTasks().get("test-subtopology"));
}
+ @Test
+ public void
shouldNotAssignStandbyTasksToClientWithPreviousStandbyTasksAndCurrentActiveTasks()
{
+ final AssignmentMemberSpec memberSpec1 =
createAssignmentMemberSpec("process1", Collections.emptyMap(),
mkMap(mkEntry("test-subtopology", Collections.singleton(0))));
+ final AssignmentMemberSpec memberSpec2 =
createAssignmentMemberSpec("process2", Collections.emptyMap(),
mkMap(mkEntry("test-subtopology", Collections.singleton(1))));
+
+
+ Map<String, AssignmentMemberSpec> members = mkMap(
+ mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2));
+
+ GroupAssignment result = assignor.assign(
+ new GroupSpecImpl(members,
Collections.singletonList("test-subtopology"),
mkMap(mkEntry("numStandbyReplicas", "1"))),
+ new TopologyDescriberImpl(2, true)
+ );
+
+ MemberAssignment testMember1 = result.members().get("member1");
+ assertNotNull(testMember1);
+ assertEquals(1,
testMember1.activeTasks().get("test-subtopology").size());
+ assertEquals(Collections.singleton(0),
testMember1.activeTasks().get("test-subtopology"));
+ assertEquals(1,
testMember1.standbyTasks().get("test-subtopology").size());
+ assertEquals(Collections.singleton(1),
testMember1.standbyTasks().get("test-subtopology"));
+
+
+ MemberAssignment testMember2 = result.members().get("member2");
+ assertNotNull(testMember2);
+ assertEquals(1,
testMember2.activeTasks().get("test-subtopology").size());
+ assertEquals(Collections.singleton(1),
testMember2.activeTasks().get("test-subtopology"));
+ assertEquals(1,
testMember2.standbyTasks().get("test-subtopology").size());
+ assertEquals(Collections.singleton(0),
testMember2.standbyTasks().get("test-subtopology"));
+ }
+
@Test
public void
shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() {
final AssignmentMemberSpec memberSpec1 =
createAssignmentMemberSpec("process1",