[ https://issues.apache.org/jira/browse/KAFKA-6309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387310#comment-16387310 ]

ASF GitHub Bot commented on KAFKA-6309:
---------------------------------------

guozhangwang closed pull request #4624: KAFKA-6309: Improve task assignor load balance
URL: https://github.com/apache/kafka/pull/4624
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff on merge, the diff is displayed below
for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise show it:

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index de8fa57e36a..5b54d08c032 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -20,10 +20,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -106,14 +109,13 @@ private void assignActive() {
         }
 
         // assign any remaining unassigned tasks
-        for (final TaskId taskId : unassigned) {
+        List<TaskId> sortedTasks = new ArrayList<>(unassigned);
+        Collections.sort(sortedTasks);
+        for (final TaskId taskId : sortedTasks) {
             allocateTaskWithClientCandidates(taskId, clients.keySet(), true);
         }
-
     }
 
-
-
     private void allocateTaskWithClientCandidates(final TaskId taskId, final 
Set<ID> clientsWithin, final boolean active) {
         final ClientState client = findClient(taskId, clientsWithin, active);
         taskPairs.addPairs(taskId, client.assignedTasks());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 4f770c86239..ed22e3c30de 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -23,11 +23,13 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -350,6 +352,42 @@ public void 
shouldAssignMoreTasksToClientWithMoreCapacity() {
         assertThat(clients.get(p1).assignedTaskCount(), equalTo(4));
     }
 
+    @Test
+    public void shouldEvenlyDistributeByTaskIdAndPartition() {
+        createClient(p1, 4);
+        createClient(p2, 4);
+        createClient(p3, 4);
+        createClient(p4, 4);
+
+        final List<TaskId> taskIds = new ArrayList<>();
+        final TaskId[] taskIdArray = new TaskId[16];
+
+        for (int i = 1; i <= 2; i++) {
+            for (int j = 0; j < 8; j++) {
+                taskIds.add(new TaskId(i, j));
+            }
+        }
+
+        Collections.shuffle(taskIds);
+        taskIds.toArray(taskIdArray);
+
+        final StickyTaskAssignor<Integer> taskAssignor = 
createTaskAssignor(taskIdArray);
+        taskAssignor.assign(0);
+
+        Collections.sort(taskIds);
+        final Set<TaskId> expectedClientOneAssignment = 
getExpectedTaskIdAssignment(taskIds, 0, 4, 8, 12);
+        final Set<TaskId> expectedClientTwoAssignment = 
getExpectedTaskIdAssignment(taskIds, 1, 5, 9, 13);
+        final Set<TaskId> expectedClientThreeAssignment = 
getExpectedTaskIdAssignment(taskIds, 2, 6, 10, 14);
+        final Set<TaskId> expectedClientFourAssignment = 
getExpectedTaskIdAssignment(taskIds, 3, 7, 11, 15);
+
+        final Map<Integer, Set<TaskId>> sortedAssignments = 
sortClientAssignments(clients);
+
+        assertThat(sortedAssignments.get(p1), 
equalTo(expectedClientOneAssignment));
+        assertThat(sortedAssignments.get(p2), 
equalTo(expectedClientTwoAssignment));
+        assertThat(sortedAssignments.get(p3), 
equalTo(expectedClientThreeAssignment));
+        assertThat(sortedAssignments.get(p4), 
equalTo(expectedClientFourAssignment));
+    }
+
 
     @Test
     public void shouldNotHaveSameAssignmentOnAnyTwoHosts() {
@@ -665,4 +703,21 @@ private void 
assertActiveTaskTopicGroupIdsEvenlyDistributed() {
         }
     }
 
+    private Map<Integer, Set<TaskId>> sortClientAssignments(final Map<Integer, 
ClientState> clients) {
+        final Map<Integer, Set<TaskId>> sortedAssignments = new HashMap<>();
+        for (final Map.Entry<Integer, ClientState> entry : clients.entrySet()) 
{
+            final Set<TaskId> sorted = new 
TreeSet<>(entry.getValue().activeTasks());
+            sortedAssignments.put(entry.getKey(), sorted);
+        }
+        return sortedAssignments;
+    }
+
+    private Set<TaskId> getExpectedTaskIdAssignment(final List<TaskId> tasks, 
final int... indices) {
+        final Set<TaskId> sortedAssignment = new TreeSet<>();
+        for (final int index : indices) {
+            sortedAssignment.add(tasks.get(index));
+        }
+        return sortedAssignment;
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> add support for getting topic defaults from AdminClient
> -------------------------------------------------------
>
>                 Key: KAFKA-6309
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6309
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: dan norwood
>            Assignee: dan norwood
>            Priority: Major
>
> KIP here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-234%3A+add+support+for+getting+topic+defaults+from+AdminClient



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to