mjsax commented on code in PR #14108: URL: https://github.com/apache/kafka/pull/14108#discussion_r1282562934
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java: ########## @@ -678,18 +838,230 @@ public void shouldThrowIfTaskMissingInClients() { "Task 1_0 not assigned to any client", exception.getMessage()); } - private Cluster getRandomCluster(final int nodeSize, final int tpSize) { + @Test + public void shouldNotCrashForEmptyStandby() { + final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( + getClusterForTopic0And1(), + getTaskTopicPartitionMapForAllTasks(), + mkMap(), + getTopologyGroupTaskMap(), + getProcessRacksForAllProcess(), + mockInternalTopicManagerForChangelog(), + new AssignorConfiguration(new StreamsConfig(configProps(1)).originals()).assignmentConfigs() + ); + + final ClientState clientState1 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_1); + final ClientState clientState2 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_2); + final ClientState clientState3 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_3); + + clientState1.assignActiveTasks(mkSet(TASK_0_1, TASK_1_1)); + clientState2.assignActive(TASK_1_0); + clientState3.assignActive(TASK_0_0); + + final SortedMap<UUID, ClientState> clientStateMap = new TreeMap<>(mkMap( + mkEntry(UUID_1, clientState1), + mkEntry(UUID_2, clientState2), + mkEntry(UUID_3, clientState3) + )); + + final long originalCost = assignor.standByTasksCost(new TreeSet<>(), clientStateMap, 10, 1); + assertEquals(0, originalCost); + + final long cost = assignor.optimizeStandbyTasks(clientStateMap, 10, 1, + (source, destination, task, clientStates) -> true); + assertEquals(0, cost); + } + + @Test + public void shouldOptimizeStandbyTasksWhenTasksAllMovable() { + final int replicaCount = 2; + final AssignmentConfigs assignorConfiguration = new AssignorConfiguration(new StreamsConfig(configProps(replicaCount)).originals()).assignmentConfigs(); + final 
RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( + getClusterForTopic0And1(), + getTaskTopicPartitionMapForAllTasks(), + getTaskChangelogMapForAllTasks(), + getTopologyGroupTaskMap(), + getProcessRacksForAllProcess(), + mockInternalTopicManagerForChangelog(), + assignorConfiguration + ); + + final ClientState clientState1 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_1); + final ClientState clientState2 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_2); + final ClientState clientState3 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_3); + final ClientState clientState4 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_4); + final ClientState clientState5 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_6); + final ClientState clientState6 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_7); + + final SortedMap<UUID, ClientState> clientStateMap = new TreeMap<>(mkMap( + mkEntry(UUID_1, clientState1), + mkEntry(UUID_2, clientState2), + mkEntry(UUID_3, clientState3), + mkEntry(UUID_4, clientState4), + mkEntry(UUID_6, clientState5), + mkEntry(UUID_7, clientState6) + )); + + clientState1.assignActive(TASK_0_0); + clientState2.assignActive(TASK_0_1); + clientState3.assignActive(TASK_1_0); + clientState4.assignActive(TASK_1_1); + clientState5.assignActive(TASK_0_2); + clientState6.assignActive(TASK_1_2); + + clientState1.assignStandbyTasks(mkSet(TASK_0_1, TASK_1_1)); // Cost 10 + clientState2.assignStandbyTasks(mkSet(TASK_0_0, TASK_1_0)); // Cost 10 + clientState3.assignStandbyTasks(mkSet(TASK_0_0, TASK_0_2)); // Cost 20 + clientState4.assignStandbyTasks(mkSet(TASK_0_1, TASK_1_2)); // Cost 10 + clientState5.assignStandbyTasks(mkSet(TASK_1_0, TASK_1_2)); // Cost 10 + clientState6.assignStandbyTasks(mkSet(TASK_0_2, TASK_1_1)); // Cost 10 + + final SortedSet<TaskId> 
taskIds = new TreeSet<>(mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2)); + final Map<UUID, Integer> standbyTaskCount = clientTaskCount(clientStateMap, ClientState::standbyTaskCount); + + assertTrue(assignor.canEnableRackAwareAssignor()); + verifyStandbySatisfyRackReplica(taskIds, assignor.racksForProcess(), clientStateMap, replicaCount, false, null); + + final long originalCost = assignor.standByTasksCost(taskIds, clientStateMap, 10, 1); + assertEquals(60, originalCost); + + // Task can be moved anywhere so cost can be reduced to 30 compared to in shouldOptimizeStandbyTasksWithMovingConstraint it + // can only be reduced to 50 since there are moving constraints + final long cost = assignor.optimizeStandbyTasks(clientStateMap, 10, 1, + (source, destination, task, clients) -> true); + assertEquals(30, cost); + // Don't validate tasks in different racks after moving + verifyStandbySatisfyRackReplica(taskIds, assignor.racksForProcess(), clientStateMap, replicaCount, true, standbyTaskCount); + } + + @Test + public void shouldOptimizeStandbyTasksWithMovingConstraint() { + final int replicaCount = 2; + final AssignmentConfigs assignorConfiguration = new AssignorConfiguration(new StreamsConfig(configProps(replicaCount)).originals()).assignmentConfigs(); + final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( + getClusterForTopic0And1(), + getTaskTopicPartitionMapForAllTasks(), + getTaskChangelogMapForAllTasks(), + getTopologyGroupTaskMap(), + getProcessRacksForAllProcess(), + mockInternalTopicManagerForChangelog(), + assignorConfiguration + ); + + final ClientState clientState1 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_1); + final ClientState clientState2 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_2); + final ClientState clientState3 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_3); + final ClientState clientState4 = new 
ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_4); + final ClientState clientState5 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_6); + final ClientState clientState6 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_7); + + final SortedMap<UUID, ClientState> clientStateMap = new TreeMap<>(mkMap( + mkEntry(UUID_1, clientState1), + mkEntry(UUID_2, clientState2), + mkEntry(UUID_3, clientState3), + mkEntry(UUID_4, clientState4), + mkEntry(UUID_6, clientState5), + mkEntry(UUID_7, clientState6) + )); + + clientState1.assignActive(TASK_0_0); + clientState2.assignActive(TASK_0_1); + clientState3.assignActive(TASK_1_0); + clientState4.assignActive(TASK_1_1); + clientState5.assignActive(TASK_0_2); + clientState6.assignActive(TASK_1_2); + + clientState1.assignStandbyTasks(mkSet(TASK_0_1, TASK_1_1)); // Cost 10 + clientState2.assignStandbyTasks(mkSet(TASK_0_0, TASK_1_0)); // Cost 10 + clientState3.assignStandbyTasks(mkSet(TASK_0_0, TASK_0_2)); // Cost 20 + clientState4.assignStandbyTasks(mkSet(TASK_0_1, TASK_1_2)); // Cost 10 + clientState5.assignStandbyTasks(mkSet(TASK_1_0, TASK_1_2)); // Cost 10 + clientState6.assignStandbyTasks(mkSet(TASK_0_2, TASK_1_1)); // Cost 10 + + final SortedSet<TaskId> taskIds = new TreeSet<>(mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2)); + final Map<UUID, Integer> standbyTaskCount = clientTaskCount(clientStateMap, ClientState::standbyTaskCount); + + assertTrue(assignor.canEnableRackAwareAssignor()); + verifyStandbySatisfyRackReplica(taskIds, assignor.racksForProcess(), clientStateMap, replicaCount, false, null); + + final long originalCost = assignor.standByTasksCost(taskIds, clientStateMap, 10, 1); + assertEquals(60, originalCost); + + final StandbyTaskAssignor standbyTaskAssignor = StandbyTaskAssignorFactory.create(assignorConfiguration, assignor); + assertInstanceOf(ClientTagAwareStandbyTaskAssignor.class, standbyTaskAssignor); + 
final long cost = assignor.optimizeStandbyTasks(clientStateMap, 10, 1, + standbyTaskAssignor::isAllowedTaskMovement); + assertEquals(50, cost); + // Validate tasks in different racks after moving + verifyStandbySatisfyRackReplica(taskIds, assignor.racksForProcess(), clientStateMap, replicaCount, false, standbyTaskCount); Review Comment: Why do we pass `false`? We use the regular `standbyTaskAssignor::isAllowedTaskMovement` function so rack awareness should be met and thus we should do the strict check? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org