mjsax commented on code in PR #14139:
URL: https://github.com/apache/kafka/pull/14139#discussion_r1286401088


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##########
@@ -145,6 +163,14 @@ private void assignStandbyReplicaTasks(final TreeMap<UUID, 
ClientState> clientSt
             ClientState::assignStandby,
             standbyTaskAssignor::isAllowedTaskMovement
         );
+
+        if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() 
&& rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {

Review Comment:
   As above.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##########
@@ -208,19 +234,27 @@ private static boolean shouldMoveATask(final ClientState 
sourceClientState,
     }
 
     private static void assignStatelessActiveTasks(final TreeMap<UUID, 
ClientState> clientStates,
-                                                   final Iterable<TaskId> 
statelessTasks) {
+                                                   final Iterable<TaskId> 
statelessTasks,
+                                                   final 
Optional<RackAwareTaskAssignor> rackAwareTaskAssignor) {
         final ConstrainedPrioritySet statelessActiveTaskClientsByTaskLoad = 
new ConstrainedPrioritySet(
             (client, task) -> true,
             client -> clientStates.get(client).activeTaskLoad()
         );
         statelessActiveTaskClientsByTaskLoad.offerAll(clientStates.keySet());
 
+        final SortedSet<TaskId> sortedTasks = new TreeSet<>();
         for (final TaskId task : statelessTasks) {
+            sortedTasks.add(task);
             final UUID client = 
statelessActiveTaskClientsByTaskLoad.poll(task);
             final ClientState state = clientStates.get(client);
             state.assignActive(task);
             statelessActiveTaskClientsByTaskLoad.offer(client);
         }
+
+        if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() 
&& rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {

Review Comment:
   As above.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java:
##########
@@ -56,52 +67,118 @@
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClusterForAllTopics;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getProcessRacksForAllProcess;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomClientState;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomCluster;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomProcessRacks;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskChangelogMapForAllTasks;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMap;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTopologyGroupTaskMap;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasActiveTasks;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasAssignedTasks;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasStandbyTasks;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.mockInternalTopicManagerForChangelog;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.mockInternalTopicManagerForRandomChangelog;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.verifyStandbySatisfyRackReplica;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.fail;
-
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(Parameterized.class)
 public class HighAvailabilityTaskAssignorTest {
-    private final AssignmentConfigs configWithoutStandbys = new 
AssignmentConfigs(
-        /*acceptableRecoveryLag*/ 100L,
-        /*maxWarmupReplicas*/ 2,
-        /*numStandbyReplicas*/ 0,
-        /*probingRebalanceIntervalMs*/ 60 * 1000L,
-        /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
-    );
-
-    private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
-        /*acceptableRecoveryLag*/ 100L,
-        /*maxWarmupReplicas*/ 2,
-        /*numStandbyReplicas*/ 1,
-        /*probingRebalanceIntervalMs*/ 60 * 1000L,
-        /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
-    );
+    private AssignmentConfigs getConfigWithoutStandbys() {
+        return new AssignmentConfigs(
+            /*acceptableRecoveryLag*/ 100L,
+            /*maxWarmupReplicas*/ 2,
+            /*numStandbyReplicas*/ 0,
+            /*probingRebalanceIntervalMs*/ 60 * 1000L,
+            /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
+            null,
+            null,
+            rackAwareStrategy
+        );
+    }
+
+    private AssignmentConfigs getConfigWithStandbys() {
+        return getConfigWithStandbys(1);
+    }
+
+    private AssignmentConfigs getConfigWithStandbys(final int replicaNum) {
+        return new AssignmentConfigs(
+            /*acceptableRecoveryLag*/ 100L,
+            /*maxWarmupReplicas*/ 2,
+            /*numStandbyReplicas*/ replicaNum,
+            /*probingRebalanceIntervalMs*/ 60 * 1000L,
+            /*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
+            null,
+            null,
+            rackAwareStrategy
+        );
+    }
+
+    @Parameter
+    public boolean enableRackAwareTaskAssignor;
+
+    private String rackAwareStrategy = 
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE;
+
+    @Before
+    public void setUp() {
+        if (enableRackAwareTaskAssignor) {
+            rackAwareStrategy = 
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC;
+        }
+    }
+
+    @Parameterized.Parameters(name = "enableRackAwareTaskAssignor={0}")
+    public static Collection<Object[]> getParamStoreType() {
+        return asList(new Object[][] {
+            {true},
+            {false}
+        });
+    }
 
     @Test
     public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {
         final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, 
TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
-        final ClientState clientState1 = new ClientState(allTaskIds, 
emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)), 
EMPTY_CLIENT_TAGS, 1);
-        final ClientState clientState2 = new ClientState(emptySet(), 
allTaskIds, allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)), 
EMPTY_CLIENT_TAGS, 1);
-        final ClientState clientState3 = new ClientState(emptySet(), 
emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 
Long.MAX_VALUE)), EMPTY_CLIENT_TAGS, 1);
+        final ClientState clientState1 = new ClientState(allTaskIds, 
emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)), 
EMPTY_CLIENT_TAGS, 1, UUID_1);
+        final ClientState clientState2 = new ClientState(emptySet(), 
allTaskIds, allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)), 
EMPTY_CLIENT_TAGS, 1, UUID_2);
+        final ClientState clientState3 = new ClientState(emptySet(), 
emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 
Long.MAX_VALUE)), EMPTY_CLIENT_TAGS, 1, UUID_3);
 
         final Map<UUID, ClientState> clientStates = mkMap(
             mkEntry(UUID_1, clientState1),
             mkEntry(UUID_2, clientState2),
             mkEntry(UUID_3, clientState3)
         );
 
+        final AssignmentConfigs configs = new AssignmentConfigs(
+            11L,
+            2,
+            1,
+            60_000L,
+            EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
+            null,
+            null,
+            rackAwareStrategy
+        );
+        final RackAwareTaskAssignor rackAwareTaskAssignor = 
getRackAwareTaskAssignor(configs);
+
         final boolean unstable = new HighAvailabilityTaskAssignor().assign(
             clientStates,
             allTaskIds,
             allTaskIds,
-            new AssignmentConfigs(11L, 2, 1, 60_000L, 
EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
+            Optional.of(rackAwareTaskAssignor),

Review Comment:
   Aligns (and triggered) my comments above -- if we can always pass a rack 
aware assignor as it's enabled/disabled by the `config` anyway, we should not 
make it optional and require that it's not-null (and otherwise crash as it 
would indicate a bug).



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##########
@@ -124,11 +133,20 @@ private static void assignActiveStatefulTasks(final 
SortedMap<UUID, ClientState>
             ClientState::assignActive,
             (source, destination) -> true
         );
+
+        if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.isPresent() 
&& rackAwareTaskAssignor.get().canEnableRackAwareAssignor()) {

Review Comment:
   Just wondering why we would ever pass in `null`? The rack aware assignor is 
an internal class, so we can always construct it, and in the end we make a 
decision about using it or not via `canEnableRackAwareAssignor()` that does the 
the corresponding config anyway? -- I seems simpler to alway pass an instance 
of the rack aware assignor and simplify the code here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to