Repository: kafka Updated Branches: refs/heads/trunk c9872cb21 -> 0fba52960
KAFKA-4677 Follow Up: add optimization to StickyTaskAssignor for rolling bounce Detect when a rebalance has happened due to one or more existing nodes bouncing. Keep assignment of previous active tasks the same and only assign the tasks that were not active to the new clients. Author: Damian Guy <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #2609 from dguy/kstreams-575 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0fba5296 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0fba5296 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0fba5296 Branch: refs/heads/trunk Commit: 0fba529608a5eb829feb66a499c89ead40b79694 Parents: c9872cb Author: Damian Guy <[email protected]> Authored: Wed Mar 1 11:21:41 2017 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Mar 1 11:21:41 2017 -0800 ---------------------------------------------------------------------- .../internals/assignment/ClientState.java | 4 + .../assignment/StickyTaskAssignor.java | 71 +++++++---- .../internals/assignment/ClientStateTest.java | 13 ++ .../assignment/StickyTaskAssignorTest.java | 121 ++++++++++++++++++- 4 files changed, 182 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0fba5296/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java index 3e9a521..d5f8ccf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java @@ 
-150,4 +150,8 @@ public class ClientState<T> { int capacity() { return capacity; } + + boolean hasUnfulfilledQuota(final int tasksPerThread) { + return activeTasks.size() < capacity * tasksPerThread; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0fba5296/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java index f06ecae..81c9305 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -35,16 +36,12 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> { private final Map<TaskId, ID> previousActiveTaskAssignment = new HashMap<>(); private final Map<TaskId, Set<ID>> previousStandbyTaskAssignment = new HashMap<>(); private final TaskPairs taskPairs; - private final int availableCapacity; - private final boolean hasNewTasks; public StickyTaskAssignor(final Map<ID, ClientState<TaskId>> clients, final Set<TaskId> taskIds) { this.clients = clients; this.taskIds = taskIds; - this.availableCapacity = sumCapacity(clients.values()); taskPairs = new TaskPairs(taskIds.size() * (taskIds.size() - 1) / 2); mapPreviousTaskAssignment(clients); - this.hasNewTasks = !previousActiveTaskAssignment.keySet().containsAll(taskIds); } @Override @@ -66,35 +63,69 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> { 
numStandbyReplicas, taskId); break; } - assign(taskId, ids, false); + allocateTaskWithClientCandidates(taskId, ids, false); } } } private void assignActive() { - final Set<TaskId> previouslyAssignedTaskIds = new HashSet<>(previousActiveTaskAssignment.keySet()); - previouslyAssignedTaskIds.addAll(previousStandbyTaskAssignment.keySet()); - previouslyAssignedTaskIds.retainAll(taskIds); - - // assign previously assigned tasks first - for (final TaskId taskId : previouslyAssignedTaskIds) { - assign(taskId, clients.keySet(), true); + final int totalCapacity = sumCapacity(clients.values()); + final int tasksPerThread = taskIds.size() / totalCapacity; + final Set<TaskId> assigned = new HashSet<>(); + + // first try and re-assign existing active tasks to clients that previously had + // the same active task + for (final Map.Entry<TaskId, ID> entry : previousActiveTaskAssignment.entrySet()) { + final TaskId taskId = entry.getKey(); + if (taskIds.contains(taskId)) { + final ClientState<TaskId> client = clients.get(entry.getValue()); + if (client.hasUnfulfilledQuota(tasksPerThread)) { + assignTaskToClient(assigned, taskId, client); + } + } } - final Set<TaskId> newTasks = new HashSet<>(taskIds); - newTasks.removeAll(previouslyAssignedTaskIds); + final Set<TaskId> unassigned = new HashSet<>(taskIds); + unassigned.removeAll(assigned); + + // try and assign any remaining unassigned tasks to clients that previously + // have seen the task. 
+ for (final Iterator<TaskId> iterator = unassigned.iterator(); iterator.hasNext(); ) { + final TaskId taskId = iterator.next(); + final Set<ID> clientIds = previousStandbyTaskAssignment.get(taskId); + if (clientIds != null) { + for (final ID clientId : clientIds) { + final ClientState<TaskId> client = clients.get(clientId); + if (client.hasUnfulfilledQuota(tasksPerThread)) { + assignTaskToClient(assigned, taskId, client); + iterator.remove(); + break; + } + } + } + } - for (final TaskId taskId : newTasks) { - assign(taskId, clients.keySet(), true); + // assign any remaining unassigned tasks + for (final TaskId taskId : unassigned) { + allocateTaskWithClientCandidates(taskId, clients.keySet(), true); } + } - private void assign(final TaskId taskId, final Set<ID> clientsWithin, final boolean active) { + + + private void allocateTaskWithClientCandidates(final TaskId taskId, final Set<ID> clientsWithin, final boolean active) { final ClientState<TaskId> client = findClient(taskId, clientsWithin); taskPairs.addPairs(taskId, client.assignedTasks()); client.assign(taskId, active); } + private void assignTaskToClient(final Set<TaskId> assigned, final TaskId taskId, final ClientState<TaskId> client) { + taskPairs.addPairs(taskId, client.assignedTasks()); + client.assign(taskId, true); + assigned.add(taskId); + } + private Set<ID> findClientsWithoutAssignedTask(final TaskId taskId) { final Set<ID> clientIds = new HashSet<>(); for (final Map.Entry<ID, ClientState<TaskId>> client : clients.entrySet()) { @@ -131,9 +162,7 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> { } private boolean shouldBalanceLoad(final ClientState<TaskId> client) { - return !hasNewTasks - && client.reachedCapacity() - && hasClientsWithMoreAvailableCapacity(client); + return client.reachedCapacity() && hasClientsWithMoreAvailableCapacity(client); } private boolean hasClientsWithMoreAvailableCapacity(final ClientState<TaskId> client) { @@ -208,6 +237,7 @@ public class 
StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> { previousStandbyTaskAssignment.get(prevAssignedTask).add(clientState.getKey()); } } + } private int sumCapacity(final Collection<ClientState<TaskId>> values) { @@ -218,7 +248,6 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> { return capacity; } - private static class TaskPairs { private final Set<Pair> pairs; private final int maxPairs; http://git-wip-us.apache.org/repos/asf/kafka/blob/0fba5296/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java index 6692844..af2c9e3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java @@ -146,5 +146,18 @@ public class ClientStateTest { c1.hasMoreAvailableCapacityThan(new ClientState<Integer>(0)); } + @Test + public void shouldHaveUnfulfilledQuotaWhenActiveTaskSizeLessThanCapacityTimesTasksPerThread() throws Exception { + final ClientState<Integer> client = new ClientState<>(1); + client.assign(1, true); + assertTrue(client.hasUnfulfilledQuota(2)); + } + + @Test + public void shouldNotHaveUnfulfilledQuotaWhenActiveTaskSizeGreaterEqualThanCapacityTimesTasksPerThread() throws Exception { + final ClientState<Integer> client = new ClientState<>(1); + client.assign(1, true); + assertFalse(client.hasUnfulfilledQuota(1)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/0fba5296/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java 
---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java index a782ea3..f37bf7d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java @@ -31,6 +31,7 @@ import java.util.TreeMap; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsCollectionContaining.hasItem; import static org.hamcrest.core.IsCollectionContaining.hasItems; import static org.hamcrest.core.IsNot.not; import static org.junit.Assert.assertTrue; @@ -41,6 +42,8 @@ public class StickyTaskAssignorTest { private final TaskId task01 = new TaskId(0, 1); private final TaskId task02 = new TaskId(0, 2); private final TaskId task03 = new TaskId(0, 3); + private final TaskId task04 = new TaskId(0, 4); + private final TaskId task05 = new TaskId(0, 5); private final Map<Integer, ClientState<TaskId>> clients = new TreeMap<>(); private final Integer p1 = 1; private final Integer p2 = 2; @@ -448,9 +451,6 @@ public class StickyTaskAssignorTest { @Test public void shouldNotMoveAnyTasksWhenNewTasksAdded() throws Exception { - final TaskId task04 = new TaskId(0, 4); - final TaskId task05 = new TaskId(0, 5); - createClientWithPreviousActiveTasks(p1, 1, task00, task01); createClientWithPreviousActiveTasks(p2, 1, task02, task03); @@ -463,8 +463,6 @@ public class StickyTaskAssignorTest { @Test public void shouldAssignNewTasksToNewClientWhenPreviousTasksAssignedToOldClients() throws Exception { - final TaskId task04 = new TaskId(0, 4); - final TaskId task05 = new TaskId(0, 5); createClientWithPreviousActiveTasks(p1, 1, task02, 
task01); createClientWithPreviousActiveTasks(p2, 1, task00, task03); @@ -478,9 +476,120 @@ public class StickyTaskAssignorTest { assertThat(clients.get(p3).activeTasks(), hasItems(task04, task05)); } + @Test + public void shouldAssignTasksNotPreviouslyActiveToNewClient() throws Exception { + final TaskId task10 = new TaskId(0, 10); + final TaskId task11 = new TaskId(0, 11); + final TaskId task12 = new TaskId(1, 2); + final TaskId task13 = new TaskId(1, 3); + final TaskId task20 = new TaskId(2, 0); + final TaskId task21 = new TaskId(2, 1); + final TaskId task22 = new TaskId(2, 2); + final TaskId task23 = new TaskId(2, 3); + + final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task12, task13); + c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, task23)); + final ClientState<TaskId> c2 = createClientWithPreviousActiveTasks(p2, 1, task00, task11, task22); + c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23)); + final ClientState<TaskId> c3 = createClientWithPreviousActiveTasks(p3, 1, task20, task21, task23); + c3.addPreviousStandbyTasks(Utils.mkSet(task02, task12)); + + final ClientState<TaskId> newClient = createClient(p4, 1); + newClient.addPreviousStandbyTasks(Utils.mkSet(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23)); + + final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23); + taskAssignor.assign(0); + + assertThat(c1.activeTasks(), equalTo(Utils.mkSet(task01, task12, task13))); + assertThat(c2.activeTasks(), equalTo(Utils.mkSet(task00, task11, task22))); + assertThat(c3.activeTasks(), equalTo(Utils.mkSet(task20, task21, task23))); + assertThat(newClient.activeTasks(), equalTo(Utils.mkSet(task02, task03, task10))); + } + + @Test + public void 
shouldAssignTasksNotPreviouslyActiveToMultipleNewClients() throws Exception { + final TaskId task10 = new TaskId(0, 10); + final TaskId task11 = new TaskId(0, 11); + final TaskId task12 = new TaskId(1, 2); + final TaskId task13 = new TaskId(1, 3); + final TaskId task20 = new TaskId(2, 0); + final TaskId task21 = new TaskId(2, 1); + final TaskId task22 = new TaskId(2, 2); + final TaskId task23 = new TaskId(2, 3); + + final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task12, task13); + c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, task23)); + final ClientState<TaskId> c2 = createClientWithPreviousActiveTasks(p2, 1, task00, task11, task22); + c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23)); + + final ClientState<TaskId> bounce1 = createClient(p3, 1); + bounce1.addPreviousStandbyTasks(Utils.mkSet(task20, task21, task23)); + + final ClientState<TaskId> bounce2 = createClient(p4, 1); + bounce2.addPreviousStandbyTasks(Utils.mkSet(task02, task03, task10)); + + final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23); + taskAssignor.assign(0); + + assertThat(c1.activeTasks(), equalTo(Utils.mkSet(task01, task12, task13))); + assertThat(c2.activeTasks(), equalTo(Utils.mkSet(task00, task11, task22))); + assertThat(bounce1.activeTasks(), equalTo(Utils.mkSet(task20, task21, task23))); + assertThat(bounce2.activeTasks(), equalTo(Utils.mkSet(task02, task03, task10))); + } + + @Test + public void shouldAssignTasksToNewClient() throws Exception { + createClientWithPreviousActiveTasks(p1, 1, task01, task02); + createClient(p2, 1); + createTaskAssignor(task01, task02).assign(0); + assertThat(clients.get(p1).activeTaskCount(), equalTo(1)); + } + + @Test + public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingClients() throws Exception { 
+ final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02); + final ClientState<TaskId> c2 = createClientWithPreviousActiveTasks(p2, 1, task03, task04, task05); + final ClientState<TaskId> newClient = createClient(p3, 1); + + final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task01, task02, task03, task04, task05); + taskAssignor.assign(0); + assertThat(c1.activeTasks(), not(hasItem(task03))); + assertThat(c1.activeTasks(), not(hasItem(task04))); + assertThat(c1.activeTasks(), not(hasItem(task05))); + assertThat(c1.activeTaskCount(), equalTo(2)); + assertThat(c2.activeTasks(), not(hasItems(task00))); + assertThat(c2.activeTasks(), not(hasItems(task01))); + assertThat(c2.activeTasks(), not(hasItems(task02))); + assertThat(c2.activeTaskCount(), equalTo(2)); + assertThat(newClient.activeTaskCount(), equalTo(2)); + } + + @Test + public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingAndBouncedClients() throws Exception { + final TaskId task06 = new TaskId(0, 6); + final ClientState<TaskId> c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, task06); + final ClientState<TaskId> c2 = createClient(p2, 1); + c2.addPreviousStandbyTasks(Utils.mkSet(task03, task04, task05)); + final ClientState<TaskId> newClient = createClient(p3, 1); + + final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task01, task02, task03, task04, task05, task06); + taskAssignor.assign(0); + assertThat(c1.activeTasks(), not(hasItem(task03))); + assertThat(c1.activeTasks(), not(hasItem(task04))); + assertThat(c1.activeTasks(), not(hasItem(task05))); + assertThat(c1.activeTaskCount(), equalTo(3)); + assertThat(c2.activeTasks(), not(hasItems(task00))); + assertThat(c2.activeTasks(), not(hasItems(task01))); + assertThat(c2.activeTasks(), not(hasItems(task02))); + assertThat(c2.activeTaskCount(), equalTo(2)); + assertThat(newClient.activeTaskCount(), equalTo(2)); + } + 
private StickyTaskAssignor<Integer> createTaskAssignor(final TaskId... tasks) { + final List<TaskId> taskIds = Arrays.asList(tasks); + Collections.shuffle(taskIds); return new StickyTaskAssignor<>(clients, - new HashSet<>(Arrays.asList(tasks))); + new HashSet<>(taskIds)); } private List<TaskId> allActiveTasks() {
