This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1d9a427  KAFKA-7144: Fix task assignment to be even (#5390)
1d9a427 is described below

commit 1d9a427225c64e7629a4eb2e2d129d5551185049
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Jul 25 01:00:18 2018 -0400

    KAFKA-7144: Fix task assignment to be even (#5390)
    
    This PR now just removes the check in TaskPairs.hasNewPair that was 
causing the task assignment issue.
    
    This was done because we need to further refine the task assignment 
strategy; that refinement needs to account for the statefulness of tasks and 
is best done in one pass rather than by taking a "patchy" approach.
    
    Updated current tests and ran locally
    
    Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang 
<[email protected]>
---
 .../internals/assignment/StickyTaskAssignor.java      |  2 +-
 .../internals/assignment/StickyTaskAssignorTest.java  | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index 5b54d08..8767d0f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -270,7 +270,7 @@ public class StickyTaskAssignor<ID> implements 
TaskAssignor<ID, TaskId> {
                 if (!active && !pairs.contains(pair(task1, taskId))) {
                     return true;
                 }
-                if (!pairs.contains(pair(task1, taskId)) && task1.topicGroupId 
!= taskId.topicGroupId) {
+                if (!pairs.contains(pair(task1, taskId))) {
                     return true;
                 }
             }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index ed22e3c..d431dbe 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -152,6 +152,25 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
+    public void shouldAssignTasksEvenlyWithUnequalTopicGroupSizes() {
+
+        createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, 
task03,
+                                                            task04, task05, 
task10);
+
+        createClient(p2, 1);
+
+        final StickyTaskAssignor taskAssignor = createTaskAssignor(task10, 
task00, task01, task02, task03, task04, task05);
+
+        final Set<TaskId> expectedClientITasks = new 
HashSet<>(Arrays.asList(task00, task01, task10, task05));
+        final Set<TaskId> expectedClientIITasks = new 
HashSet<>(Arrays.asList(task02, task03, task04));
+
+        taskAssignor.assign(0);
+
+        assertThat(clients.get(p1).activeTasks(), 
equalTo(expectedClientITasks));
+        assertThat(clients.get(p2).activeTasks(), 
equalTo(expectedClientIITasks));
+    }
+
+    @Test
     public void shouldKeepActiveTaskStickynessWhenMoreClientThanActiveTasks() {
         final int p5 = 5;
         createClientWithPreviousActiveTasks(p1, 1, task00);

Reply via email to