mjsax commented on code in PR #14108:
URL: https://github.com/apache/kafka/pull/14108#discussion_r1282562934


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -678,18 +838,230 @@ public void shouldThrowIfTaskMissingInClients() {
             "Task 1_0 not assigned to any client", exception.getMessage());
     }
 
-    private Cluster getRandomCluster(final int nodeSize, final int tpSize) {
+    @Test
+    public void shouldNotCrashForEmptyStandby() {
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            getClusterForTopic0And1(),
+            getTaskTopicPartitionMapForAllTasks(),
+            mkMap(),
+            getTopologyGroupTaskMap(),
+            getProcessRacksForAllProcess(),
+            mockInternalTopicManagerForChangelog(),
+            new AssignorConfiguration(new 
StreamsConfig(configProps(1)).originals()).assignmentConfigs()
+        );
+
+        final ClientState clientState1 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_1);
+        final ClientState clientState2 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_2);
+        final ClientState clientState3 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_3);
+
+        clientState1.assignActiveTasks(mkSet(TASK_0_1, TASK_1_1));
+        clientState2.assignActive(TASK_1_0);
+        clientState3.assignActive(TASK_0_0);
+
+        final SortedMap<UUID, ClientState> clientStateMap = new 
TreeMap<>(mkMap(
+            mkEntry(UUID_1, clientState1),
+            mkEntry(UUID_2, clientState2),
+            mkEntry(UUID_3, clientState3)
+        ));
+
+        final long originalCost = assignor.standByTasksCost(new TreeSet<>(), 
clientStateMap, 10, 1);
+        assertEquals(0, originalCost);
+
+        final long cost = assignor.optimizeStandbyTasks(clientStateMap, 10, 1,
+            (source, destination, task, clientStates) -> true);
+        assertEquals(0, cost);
+    }
+
+    @Test
+    public void shouldOptimizeStandbyTasksWhenTasksAllMovable() {
+        final int replicaCount = 2;
+        final AssignmentConfigs assignorConfiguration = new 
AssignorConfiguration(new 
StreamsConfig(configProps(replicaCount)).originals()).assignmentConfigs();
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            getClusterForTopic0And1(),
+            getTaskTopicPartitionMapForAllTasks(),
+            getTaskChangelogMapForAllTasks(),
+            getTopologyGroupTaskMap(),
+            getProcessRacksForAllProcess(),
+            mockInternalTopicManagerForChangelog(),
+            assignorConfiguration
+        );
+
+        final ClientState clientState1 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_1);
+        final ClientState clientState2 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_2);
+        final ClientState clientState3 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_3);
+        final ClientState clientState4 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_4);
+        final ClientState clientState5 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_6);
+        final ClientState clientState6 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_7);
+
+        final SortedMap<UUID, ClientState> clientStateMap = new 
TreeMap<>(mkMap(
+            mkEntry(UUID_1, clientState1),
+            mkEntry(UUID_2, clientState2),
+            mkEntry(UUID_3, clientState3),
+            mkEntry(UUID_4, clientState4),
+            mkEntry(UUID_6, clientState5),
+            mkEntry(UUID_7, clientState6)
+        ));
+
+        clientState1.assignActive(TASK_0_0);
+        clientState2.assignActive(TASK_0_1);
+        clientState3.assignActive(TASK_1_0);
+        clientState4.assignActive(TASK_1_1);
+        clientState5.assignActive(TASK_0_2);
+        clientState6.assignActive(TASK_1_2);
+
+        clientState1.assignStandbyTasks(mkSet(TASK_0_1, TASK_1_1)); // Cost 10
+        clientState2.assignStandbyTasks(mkSet(TASK_0_0, TASK_1_0)); // Cost 10
+        clientState3.assignStandbyTasks(mkSet(TASK_0_0, TASK_0_2)); // Cost 20
+        clientState4.assignStandbyTasks(mkSet(TASK_0_1, TASK_1_2)); // Cost 10
+        clientState5.assignStandbyTasks(mkSet(TASK_1_0, TASK_1_2)); // Cost 10
+        clientState6.assignStandbyTasks(mkSet(TASK_0_2, TASK_1_1)); // Cost 10
+
+        final SortedSet<TaskId> taskIds = new TreeSet<>(mkSet(TASK_0_0, 
TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2));
+        final Map<UUID, Integer> standbyTaskCount = 
clientTaskCount(clientStateMap, ClientState::standbyTaskCount);
+
+        assertTrue(assignor.canEnableRackAwareAssignor());
+        verifyStandbySatisfyRackReplica(taskIds, assignor.racksForProcess(), 
clientStateMap, replicaCount, false, null);
+
+        final long originalCost = assignor.standByTasksCost(taskIds, 
clientStateMap, 10, 1);
+        assertEquals(60, originalCost);
+
+        // Task can be moved anywhere so cost can be reduced to 30 compared to 
in shouldOptimizeStandbyTasksWithMovingConstraint it
+        // can only be reduced to 50 since there are moving constraints
+        final long cost = assignor.optimizeStandbyTasks(clientStateMap, 10, 1,
+            (source, destination, task, clients) -> true);
+        assertEquals(30, cost);
+        // Don't validate tasks in different racks after moving
+        verifyStandbySatisfyRackReplica(taskIds, assignor.racksForProcess(), 
clientStateMap, replicaCount, true, standbyTaskCount);
+    }
+
+    @Test
+    public void shouldOptimizeStandbyTasksWithMovingConstraint() {
+        final int replicaCount = 2;
+        final AssignmentConfigs assignorConfiguration = new 
AssignorConfiguration(new 
StreamsConfig(configProps(replicaCount)).originals()).assignmentConfigs();
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            getClusterForTopic0And1(),
+            getTaskTopicPartitionMapForAllTasks(),
+            getTaskChangelogMapForAllTasks(),
+            getTopologyGroupTaskMap(),
+            getProcessRacksForAllProcess(),
+            mockInternalTopicManagerForChangelog(),
+            assignorConfiguration
+        );
+
+        final ClientState clientState1 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_1);
+        final ClientState clientState2 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_2);
+        final ClientState clientState3 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_3);
+        final ClientState clientState4 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_4);
+        final ClientState clientState5 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_6);
+        final ClientState clientState6 = new ClientState(emptySet(), 
emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_7);
+
+        final SortedMap<UUID, ClientState> clientStateMap = new 
TreeMap<>(mkMap(
+            mkEntry(UUID_1, clientState1),
+            mkEntry(UUID_2, clientState2),
+            mkEntry(UUID_3, clientState3),
+            mkEntry(UUID_4, clientState4),
+            mkEntry(UUID_6, clientState5),
+            mkEntry(UUID_7, clientState6)
+        ));
+
+        clientState1.assignActive(TASK_0_0);
+        clientState2.assignActive(TASK_0_1);
+        clientState3.assignActive(TASK_1_0);
+        clientState4.assignActive(TASK_1_1);
+        clientState5.assignActive(TASK_0_2);
+        clientState6.assignActive(TASK_1_2);
+
+        clientState1.assignStandbyTasks(mkSet(TASK_0_1, TASK_1_1)); // Cost 10
+        clientState2.assignStandbyTasks(mkSet(TASK_0_0, TASK_1_0)); // Cost 10
+        clientState3.assignStandbyTasks(mkSet(TASK_0_0, TASK_0_2)); // Cost 20
+        clientState4.assignStandbyTasks(mkSet(TASK_0_1, TASK_1_2)); // Cost 10
+        clientState5.assignStandbyTasks(mkSet(TASK_1_0, TASK_1_2)); // Cost 10
+        clientState6.assignStandbyTasks(mkSet(TASK_0_2, TASK_1_1)); // Cost 10
+
+        final SortedSet<TaskId> taskIds = new TreeSet<>(mkSet(TASK_0_0, 
TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2));
+        final Map<UUID, Integer> standbyTaskCount = 
clientTaskCount(clientStateMap, ClientState::standbyTaskCount);
+
+        assertTrue(assignor.canEnableRackAwareAssignor());
+        verifyStandbySatisfyRackReplica(taskIds, assignor.racksForProcess(), 
clientStateMap, replicaCount, false, null);
+
+        final long originalCost = assignor.standByTasksCost(taskIds, 
clientStateMap, 10, 1);
+        assertEquals(60, originalCost);
+
+        final StandbyTaskAssignor standbyTaskAssignor = 
StandbyTaskAssignorFactory.create(assignorConfiguration, assignor);
+        assertInstanceOf(ClientTagAwareStandbyTaskAssignor.class, 
standbyTaskAssignor);
+        final long cost = assignor.optimizeStandbyTasks(clientStateMap, 10, 1,
+            standbyTaskAssignor::isAllowedTaskMovement);
+        assertEquals(50, cost);
+        // Validate tasks in different racks after moving
+        verifyStandbySatisfyRackReplica(taskIds, assignor.racksForProcess(), 
clientStateMap, replicaCount, false, standbyTaskCount);

Review Comment:
   Why do we pass `false` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to