This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 77a6fe9c2a5 KAFKA-15045: (KIP-924 pt. 22) Add 
RackAwareOptimizationParams and other minor TaskAssignmentUtils changes (#16294)
77a6fe9c2a5 is described below

commit 77a6fe9c2a5d7d918536162c7db37b67c772d494
Author: Antoine Pourchet <[email protected]>
AuthorDate: Tue Jun 11 22:31:43 2024 -0600

    KAFKA-15045: (KIP-924 pt. 22) Add RackAwareOptimizationParams and other 
minor TaskAssignmentUtils changes (#16294)
    
    We now provide a way to more easily customize the rack aware
    optimizations that we provide by way of a configuration class called
    RackAwareOptimizationParams.
    
    We also simplified the APIs for the optimizeXYZ utility functions since
    they were mutating the inputs anyway.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../processor/assignment/TaskAssignmentUtils.java  | 207 ++++++++++++++++-----
 .../assignment/assignors/StickyTaskAssignor.java   |  38 ++--
 .../assignment/TaskAssignmentUtilsTest.java        |   7 +-
 3 files changed, 185 insertions(+), 67 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
index f67e54b2e1c..39a698adfa5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -55,7 +56,103 @@ public final class TaskAssignmentUtils {
     private TaskAssignmentUtils() {}
 
     /**
-     * Return an {@code AssignmentError} for a task assignment created for an 
application.
+     * A simple config container for necessary parameters and optional 
overrides to apply when
+     * running the active or standby task rack-aware optimizations.
+     */
+    public static class RackAwareOptimizationParams {
+        private final ApplicationState applicationState;
+        private final Optional<Integer> trafficCostOverride;
+        private final Optional<Integer> nonOverlapCostOverride;
+        private final Optional<SortedSet<TaskId>> tasksToOptimize;
+
+        private RackAwareOptimizationParams(final ApplicationState 
applicationState,
+                                           final Optional<Integer> 
trafficCostOverride,
+                                           final Optional<Integer> 
nonOverlapCostOverride,
+                                           final Optional<SortedSet<TaskId>> 
tasksToOptimize) {
+            this.applicationState = applicationState;
+            this.trafficCostOverride = trafficCostOverride;
+            this.nonOverlapCostOverride = nonOverlapCostOverride;
+            this.tasksToOptimize = tasksToOptimize;
+        }
+
+        /**
+         * Return a new config object with no overrides and the 
tasksToOptimize initialized to the set of all tasks in the given 
ApplicationState
+         */
+        public static RackAwareOptimizationParams of(final ApplicationState 
applicationState) {
+            return new RackAwareOptimizationParams(applicationState, 
Optional.empty(), Optional.empty(), Optional.empty());
+        }
+
+        /**
+         * Return a new config object with the tasksToOptimize set to all 
stateful tasks in the given ApplicationState
+         */
+        public RackAwareOptimizationParams forStatefulTasks() {
+            final SortedSet<TaskId> tasks = 
applicationState.allTasks().values()
+                .stream()
+                .filter(TaskInfo::isStateful)
+                .map(TaskInfo::id)
+                .collect(Collectors.toCollection(TreeSet::new));
+            return forTasks(tasks);
+        }
+
+        /**
+         * Return a new config object with the tasksToOptimize set to all 
stateless tasks in the given ApplicationState
+         */
+        public RackAwareOptimizationParams forStatelessTasks() {
+            final SortedSet<TaskId> tasks = 
applicationState.allTasks().values()
+                .stream()
+                .filter(taskInfo -> !taskInfo.isStateful())
+                .map(TaskInfo::id)
+                .collect(Collectors.toCollection(TreeSet::new));
+            return forTasks(tasks);
+        }
+
+        /**
+         * Return a new config object with the provided tasksToOptimize
+         */
+        public RackAwareOptimizationParams forTasks(final SortedSet<TaskId> 
tasksToOptimize) {
+            return new RackAwareOptimizationParams(
+                applicationState,
+                trafficCostOverride,
+                nonOverlapCostOverride,
+                Optional.of(tasksToOptimize)
+            );
+        }
+
+        /**
+         * Return a new config object with the provided trafficCost override 
applied
+         */
+        public RackAwareOptimizationParams withTrafficCostOverride(final int 
trafficCostOverride) {
+            return new RackAwareOptimizationParams(
+                applicationState,
+                Optional.of(trafficCostOverride),
+                nonOverlapCostOverride,
+                tasksToOptimize
+            );
+        }
+
+        /**
+         * Return a new config object with the provided nonOverlapCost 
override applied
+         */
+        public RackAwareOptimizationParams withNonOverlapCostOverride(final 
int nonOverlapCostOverride) {
+            return new RackAwareOptimizationParams(
+                applicationState,
+                trafficCostOverride,
+                Optional.of(nonOverlapCostOverride),
+                tasksToOptimize
+            );
+        }
+    }
+
+    /**
+     * Validate the passed-in {@link TaskAssignment} and return an {@link 
AssignmentError} representing the
+     * first error detected in the assignment, or {@link AssignmentError#NONE} 
if the assignment passes the
+     * verification check.
+     * <p>
+     * Note: this verification is performed automatically by the 
StreamsPartitionAssignor on the assignment
+     * returned by the TaskAssignor, and the error returned to the assignor 
via the {@link TaskAssignor#onAssignmentComputed}
+     * callback. Therefore, it is not required to call this manually from the 
{@link TaskAssignor#assign} method.
+     * However, if an invalid assignment is returned it will fail the 
rebalance and kill the thread, so it may be useful to
+     * utilize this method in an assignor to verify the assignment before 
returning it and fix any errors it finds.
      *
      * @param applicationState The application for which this task assignment 
is being assessed.
      * @param taskAssignment   The task assignment that will be validated.
@@ -153,16 +250,14 @@ public final class TaskAssignmentUtils {
      * If rack-aware client tags are configured, the rack-aware standby task 
assignor will be used
      *
      * @param applicationState        the metadata and other info describing 
the current application state
-     * @param kafkaStreamsAssignments the current assignment of tasks to 
KafkaStreams clients
-     *
-     * @return a new map containing the mappings from KafkaStreamsAssignments 
updated with the default standby assignment
+     * @param kafkaStreamsAssignments the KafkaStreams client assignments to 
add standby tasks to
      */
-    public static Map<ProcessId, KafkaStreamsAssignment> 
defaultStandbyTaskAssignment(final ApplicationState applicationState,
-                                                                               
       final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
+    public static void defaultStandbyTaskAssignment(final ApplicationState 
applicationState,
+                                                    final Map<ProcessId, 
KafkaStreamsAssignment> kafkaStreamsAssignments) {
         if 
(!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) {
-            return tagBasedStandbyTaskAssignment(applicationState, 
kafkaStreamsAssignments);
+            tagBasedStandbyTaskAssignment(applicationState, 
kafkaStreamsAssignments);
         } else {
-            return loadBasedStandbyTaskAssignment(applicationState, 
kafkaStreamsAssignments);
+            loadBasedStandbyTaskAssignment(applicationState, 
kafkaStreamsAssignments);
         }
     }
 
@@ -185,34 +280,43 @@ public final class TaskAssignmentUtils {
      * <p>
      * This method optimizes cross-rack traffic for active tasks only. For 
standby task optimization,
      * use {@link #optimizeRackAwareStandbyTasks}.
+     * <p>
+     * It is recommended to run this optimization before assigning any standby 
tasks, especially if you have configured
+     * your KafkaStreams clients with assignment tags via the 
rack.aware.assignment.tags config since this method may
+     * shuffle around active tasks without considering the client tags and can 
result in a violation of the original
+     * client tag assignment's constraints.
      *
-     * @param applicationState        the metadata and other info describing 
the current application state
+     * @param optimizationParams      optional configuration parameters to 
apply
      * @param kafkaStreamsAssignments the current assignment of tasks to 
KafkaStreams clients
-     * @param tasks                   the set of tasks to reassign if 
possible. Must already be assigned to a KafkaStreams client
-     *
-     * @return a map with the KafkaStreamsAssignments updated to minimize 
cross-rack traffic for active tasks
      */
-    public static Map<ProcessId, KafkaStreamsAssignment> 
optimizeRackAwareActiveTasks(final ApplicationState applicationState,
-                                                                               
       final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
-                                                                               
       final SortedSet<TaskId> tasks) {
-        if (tasks.isEmpty()) {
-            return kafkaStreamsAssignments;
+    public static void optimizeRackAwareActiveTasks(final 
RackAwareOptimizationParams optimizationParams,
+                                                    final Map<ProcessId, 
KafkaStreamsAssignment> kafkaStreamsAssignments) {
+        final ApplicationState applicationState = 
optimizationParams.applicationState;
+        final SortedSet<TaskId> activeTasksToOptimize = 
getTasksToOptimize(kafkaStreamsAssignments, optimizationParams, 
AssignedTask.Type.ACTIVE);
+        if (activeTasksToOptimize.isEmpty()) {
+            return;
         }
 
-        if (!canPerformRackAwareOptimization(applicationState, 
AssignedTask.Type.ACTIVE)) {
-            return kafkaStreamsAssignments;
+        if (!canPerformRackAwareOptimization(applicationState, 
optimizationParams, AssignedTask.Type.ACTIVE)) {
+            return;
         }
 
         initializeAssignmentsForAllClients(applicationState, 
kafkaStreamsAssignments);
 
-        final int crossRackTrafficCost = 
applicationState.assignmentConfigs().rackAwareTrafficCost().getAsInt();
-        final int nonOverlapCost = 
applicationState.assignmentConfigs().rackAwareNonOverlapCost().getAsInt();
+        final int crossRackTrafficCost =
+            optimizationParams.trafficCostOverride.orElseGet(() -> 
applicationState.assignmentConfigs()
+                .rackAwareTrafficCost()
+                .getAsInt());
+        final int nonOverlapCost =
+            optimizationParams.nonOverlapCostOverride.orElseGet(() -> 
applicationState.assignmentConfigs()
+                .rackAwareNonOverlapCost()
+                .getAsInt());
 
         final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = 
applicationState.kafkaStreamsStates(false);
-        final List<TaskId> taskIds = new ArrayList<>(tasks);
+        final List<TaskId> taskIds = new ArrayList<>(activeTasksToOptimize);
 
         final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId = 
applicationState.allTasks().values().stream()
-            .filter(taskInfo -> tasks.contains(taskInfo.id()))
+            .filter(taskInfo -> activeTasksToOptimize.contains(taskInfo.id()))
             .collect(Collectors.toMap(TaskInfo::id, 
TaskInfo::topicPartitions));
 
         final List<ProcessId> clientIds = new 
ArrayList<>(kafkaStreamsStates.keySet());
@@ -259,8 +363,6 @@ public final class TaskAssignmentUtils {
             (assignment, taskId) -> assignment.removeTask(new 
AssignedTask(taskId, AssignedTask.Type.ACTIVE)),
             (assignment, taskId) -> assignment.tasks().containsKey(taskId) && 
assignment.tasks().get(taskId).type() == AssignedTask.Type.ACTIVE
         );
-
-        return kafkaStreamsAssignments;
     }
 
     /**
@@ -283,31 +385,34 @@ public final class TaskAssignmentUtils {
      * This method optimizes cross-rack traffic for standby tasks only. For 
active task optimization,
      * use {@link #optimizeRackAwareActiveTasks}.
      *
+     * @param optimizationParams      optional configuration parameters to 
apply
      * @param kafkaStreamsAssignments the current assignment of tasks to 
KafkaStreams clients
-     * @param applicationState        the metadata and other info describing 
the current application state
-     *
-     * @return a map with the KafkaStreamsAssignments updated to minimize 
cross-rack traffic for standby tasks
      */
-    public static Map<ProcessId, KafkaStreamsAssignment> 
optimizeRackAwareStandbyTasks(final ApplicationState applicationState,
-                                                                               
        final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
-        if (!canPerformRackAwareOptimization(applicationState, 
AssignedTask.Type.STANDBY)) {
-            return kafkaStreamsAssignments;
+    public static void optimizeRackAwareStandbyTasks(final 
RackAwareOptimizationParams optimizationParams,
+                                                     final Map<ProcessId, 
KafkaStreamsAssignment> kafkaStreamsAssignments) {
+        final ApplicationState applicationState = 
optimizationParams.applicationState;
+        final SortedSet<TaskId> standbyTasksToOptimize = 
getTasksToOptimize(kafkaStreamsAssignments, optimizationParams, 
AssignedTask.Type.STANDBY);
+        if (standbyTasksToOptimize.isEmpty()) {
+            return;
+        }
+
+        if (!canPerformRackAwareOptimization(applicationState, 
optimizationParams, AssignedTask.Type.STANDBY)) {
+            return;
         }
 
         initializeAssignmentsForAllClients(applicationState, 
kafkaStreamsAssignments);
 
-        final int crossRackTrafficCost = 
applicationState.assignmentConfigs().rackAwareTrafficCost().getAsInt();
-        final int nonOverlapCost = 
applicationState.assignmentConfigs().rackAwareNonOverlapCost().getAsInt();
+        final int crossRackTrafficCost =
+            optimizationParams.trafficCostOverride.orElseGet(() -> 
applicationState.assignmentConfigs()
+                .rackAwareTrafficCost()
+                .getAsInt());
+        final int nonOverlapCost =
+            optimizationParams.nonOverlapCostOverride.orElseGet(() -> 
applicationState.assignmentConfigs()
+                .rackAwareNonOverlapCost()
+                .getAsInt());
 
         final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = 
applicationState.kafkaStreamsStates(false);
 
-        final List<TaskId> standbyTasksToOptimize = 
kafkaStreamsAssignments.values().stream()
-            .flatMap(r -> r.tasks().values().stream())
-            .filter(task -> task.type() == AssignedTask.Type.STANDBY)
-            .map(AssignedTask::id)
-            .distinct()
-            .collect(Collectors.toList());
-
         final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId =
             
applicationState.allTasks().values().stream().collect(Collectors.toMap(
                 TaskInfo::id,
@@ -317,7 +422,7 @@ public final class TaskAssignmentUtils {
         final List<ProcessId> clientIds = new 
ArrayList<>(kafkaStreamsStates.keySet());
         final long initialCost = computeTotalAssignmentCost(
             topicPartitionsByTaskId,
-            standbyTasksToOptimize,
+            new ArrayList<>(standbyTasksToOptimize),
             clientIds,
             kafkaStreamsAssignments,
             kafkaStreamsStates,
@@ -411,7 +516,7 @@ public final class TaskAssignmentUtils {
         }
         final long finalCost = computeTotalAssignmentCost(
             topicPartitionsByTaskId,
-            standbyTasksToOptimize,
+            new ArrayList<>(standbyTasksToOptimize),
             clientIds,
             kafkaStreamsAssignments,
             kafkaStreamsStates,
@@ -424,7 +529,6 @@ public final class TaskAssignmentUtils {
         final long duration = System.currentTimeMillis() - startTime;
         LOG.info("Assignment after {} rounds and {} milliseconds for standby 
task optimization is {}\n with cost {}",
             round, duration, kafkaStreamsAssignments, finalCost);
-        return kafkaStreamsAssignments;
     }
 
     private static long computeTotalAssignmentCost(final Map<TaskId, 
Set<TaskTopicPartition>> topicPartitionsByTaskId,
@@ -541,6 +645,7 @@ public final class TaskAssignmentUtils {
      *         is set.
      */
     private static boolean canPerformRackAwareOptimization(final 
ApplicationState applicationState,
+                                                           final 
RackAwareOptimizationParams optimizationParams,
                                                            final 
AssignedTask.Type taskType) {
         final AssignmentConfigs assignmentConfigs = 
applicationState.assignmentConfigs();
         final String rackAwareAssignmentStrategy = 
assignmentConfigs.rackAwareAssignmentStrategy();
@@ -902,6 +1007,20 @@ public final class TaskAssignmentUtils {
         }
     }
 
+    private static SortedSet<TaskId> getTasksToOptimize(final Map<ProcessId, 
KafkaStreamsAssignment> assignments,
+                                                        final 
RackAwareOptimizationParams optimizationParams,
+                                                        final 
AssignedTask.Type taskType) {
+        if (optimizationParams != null && 
optimizationParams.tasksToOptimize.isPresent()) {
+            return optimizationParams.tasksToOptimize.get();
+        }
+
+        return assignments.values().stream()
+            .flatMap(r -> r.tasks().values().stream())
+            .filter(task -> task.type() == taskType)
+            .map(AssignedTask::id)
+            .collect(Collectors.toCollection(TreeSet::new));
+    }
+
     private static class TagStatistics {
         private final Map<String, Set<String>> tagKeyToValues;
         private final Map<KeyValue<String, String>, Set<ProcessId>> 
tagEntryToClients;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java
index 827c9138c99..3d5e5b247f7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java
@@ -29,7 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.kafka.streams.processor.TaskId;
@@ -39,9 +38,11 @@ import 
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.Assi
 import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
 import org.apache.kafka.streams.processor.assignment.ProcessId;
 import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
+import 
org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils.RackAwareOptimizationParams;
 import org.apache.kafka.streams.processor.assignment.TaskAssignor;
 import org.apache.kafka.streams.processor.assignment.TaskInfo;
 import org.apache.kafka.streams.processor.assignment.TaskTopicPartition;
+import 
org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,21 +95,19 @@ public class StickyTaskAssignor implements TaskAssignor {
 
         final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = 
assignmentState.newAssignments;
 
-        final Set<TaskId> statefulTasks = 
applicationState.allTasks().values().stream()
-            .filter(TaskInfo::isStateful)
-            .map(TaskInfo::id)
-            .collect(Collectors.toSet());
-        final Map<ProcessId, KafkaStreamsAssignment> 
optimizedAssignmentsForStatefulTasks = 
TaskAssignmentUtils.optimizeRackAwareActiveTasks(
-            applicationState, currentAssignments, new 
TreeSet<>(statefulTasks));
-
-        final Set<TaskId> statelessTasks = 
applicationState.allTasks().values().stream()
-            .filter(task -> !task.isStateful())
-            .map(TaskInfo::id)
-            .collect(Collectors.toSet());
-        final Map<ProcessId, KafkaStreamsAssignment> 
optimizedAssignmentsForAllTasks = 
TaskAssignmentUtils.optimizeRackAwareActiveTasks(
-            applicationState, optimizedAssignmentsForStatefulTasks, new 
TreeSet<>(statelessTasks));
-
-        
assignmentState.processOptimizedAssignments(optimizedAssignmentsForAllTasks);
+        TaskAssignmentUtils.optimizeRackAwareActiveTasks(
+            
RackAwareOptimizationParams.of(applicationState).forStatefulTasks(),
+            currentAssignments
+        );
+
+        TaskAssignmentUtils.optimizeRackAwareActiveTasks(
+            RackAwareOptimizationParams.of(applicationState)
+                .forStatelessTasks()
+                
.withTrafficCostOverride(RackAwareTaskAssignor.STATELESS_TRAFFIC_COST)
+                
.withNonOverlapCostOverride(RackAwareTaskAssignor.STATELESS_NON_OVERLAP_COST),
+            currentAssignments
+        );
+        assignmentState.processOptimizedAssignments(currentAssignments);
     }
 
     private void optimizeStandby(final ApplicationState applicationState, 
final AssignmentState assignmentState) {
@@ -120,10 +119,9 @@ public class StickyTaskAssignor implements TaskAssignor {
             return;
         }
 
-        final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = 
assignmentState.newAssignments;
-        final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments = 
TaskAssignmentUtils.optimizeRackAwareStandbyTasks(
-            applicationState, currentAssignments);
-        assignmentState.processOptimizedAssignments(optimizedAssignments);
+        final Map<ProcessId, KafkaStreamsAssignment> assignments = 
assignmentState.newAssignments;
+        
TaskAssignmentUtils.optimizeRackAwareStandbyTasks(RackAwareOptimizationParams.of(applicationState),
 assignments);
+        assignmentState.processOptimizedAssignments(assignments);
     }
 
     private static void assignActive(final ApplicationState applicationState,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
index 7bcf386ab2a..8eff5812284 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
@@ -50,6 +50,7 @@ import 
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.Assi
 import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
 import org.apache.kafka.streams.processor.assignment.ProcessId;
 import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
+import 
org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils.RackAwareOptimizationParams;
 import org.apache.kafka.streams.processor.assignment.TaskInfo;
 import org.apache.kafka.streams.processor.assignment.TaskTopicPartition;
 import org.junit.Rule;
@@ -88,14 +89,14 @@ public class TaskAssignmentUtilsTest {
         );
 
         TaskAssignmentUtils.optimizeRackAwareActiveTasks(
-            applicationState, assignments, new TreeSet<>(tasks.keySet()));
+            RackAwareOptimizationParams.of(applicationState), assignments);
         assertThat(assignments.size(), equalTo(2));
         assertThat(assignments.get(processId(1)).tasks().keySet(), 
equalTo(mkSet(TASK_0_1)));
         assertThat(assignments.get(processId(2)).tasks().keySet(), 
equalTo(mkSet(TASK_0_0)));
 
         // Repeated to make sure nothing gets shifted around after the first 
round of optimization.
         TaskAssignmentUtils.optimizeRackAwareActiveTasks(
-            applicationState, assignments, new TreeSet<>(tasks.keySet()));
+            RackAwareOptimizationParams.of(applicationState), assignments);
         assertThat(assignments.size(), equalTo(2));
         assertThat(assignments.get(processId(1)).tasks().keySet(), 
equalTo(mkSet(TASK_0_1)));
         assertThat(assignments.get(processId(2)).tasks().keySet(), 
equalTo(mkSet(TASK_0_0)));
@@ -127,7 +128,7 @@ public class TaskAssignmentUtilsTest {
             mkAssignment(AssignedTask.Type.STANDBY, 3, TASK_0_0)
         );
 
-        TaskAssignmentUtils.optimizeRackAwareStandbyTasks(applicationState, 
assignments);
+        
TaskAssignmentUtils.optimizeRackAwareStandbyTasks(RackAwareOptimizationParams.of(applicationState),
 assignments);
         assertThat(assignments.size(), equalTo(3));
         assertThat(assignments.get(processId(1)).tasks().keySet(), 
equalTo(mkSet(TASK_0_0, TASK_0_1)));
         assertThat(assignments.get(processId(2)).tasks().keySet(), 
equalTo(mkSet(TASK_0_0)));

Reply via email to