[ https://issues.apache.org/jira/browse/KAFKA-6309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387310#comment-16387310 ]
ASF GitHub Bot commented on KAFKA-6309: --------------------------------------- guozhangwang closed pull request #4624: KAFKA-6309: Improve task assignor load balance URL: https://github.com/apache/kafka/pull/4624 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java index de8fa57e36a..5b54d08c032 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java @@ -20,10 +20,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -106,14 +109,13 @@ private void assignActive() { } // assign any remaining unassigned tasks - for (final TaskId taskId : unassigned) { + List<TaskId> sortedTasks = new ArrayList<>(unassigned); + Collections.sort(sortedTasks); + for (final TaskId taskId : sortedTasks) { allocateTaskWithClientCandidates(taskId, clients.keySet(), true); } - } - - private void allocateTaskWithClientCandidates(final TaskId taskId, final Set<ID> clientsWithin, final boolean active) { final ClientState client = findClient(taskId, clientsWithin, active); taskPairs.addPairs(taskId, client.assignedTasks()); diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java index 4f770c86239..ed22e3c30de 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java @@ -23,11 +23,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -350,6 +352,42 @@ public void shouldAssignMoreTasksToClientWithMoreCapacity() { assertThat(clients.get(p1).assignedTaskCount(), equalTo(4)); } + @Test + public void shouldEvenlyDistributeByTaskIdAndPartition() { + createClient(p1, 4); + createClient(p2, 4); + createClient(p3, 4); + createClient(p4, 4); + + final List<TaskId> taskIds = new ArrayList<>(); + final TaskId[] taskIdArray = new TaskId[16]; + + for (int i = 1; i <= 2; i++) { + for (int j = 0; j < 8; j++) { + taskIds.add(new TaskId(i, j)); + } + } + + Collections.shuffle(taskIds); + taskIds.toArray(taskIdArray); + + final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(taskIdArray); + taskAssignor.assign(0); + + Collections.sort(taskIds); + final Set<TaskId> expectedClientOneAssignment = getExpectedTaskIdAssignment(taskIds, 0, 4, 8, 12); + final Set<TaskId> expectedClientTwoAssignment = getExpectedTaskIdAssignment(taskIds, 1, 5, 9, 13); + final Set<TaskId> expectedClientThreeAssignment = getExpectedTaskIdAssignment(taskIds, 2, 6, 10, 14); + final Set<TaskId> expectedClientFourAssignment = getExpectedTaskIdAssignment(taskIds, 3, 7, 11, 15); + + final 
Map<Integer, Set<TaskId>> sortedAssignments = sortClientAssignments(clients); + + assertThat(sortedAssignments.get(p1), equalTo(expectedClientOneAssignment)); + assertThat(sortedAssignments.get(p2), equalTo(expectedClientTwoAssignment)); + assertThat(sortedAssignments.get(p3), equalTo(expectedClientThreeAssignment)); + assertThat(sortedAssignments.get(p4), equalTo(expectedClientFourAssignment)); + } + @Test public void shouldNotHaveSameAssignmentOnAnyTwoHosts() { @@ -665,4 +703,21 @@ private void assertActiveTaskTopicGroupIdsEvenlyDistributed() { } } + private Map<Integer, Set<TaskId>> sortClientAssignments(final Map<Integer, ClientState> clients) { + final Map<Integer, Set<TaskId>> sortedAssignments = new HashMap<>(); + for (final Map.Entry<Integer, ClientState> entry : clients.entrySet()) { + final Set<TaskId> sorted = new TreeSet<>(entry.getValue().activeTasks()); + sortedAssignments.put(entry.getKey(), sorted); + } + return sortedAssignments; + } + + private Set<TaskId> getExpectedTaskIdAssignment(final List<TaskId> tasks, final int... indices) { + final Set<TaskId> sortedAssignment = new TreeSet<>(); + for (final int index : indices) { + sortedAssignment.add(tasks.get(index)); + } + return sortedAssignment; + } + } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> add support for getting topic defaults from AdminClient
> -------------------------------------------------------
>
>                 Key: KAFKA-6309
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6309
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: dan norwood
>            Assignee: dan norwood
>            Priority: Major
>
> kip here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-234%3A+add+support+for+getting+topic+defaults+from+AdminClient

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)