This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 81a5e22  KAFKA-7144: Fix task assignment to be even (#5390)
81a5e22 is described below

commit 81a5e223f2f354456861ce4d7fe3cb4c6b019fa0
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Jul 25 01:00:18 2018 -0400

    KAFKA-7144: Fix task assignment to be even (#5390)
    
    This PR now justs removes the check in TaskPairs.hasNewPair that was 
causing the task assignment issue.
    
    This was done as we need to further refine task assignment strategy and 
this approach needs to include the statefulness of tasks and is best done in 
one pass vs taking a "patchy" approach.
    
    Updated current tests and ran locally
    
    Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang 
<[email protected]>
---
 .../internals/assignment/StickyTaskAssignor.java      |  2 +-
 .../internals/assignment/StickyTaskAssignorTest.java  | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index 5b54d08..8767d0f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -270,7 +270,7 @@ public class StickyTaskAssignor<ID> implements 
TaskAssignor<ID, TaskId> {
                 if (!active && !pairs.contains(pair(task1, taskId))) {
                     return true;
                 }
-                if (!pairs.contains(pair(task1, taskId)) && task1.topicGroupId 
!= taskId.topicGroupId) {
+                if (!pairs.contains(pair(task1, taskId))) {
                     return true;
                 }
             }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index ed22e3c..d431dbe 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -152,6 +152,25 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
+    public void shouldAssignTasksEvenlyWithUnequalTopicGroupSizes() {
+
+        createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, 
task03,
+                                                            task04, task05, 
task10);
+
+        createClient(p2, 1);
+
+        final StickyTaskAssignor taskAssignor = createTaskAssignor(task10, 
task00, task01, task02, task03, task04, task05);
+
+        final Set<TaskId> expectedClientITasks = new 
HashSet<>(Arrays.asList(task00, task01, task10, task05));
+        final Set<TaskId> expectedClientIITasks = new 
HashSet<>(Arrays.asList(task02, task03, task04));
+
+        taskAssignor.assign(0);
+
+        assertThat(clients.get(p1).activeTasks(), 
equalTo(expectedClientITasks));
+        assertThat(clients.get(p2).activeTasks(), 
equalTo(expectedClientIITasks));
+    }
+
+    @Test
     public void shouldKeepActiveTaskStickynessWhenMoreClientThanActiveTasks() {
         final int p5 = 5;
         createClientWithPreviousActiveTasks(p1, 1, task00);

Reply via email to