Repository: kafka
Updated Branches:
  refs/heads/trunk c9872cb21 -> 0fba52960


KAFKA-4677 Follow Up: add optimization to StickyTaskAssignor for rolling 
bounces

Detect when a rebalance has happened because one or more existing nodes 
bounced (restarted). Keep previously active tasks on the clients that had them, 
and assign only the tasks that were not previously active to the new clients, 
preferring clients that previously held those tasks as standbys.

Author: Damian Guy <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #2609 from dguy/kstreams-575


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0fba5296
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0fba5296
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0fba5296

Branch: refs/heads/trunk
Commit: 0fba529608a5eb829feb66a499c89ead40b79694
Parents: c9872cb
Author: Damian Guy <[email protected]>
Authored: Wed Mar 1 11:21:41 2017 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Wed Mar 1 11:21:41 2017 -0800

----------------------------------------------------------------------
 .../internals/assignment/ClientState.java       |   4 +
 .../assignment/StickyTaskAssignor.java          |  71 +++++++----
 .../internals/assignment/ClientStateTest.java   |  13 ++
 .../assignment/StickyTaskAssignorTest.java      | 121 ++++++++++++++++++-
 4 files changed, 182 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0fba5296/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index 3e9a521..d5f8ccf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -150,4 +150,8 @@ public class ClientState<T> {
     int capacity() {
         return capacity;
     }
+
+    /**
+     * Returns true while this client holds fewer active tasks than its quota of
+     * {@code capacity * tasksPerThread}.
+     * NOTE(review): when tasksPerThread is 0 the quota is 0, so this always returns false.
+     *
+     * @param tasksPerThread target number of active tasks per unit of capacity
+     */
+    boolean hasUnfulfilledQuota(final int tasksPerThread) {
+        return activeTasks.size() < capacity * tasksPerThread;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0fba5296/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index f06ecae..81c9305 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -35,16 +36,12 @@ public class StickyTaskAssignor<ID> implements 
TaskAssignor<ID, TaskId> {
     private final Map<TaskId, ID> previousActiveTaskAssignment = new 
HashMap<>();
     private final Map<TaskId, Set<ID>> previousStandbyTaskAssignment = new 
HashMap<>();
     private final TaskPairs taskPairs;
-    private final int availableCapacity;
-    private final boolean hasNewTasks;
 
+    // Captures the clients and task ids, then records each client's previous active and
+    // standby tasks via mapPreviousTaskAssignment(clients).
     public StickyTaskAssignor(final Map<ID, ClientState<TaskId>> clients, 
final Set<TaskId> taskIds) {
         this.clients = clients;
         this.taskIds = taskIds;
-        this.availableCapacity = sumCapacity(clients.values());
+        // n * (n - 1) / 2 is the number of unordered pairs of distinct tasks.
         taskPairs = new TaskPairs(taskIds.size() * (taskIds.size() - 1) / 2);
         mapPreviousTaskAssignment(clients);
-        this.hasNewTasks = 
!previousActiveTaskAssignment.keySet().containsAll(taskIds);
     }
 
     @Override
@@ -66,35 +63,69 @@ public class StickyTaskAssignor<ID> implements 
TaskAssignor<ID, TaskId> {
                              numStandbyReplicas, taskId);
                     break;
                 }
-                assign(taskId, ids, false);
+                allocateTaskWithClientCandidates(taskId, ids, false);
             }
         }
     }
 
+    /**
+     * Assigns every task in taskIds as an active task, in three passes:
+     * (1) back to the client that previously had it active, while that client is under quota;
+     * (2) otherwise to a client that previously had it as a standby, while under quota;
+     * (3) any task still unassigned via allocateTaskWithClientCandidates over all clients.
+     */
     private void assignActive() {
-        final Set<TaskId> previouslyAssignedTaskIds = new 
HashSet<>(previousActiveTaskAssignment.keySet());
-        
previouslyAssignedTaskIds.addAll(previousStandbyTaskAssignment.keySet());
-        previouslyAssignedTaskIds.retainAll(taskIds);
-
-        // assign previously assigned tasks first
-        for (final TaskId taskId : previouslyAssignedTaskIds) {
-            assign(taskId, clients.keySet(), true);
+        final int totalCapacity = sumCapacity(clients.values());
+        // Per-thread quota shared by the two sticky passes below.
+        // NOTE(review): integer division - throws ArithmeticException when totalCapacity is 0,
+        // and is 0 when there are fewer tasks than total capacity, in which case
+        // hasUnfulfilledQuota(0) is always false and every task goes to the final pass.
+        final int tasksPerThread = taskIds.size() / totalCapacity;
+        final Set<TaskId> assigned = new HashSet<>();
+
+        // first try and re-assign existing active tasks to clients that 
previously had
+        // the same active task
+        // NOTE(review): previousActiveTaskAssignment is a HashMap, so when a client previously
+        // had more active tasks than its quota, which ones it keeps depends on iteration order.
+        for (final Map.Entry<TaskId, ID> entry : 
previousActiveTaskAssignment.entrySet()) {
+            final TaskId taskId = entry.getKey();
+            if (taskIds.contains(taskId)) {
+                // The owner id was recorded by mapPreviousTaskAssignment(clients), so it is
+                // presumably always a key of clients; a missing key would NPE below - verify.
+                final ClientState<TaskId> client = 
clients.get(entry.getValue());
+                if (client.hasUnfulfilledQuota(tasksPerThread)) {
+                    assignTaskToClient(assigned, taskId, client);
+                }
+            }
         }
 
-        final Set<TaskId> newTasks  = new HashSet<>(taskIds);
-        newTasks.removeAll(previouslyAssignedTaskIds);
+        final Set<TaskId> unassigned = new HashSet<>(taskIds);
+        unassigned.removeAll(assigned);
+
+        // try and assign any remaining unassigned tasks to clients that 
previously
+        // have seen the task.
+        // The first standby holder (in set iteration order) that is under quota wins;
+        // iterator.remove() keeps 'unassigned' accurate for the final pass.
+        for (final Iterator<TaskId> iterator = unassigned.iterator(); 
iterator.hasNext(); ) {
+            final TaskId taskId = iterator.next();
+            final Set<ID> clientIds = 
previousStandbyTaskAssignment.get(taskId);
+            if (clientIds != null) {
+                for (final ID clientId : clientIds) {
+                    final ClientState<TaskId> client = clients.get(clientId);
+                    if (client.hasUnfulfilledQuota(tasksPerThread)) {
+                        assignTaskToClient(assigned, taskId, client);
+                        iterator.remove();
+                        break;
+                    }
+                }
+            }
+        }
 
-        for (final TaskId taskId : newTasks) {
-            assign(taskId, clients.keySet(), true);
+        // assign any remaining unassigned tasks
+        for (final TaskId taskId : unassigned) {
+            allocateTaskWithClientCandidates(taskId, clients.keySet(), true);
         }
+
     }
 
-    private void assign(final TaskId taskId, final Set<ID> clientsWithin, 
final boolean active) {
+
+
+    /**
+     * Picks a client for taskId via findClient(taskId, clientsWithin), records the task pairs
+     * it forms with that client's already-assigned tasks, then calls
+     * client.assign(taskId, active) - true from the active pass, false from the standby pass.
+     */
+    private void allocateTaskWithClientCandidates(final TaskId taskId, final 
Set<ID> clientsWithin, final boolean active) {
         final ClientState<TaskId> client = findClient(taskId, clientsWithin);
         taskPairs.addPairs(taskId, client.assignedTasks());
         client.assign(taskId, active);
     }
 
+    /**
+     * Assigns taskId directly to client as an active task (no candidate search), records the
+     * task pairs it forms with the client's already-assigned tasks, and adds taskId to
+     * {@code assigned}.
+     */
+    private void assignTaskToClient(final Set<TaskId> assigned, final TaskId 
taskId, final ClientState<TaskId> client) {
+        taskPairs.addPairs(taskId, client.assignedTasks());
+        client.assign(taskId, true);
+        assigned.add(taskId);
+    }
+
     private Set<ID> findClientsWithoutAssignedTask(final TaskId taskId) {
         final Set<ID> clientIds = new HashSet<>();
         for (final Map.Entry<ID, ClientState<TaskId>> client : 
clients.entrySet()) {
@@ -131,9 +162,7 @@ public class StickyTaskAssignor<ID> implements 
TaskAssignor<ID, TaskId> {
     }
 
+    // True when this client has reached capacity and some other client has more available
+    // capacity.
     private boolean shouldBalanceLoad(final ClientState<TaskId> client) {
-        return !hasNewTasks
-                && client.reachedCapacity()
-                && hasClientsWithMoreAvailableCapacity(client);
+        return client.reachedCapacity() && 
hasClientsWithMoreAvailableCapacity(client);
     }
 
     private boolean hasClientsWithMoreAvailableCapacity(final 
ClientState<TaskId> client) {
@@ -208,6 +237,7 @@ public class StickyTaskAssignor<ID> implements 
TaskAssignor<ID, TaskId> {
                 
previousStandbyTaskAssignment.get(prevAssignedTask).add(clientState.getKey());
             }
         }
+
     }
 
     private int sumCapacity(final Collection<ClientState<TaskId>> values) {
@@ -218,7 +248,6 @@ public class StickyTaskAssignor<ID> implements 
TaskAssignor<ID, TaskId> {
         return capacity;
     }
 
-
     private static class TaskPairs {
         private final Set<Pair> pairs;
         private final int maxPairs;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0fba5296/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
index 6692844..af2c9e3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
@@ -146,5 +146,18 @@ public class ClientStateTest {
         c1.hasMoreAvailableCapacityThan(new ClientState<Integer>(0));
     }
 
+    // Client built with 1 (presumably its capacity) holding one active task:
+    // 1 < 1 * 2, so the quota is unfulfilled.
+    @Test
+    public void 
shouldHaveUnfulfilledQuotaWhenActiveTaskSizeLessThanCapacityTimesTasksPerThread()
 throws Exception {
+        final ClientState<Integer> client = new ClientState<>(1);
+        client.assign(1, true);
+        assertTrue(client.hasUnfulfilledQuota(2));
+    }
+
+    // Client built with 1 (presumably its capacity) holding one active task:
+    // 1 < 1 * 1 is false, so the quota is fulfilled.
+    @Test
+    public void 
shouldNotHaveUnfulfilledQuotaWhenActiveTaskSizeGreaterEqualThanCapacityTimesTasksPerThread()
 throws Exception {
+        final ClientState<Integer> client = new ClientState<>(1);
+        client.assign(1, true);
+        assertFalse(client.hasUnfulfilledQuota(1));
+    }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0fba5296/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index a782ea3..f37bf7d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -31,6 +31,7 @@ import java.util.TreeMap;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
 import static org.hamcrest.core.IsCollectionContaining.hasItems;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertTrue;
@@ -41,6 +42,8 @@ public class StickyTaskAssignorTest {
     private final TaskId task01 = new TaskId(0, 1);
     private final TaskId task02 = new TaskId(0, 2);
     private final TaskId task03 = new TaskId(0, 3);
+    private final TaskId task04 = new TaskId(0, 4);
+    private final TaskId task05 = new TaskId(0, 5);
     private final Map<Integer, ClientState<TaskId>> clients = new TreeMap<>();
     private final Integer p1 = 1;
     private final Integer p2 = 2;
@@ -448,9 +451,6 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void shouldNotMoveAnyTasksWhenNewTasksAdded() throws Exception {
-        final TaskId task04 = new TaskId(0, 4);
-        final TaskId task05 = new TaskId(0, 5);
-
         createClientWithPreviousActiveTasks(p1, 1, task00, task01);
         createClientWithPreviousActiveTasks(p2, 1, task02, task03);
 
@@ -463,8 +463,6 @@ public class StickyTaskAssignorTest {
 
     @Test
     public void 
shouldAssignNewTasksToNewClientWhenPreviousTasksAssignedToOldClients() throws 
Exception {
-        final TaskId task04 = new TaskId(0, 4);
-        final TaskId task05 = new TaskId(0, 5);
 
         createClientWithPreviousActiveTasks(p1, 1, task02, task01);
         createClientWithPreviousActiveTasks(p2, 1, task00, task03);
@@ -478,9 +476,120 @@ public class StickyTaskAssignorTest {
         assertThat(clients.get(p3).activeTasks(), hasItems(task04, task05));
     }
 
+    // p1-p3 each keep exactly their three previous active tasks; the new client p4 (previously
+    // a standby for every task) receives only the tasks none of them had active.
+    @Test
+    public void shouldAssignTasksNotPreviouslyActiveToNewClient() throws 
Exception {
+        // NOTE(review): task10/task11 are TaskId(0, 10)/TaskId(0, 11), whereas task12 is
+        // TaskId(1, 2); the names suggest TaskId(1, 0)/TaskId(1, 1). The ids are still distinct.
+        final TaskId task10 = new TaskId(0, 10);
+        final TaskId task11 = new TaskId(0, 11);
+        final TaskId task12 = new TaskId(1, 2);
+        final TaskId task13 = new TaskId(1, 3);
+        final TaskId task20 = new TaskId(2, 0);
+        final TaskId task21 = new TaskId(2, 1);
+        final TaskId task22 = new TaskId(2, 2);
+        final TaskId task23 = new TaskId(2, 3);
+
+        final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 
1, task01, task12, task13);
+        c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, 
task23));
+        final ClientState<TaskId> c2 = createClientWithPreviousActiveTasks(p2, 
1, task00, task11, task22);
+        c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, 
task03, task12, task21, task13, task23));
+        final ClientState<TaskId> c3 = createClientWithPreviousActiveTasks(p3, 
1, task20, task21, task23);
+        c3.addPreviousStandbyTasks(Utils.mkSet(task02, task12));
+
+        final ClientState<TaskId> newClient = createClient(p4, 1);
+        newClient.addPreviousStandbyTasks(Utils.mkSet(task00, task10, task01, 
task02, task11, task20, task03, task12, task21, task13, task22, task23));
+
+        final StickyTaskAssignor<Integer> taskAssignor = 
createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, 
task12, task21, task13, task22, task23);
+        taskAssignor.assign(0);
+
+        assertThat(c1.activeTasks(), equalTo(Utils.mkSet(task01, task12, 
task13)));
+        assertThat(c2.activeTasks(), equalTo(Utils.mkSet(task00, task11, 
task22)));
+        assertThat(c3.activeTasks(), equalTo(Utils.mkSet(task20, task21, 
task23)));
+        assertThat(newClient.activeTasks(), equalTo(Utils.mkSet(task02, 
task03, task10)));
+    }
+
+    // p1/p2 keep their previous active tasks; the bounced clients p3 and p4 return with no
+    // active tasks, and each gets back exactly the tasks it previously held as standby.
+    @Test
+    public void shouldAssignTasksNotPreviouslyActiveToMultipleNewClients() 
throws Exception {
+        // NOTE(review): task10/task11 are TaskId(0, 10)/TaskId(0, 11), not TaskId(1, 0)/
+        // TaskId(1, 1) as the taskXY naming suggests. The ids are still distinct.
+        final TaskId task10 = new TaskId(0, 10);
+        final TaskId task11 = new TaskId(0, 11);
+        final TaskId task12 = new TaskId(1, 2);
+        final TaskId task13 = new TaskId(1, 3);
+        final TaskId task20 = new TaskId(2, 0);
+        final TaskId task21 = new TaskId(2, 1);
+        final TaskId task22 = new TaskId(2, 2);
+        final TaskId task23 = new TaskId(2, 3);
+
+        final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 
1, task01, task12, task13);
+        c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, 
task23));
+        final ClientState<TaskId> c2 = createClientWithPreviousActiveTasks(p2, 
1, task00, task11, task22);
+        c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, 
task03, task12, task21, task13, task23));
+
+        final ClientState<TaskId> bounce1 = createClient(p3, 1);
+        bounce1.addPreviousStandbyTasks(Utils.mkSet(task20, task21, task23));
+
+        final ClientState<TaskId> bounce2 = createClient(p4, 1);
+        bounce2.addPreviousStandbyTasks(Utils.mkSet(task02, task03, task10));
+
+        final StickyTaskAssignor<Integer> taskAssignor = 
createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, 
task12, task21, task13, task22, task23);
+        taskAssignor.assign(0);
+
+        assertThat(c1.activeTasks(), equalTo(Utils.mkSet(task01, task12, 
task13)));
+        assertThat(c2.activeTasks(), equalTo(Utils.mkSet(task00, task11, 
task22)));
+        assertThat(bounce1.activeTasks(), equalTo(Utils.mkSet(task20, task21, 
task23)));
+        assertThat(bounce2.activeTasks(), equalTo(Utils.mkSet(task02, task03, 
task10)));
+    }
+
+    // Two tasks over two single-capacity clients: p1 previously had both active and should
+    // keep only one of them.
+    // NOTE(review): p2's share is not asserted directly.
+    @Test
+    public void shouldAssignTasksToNewClient() throws Exception {
+        createClientWithPreviousActiveTasks(p1, 1, task01, task02);
+        createClient(p2, 1);
+        createTaskAssignor(task01, task02).assign(0);
+        assertThat(clients.get(p1).activeTaskCount(), equalTo(1));
+    }
+
+    // A new client joins two clients that each had three active tasks: each existing client
+    // ends with two tasks and none of the other's previous tasks; the new client gets two.
+    // NOTE(review): not(hasItems(x)) and not(hasItem(x)) are equivalent for a single item;
+    // mixing them here is only cosmetic.
+    @Test
+    public void 
shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingClients() 
throws Exception {
+        final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 
1, task00, task01, task02);
+        final ClientState<TaskId> c2 = createClientWithPreviousActiveTasks(p2, 
1, task03, task04, task05);
+        final ClientState<TaskId> newClient = createClient(p3, 1);
+
+        final StickyTaskAssignor<Integer> taskAssignor = 
createTaskAssignor(task00, task01, task02, task03, task04, task05);
+        taskAssignor.assign(0);
+        assertThat(c1.activeTasks(), not(hasItem(task03)));
+        assertThat(c1.activeTasks(), not(hasItem(task04)));
+        assertThat(c1.activeTasks(), not(hasItem(task05)));
+        assertThat(c1.activeTaskCount(), equalTo(2));
+        assertThat(c2.activeTasks(), not(hasItems(task00)));
+        assertThat(c2.activeTasks(), not(hasItems(task01)));
+        assertThat(c2.activeTasks(), not(hasItems(task02)));
+        assertThat(c2.activeTaskCount(), equalTo(2));
+        assertThat(newClient.activeTaskCount(), equalTo(2));
+    }
+
+    // p1 had four active tasks, p2 bounced and only has task03-task05 as standby, and p3 is
+    // new. p1 ends with three tasks and none of task03-task05; p2 ends with two tasks and none
+    // of task00-task02; p3 gets two.
+    @Test
+    public void 
shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingAndBouncedClients()
 throws Exception {
+        final TaskId task06 = new TaskId(0, 6);
+        final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 
1, task00, task01, task02, task06);
+        final ClientState<TaskId> c2 = createClient(p2, 1);
+        c2.addPreviousStandbyTasks(Utils.mkSet(task03, task04, task05));
+        final ClientState<TaskId> newClient = createClient(p3, 1);
+
+        final StickyTaskAssignor<Integer> taskAssignor = 
createTaskAssignor(task00, task01, task02, task03, task04, task05, task06);
+        taskAssignor.assign(0);
+        assertThat(c1.activeTasks(), not(hasItem(task03)));
+        assertThat(c1.activeTasks(), not(hasItem(task04)));
+        assertThat(c1.activeTasks(), not(hasItem(task05)));
+        assertThat(c1.activeTaskCount(), equalTo(3));
+        assertThat(c2.activeTasks(), not(hasItems(task00)));
+        assertThat(c2.activeTasks(), not(hasItems(task01)));
+        assertThat(c2.activeTasks(), not(hasItems(task02)));
+        assertThat(c2.activeTaskCount(), equalTo(2));
+        assertThat(newClient.activeTaskCount(), equalTo(2));
+    }
+
+    // Builds an assignor over the shared clients map for the given tasks.
     private StickyTaskAssignor<Integer> createTaskAssignor(final TaskId... 
tasks) {
+        // NOTE(review): the shuffled list is copied into a HashSet, whose iteration order is
+        // driven by element hashes rather than insertion order, so this shuffle probably has
+        // little effect on the order the assignor sees. Arrays.asList is backed by the
+        // varargs array, so the shuffle also reorders that array in place.
+        final List<TaskId> taskIds = Arrays.asList(tasks);
+        Collections.shuffle(taskIds);
         return new StickyTaskAssignor<>(clients,
-                                        new HashSet<>(Arrays.asList(tasks)));
+                                        new HashSet<>(taskIds));
     }
 
     private List<TaskId> allActiveTasks() {

Reply via email to