This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0782232fbeb KAFKA-15045: (KIP-924 pt. 22) Add
RackAwareOptimizationParams and other minor TaskAssignmentUtils changes (#16294)
0782232fbeb is described below
commit 0782232fbeb6313a316b930d12508d1d6148f3c9
Author: Antoine Pourchet <[email protected]>
AuthorDate: Tue Jun 11 22:31:43 2024 -0600
KAFKA-15045: (KIP-924 pt. 22) Add RackAwareOptimizationParams and other
minor TaskAssignmentUtils changes (#16294)
We now provide a way to more easily customize the rack aware
optimizations that we provide by way of a configuration class called
RackAwareOptimizationParams.
We also simplified the APIs for the optimizeXYZ utility functions since
they were mutating the inputs anyway.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
.../processor/assignment/TaskAssignmentUtils.java | 207 ++++++++++++++++-----
.../assignment/assignors/StickyTaskAssignor.java | 38 ++--
.../assignment/TaskAssignmentUtilsTest.java | 7 +-
3 files changed, 185 insertions(+), 67 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
index f67e54b2e1c..39a698adfa5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -55,7 +56,103 @@ public final class TaskAssignmentUtils {
private TaskAssignmentUtils() {}
/**
- * Return an {@code AssignmentError} for a task assignment created for an
application.
+ * A simple config container for necessary parameters and optional
overrides to apply when
+ * running the active or standby task rack-aware optimizations.
+ */
+ public static class RackAwareOptimizationParams {
+ private final ApplicationState applicationState;
+ private final Optional<Integer> trafficCostOverride;
+ private final Optional<Integer> nonOverlapCostOverride;
+ private final Optional<SortedSet<TaskId>> tasksToOptimize;
+
+ private RackAwareOptimizationParams(final ApplicationState
applicationState,
+ final Optional<Integer>
trafficCostOverride,
+ final Optional<Integer>
nonOverlapCostOverride,
+ final Optional<SortedSet<TaskId>>
tasksToOptimize) {
+ this.applicationState = applicationState;
+ this.trafficCostOverride = trafficCostOverride;
+ this.nonOverlapCostOverride = nonOverlapCostOverride;
+ this.tasksToOptimize = tasksToOptimize;
+ }
+
+ /**
+ * Return a new config object with no overrides and the
tasksToOptimize initialized to the set of all tasks in the given
ApplicationState
+ */
+ public static RackAwareOptimizationParams of(final ApplicationState
applicationState) {
+ return new RackAwareOptimizationParams(applicationState,
Optional.empty(), Optional.empty(), Optional.empty());
+ }
+
+ /**
+ * Return a new config object with the tasksToOptimize set to all
stateful tasks in the given ApplicationState
+ */
+ public RackAwareOptimizationParams forStatefulTasks() {
+ final SortedSet<TaskId> tasks =
applicationState.allTasks().values()
+ .stream()
+ .filter(TaskInfo::isStateful)
+ .map(TaskInfo::id)
+ .collect(Collectors.toCollection(TreeSet::new));
+ return forTasks(tasks);
+ }
+
+ /**
+ * Return a new config object with the tasksToOptimize set to all
stateless tasks in the given ApplicationState
+ */
+ public RackAwareOptimizationParams forStatelessTasks() {
+ final SortedSet<TaskId> tasks =
applicationState.allTasks().values()
+ .stream()
+ .filter(taskInfo -> !taskInfo.isStateful())
+ .map(TaskInfo::id)
+ .collect(Collectors.toCollection(TreeSet::new));
+ return forTasks(tasks);
+ }
+
+ /**
+ * Return a new config object with the provided tasksToOptimize
+ */
+ public RackAwareOptimizationParams forTasks(final SortedSet<TaskId>
tasksToOptimize) {
+ return new RackAwareOptimizationParams(
+ applicationState,
+ trafficCostOverride,
+ nonOverlapCostOverride,
+ Optional.of(tasksToOptimize)
+ );
+ }
+
+ /**
+ * Return a new config object with the provided trafficCost override
applied
+ */
+ public RackAwareOptimizationParams withTrafficCostOverride(final int
trafficCostOverride) {
+ return new RackAwareOptimizationParams(
+ applicationState,
+ Optional.of(trafficCostOverride),
+ nonOverlapCostOverride,
+ tasksToOptimize
+ );
+ }
+
+ /**
+ * Return a new config object with the provided nonOverlapCost
override applied
+ */
+ public RackAwareOptimizationParams withNonOverlapCostOverride(final
int nonOverlapCostOverride) {
+ return new RackAwareOptimizationParams(
+ applicationState,
+ trafficCostOverride,
+ Optional.of(nonOverlapCostOverride),
+ tasksToOptimize
+ );
+ }
+ }
+
+ /**
+ * Validate the passed-in {@link TaskAssignment} and return an {@link
AssignmentError} representing the
+ * first error detected in the assignment, or {@link AssignmentError#NONE}
if the assignment passes the
+ * verification check.
+ * <p>
+ * Note: this verification is performed automatically by the
StreamsPartitionAssignor on the assignment
+ * returned by the TaskAssignor, and the error returned to the assignor
via the {@link TaskAssignor#onAssignmentComputed}
+ * callback. Therefore, it is not required to call this manually from the
{@link TaskAssignor#assign} method.
+ * However, if an invalid assignment is returned it will fail the
rebalance and kill the thread, so it may be useful to
+ * utilize this method in an assignor to verify the assignment before
returning it and fix any errors it finds.
*
* @param applicationState The application for which this task assignment
is being assessed.
* @param taskAssignment The task assignment that will be validated.
@@ -153,16 +250,14 @@ public final class TaskAssignmentUtils {
* If rack-aware client tags are configured, the rack-aware standby task
assignor will be used
*
* @param applicationState the metadata and other info describing
the current application state
- * @param kafkaStreamsAssignments the current assignment of tasks to
KafkaStreams clients
- *
- * @return a new map containing the mappings from KafkaStreamsAssignments
updated with the default standby assignment
+ * @param kafkaStreamsAssignments the KafkaStreams client assignments to
add standby tasks to
*/
- public static Map<ProcessId, KafkaStreamsAssignment>
defaultStandbyTaskAssignment(final ApplicationState applicationState,
-
final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
+ public static void defaultStandbyTaskAssignment(final ApplicationState
applicationState,
+ final Map<ProcessId,
KafkaStreamsAssignment> kafkaStreamsAssignments) {
if
(!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) {
- return tagBasedStandbyTaskAssignment(applicationState,
kafkaStreamsAssignments);
+ tagBasedStandbyTaskAssignment(applicationState,
kafkaStreamsAssignments);
} else {
- return loadBasedStandbyTaskAssignment(applicationState,
kafkaStreamsAssignments);
+ loadBasedStandbyTaskAssignment(applicationState,
kafkaStreamsAssignments);
}
}
@@ -185,34 +280,43 @@ public final class TaskAssignmentUtils {
* <p>
* This method optimizes cross-rack traffic for active tasks only. For
standby task optimization,
* use {@link #optimizeRackAwareStandbyTasks}.
+ * <p>
+ * It is recommended to run this optimization before assigning any standby
tasks, especially if you have configured
+ * your KafkaStreams clients with assignment tags via the
rack.aware.assignment.tags config since this method may
+ * shuffle around active tasks without considering the client tags and can
result in a violation of the original
+ * client tag assignment's constraints.
*
- * @param applicationState the metadata and other info describing
the current application state
+ * @param optimizationParams optional configuration parameters to
apply
* @param kafkaStreamsAssignments the current assignment of tasks to
KafkaStreams clients
- * @param tasks the set of tasks to reassign if
possible. Must already be assigned to a KafkaStreams client
- *
- * @return a map with the KafkaStreamsAssignments updated to minimize
cross-rack traffic for active tasks
*/
- public static Map<ProcessId, KafkaStreamsAssignment>
optimizeRackAwareActiveTasks(final ApplicationState applicationState,
-
final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
-
final SortedSet<TaskId> tasks) {
- if (tasks.isEmpty()) {
- return kafkaStreamsAssignments;
+ public static void optimizeRackAwareActiveTasks(final
RackAwareOptimizationParams optimizationParams,
+ final Map<ProcessId,
KafkaStreamsAssignment> kafkaStreamsAssignments) {
+ final ApplicationState applicationState =
optimizationParams.applicationState;
+ final SortedSet<TaskId> activeTasksToOptimize =
getTasksToOptimize(kafkaStreamsAssignments, optimizationParams,
AssignedTask.Type.ACTIVE);
+ if (activeTasksToOptimize.isEmpty()) {
+ return;
}
- if (!canPerformRackAwareOptimization(applicationState,
AssignedTask.Type.ACTIVE)) {
- return kafkaStreamsAssignments;
+ if (!canPerformRackAwareOptimization(applicationState,
optimizationParams, AssignedTask.Type.ACTIVE)) {
+ return;
}
initializeAssignmentsForAllClients(applicationState,
kafkaStreamsAssignments);
- final int crossRackTrafficCost =
applicationState.assignmentConfigs().rackAwareTrafficCost().getAsInt();
- final int nonOverlapCost =
applicationState.assignmentConfigs().rackAwareNonOverlapCost().getAsInt();
+ final int crossRackTrafficCost =
+ optimizationParams.trafficCostOverride.orElseGet(() ->
applicationState.assignmentConfigs()
+ .rackAwareTrafficCost()
+ .getAsInt());
+ final int nonOverlapCost =
+ optimizationParams.nonOverlapCostOverride.orElseGet(() ->
applicationState.assignmentConfigs()
+ .rackAwareNonOverlapCost()
+ .getAsInt());
final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates =
applicationState.kafkaStreamsStates(false);
- final List<TaskId> taskIds = new ArrayList<>(tasks);
+ final List<TaskId> taskIds = new ArrayList<>(activeTasksToOptimize);
final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId =
applicationState.allTasks().values().stream()
- .filter(taskInfo -> tasks.contains(taskInfo.id()))
+ .filter(taskInfo -> activeTasksToOptimize.contains(taskInfo.id()))
.collect(Collectors.toMap(TaskInfo::id,
TaskInfo::topicPartitions));
final List<ProcessId> clientIds = new
ArrayList<>(kafkaStreamsStates.keySet());
@@ -259,8 +363,6 @@ public final class TaskAssignmentUtils {
(assignment, taskId) -> assignment.removeTask(new
AssignedTask(taskId, AssignedTask.Type.ACTIVE)),
(assignment, taskId) -> assignment.tasks().containsKey(taskId) &&
assignment.tasks().get(taskId).type() == AssignedTask.Type.ACTIVE
);
-
- return kafkaStreamsAssignments;
}
/**
@@ -283,31 +385,34 @@ public final class TaskAssignmentUtils {
* This method optimizes cross-rack traffic for standby tasks only. For
active task optimization,
* use {@link #optimizeRackAwareActiveTasks}.
*
+ * @param optimizationParams optional configuration parameters to
apply
* @param kafkaStreamsAssignments the current assignment of tasks to
KafkaStreams clients
- * @param applicationState the metadata and other info describing
the current application state
- *
- * @return a map with the KafkaStreamsAssignments updated to minimize
cross-rack traffic for standby tasks
*/
- public static Map<ProcessId, KafkaStreamsAssignment>
optimizeRackAwareStandbyTasks(final ApplicationState applicationState,
-
final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
- if (!canPerformRackAwareOptimization(applicationState,
AssignedTask.Type.STANDBY)) {
- return kafkaStreamsAssignments;
+ public static void optimizeRackAwareStandbyTasks(final
RackAwareOptimizationParams optimizationParams,
+ final Map<ProcessId,
KafkaStreamsAssignment> kafkaStreamsAssignments) {
+ final ApplicationState applicationState =
optimizationParams.applicationState;
+ final SortedSet<TaskId> standbyTasksToOptimize =
getTasksToOptimize(kafkaStreamsAssignments, optimizationParams,
AssignedTask.Type.STANDBY);
+ if (standbyTasksToOptimize.isEmpty()) {
+ return;
+ }
+
+ if (!canPerformRackAwareOptimization(applicationState,
optimizationParams, AssignedTask.Type.STANDBY)) {
+ return;
}
initializeAssignmentsForAllClients(applicationState,
kafkaStreamsAssignments);
- final int crossRackTrafficCost =
applicationState.assignmentConfigs().rackAwareTrafficCost().getAsInt();
- final int nonOverlapCost =
applicationState.assignmentConfigs().rackAwareNonOverlapCost().getAsInt();
+ final int crossRackTrafficCost =
+ optimizationParams.trafficCostOverride.orElseGet(() ->
applicationState.assignmentConfigs()
+ .rackAwareTrafficCost()
+ .getAsInt());
+ final int nonOverlapCost =
+ optimizationParams.nonOverlapCostOverride.orElseGet(() ->
applicationState.assignmentConfigs()
+ .rackAwareNonOverlapCost()
+ .getAsInt());
final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates =
applicationState.kafkaStreamsStates(false);
- final List<TaskId> standbyTasksToOptimize =
kafkaStreamsAssignments.values().stream()
- .flatMap(r -> r.tasks().values().stream())
- .filter(task -> task.type() == AssignedTask.Type.STANDBY)
- .map(AssignedTask::id)
- .distinct()
- .collect(Collectors.toList());
-
final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId =
applicationState.allTasks().values().stream().collect(Collectors.toMap(
TaskInfo::id,
@@ -317,7 +422,7 @@ public final class TaskAssignmentUtils {
final List<ProcessId> clientIds = new
ArrayList<>(kafkaStreamsStates.keySet());
final long initialCost = computeTotalAssignmentCost(
topicPartitionsByTaskId,
- standbyTasksToOptimize,
+ new ArrayList<>(standbyTasksToOptimize),
clientIds,
kafkaStreamsAssignments,
kafkaStreamsStates,
@@ -411,7 +516,7 @@ public final class TaskAssignmentUtils {
}
final long finalCost = computeTotalAssignmentCost(
topicPartitionsByTaskId,
- standbyTasksToOptimize,
+ new ArrayList<>(standbyTasksToOptimize),
clientIds,
kafkaStreamsAssignments,
kafkaStreamsStates,
@@ -424,7 +529,6 @@ public final class TaskAssignmentUtils {
final long duration = System.currentTimeMillis() - startTime;
LOG.info("Assignment after {} rounds and {} milliseconds for standby
task optimization is {}\n with cost {}",
round, duration, kafkaStreamsAssignments, finalCost);
- return kafkaStreamsAssignments;
}
private static long computeTotalAssignmentCost(final Map<TaskId,
Set<TaskTopicPartition>> topicPartitionsByTaskId,
@@ -541,6 +645,7 @@ public final class TaskAssignmentUtils {
* is set.
*/
private static boolean canPerformRackAwareOptimization(final
ApplicationState applicationState,
+ final
RackAwareOptimizationParams optimizationParams,
final
AssignedTask.Type taskType) {
final AssignmentConfigs assignmentConfigs =
applicationState.assignmentConfigs();
final String rackAwareAssignmentStrategy =
assignmentConfigs.rackAwareAssignmentStrategy();
@@ -902,6 +1007,20 @@ public final class TaskAssignmentUtils {
}
}
+ private static SortedSet<TaskId> getTasksToOptimize(final Map<ProcessId,
KafkaStreamsAssignment> assignments,
+ final
RackAwareOptimizationParams optimizationParams,
+ final
AssignedTask.Type taskType) {
+ if (optimizationParams != null &&
optimizationParams.tasksToOptimize.isPresent()) {
+ return optimizationParams.tasksToOptimize.get();
+ }
+
+ return assignments.values().stream()
+ .flatMap(r -> r.tasks().values().stream())
+ .filter(task -> task.type() == taskType)
+ .map(AssignedTask::id)
+ .collect(Collectors.toCollection(TreeSet::new));
+ }
+
private static class TagStatistics {
private final Map<String, Set<String>> tagKeyToValues;
private final Map<KeyValue<String, String>, Set<ProcessId>>
tagEntryToClients;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java
index 827c9138c99..3d5e5b247f7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java
@@ -29,7 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.streams.processor.TaskId;
@@ -39,9 +38,11 @@ import
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.Assi
import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
+import
org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils.RackAwareOptimizationParams;
import org.apache.kafka.streams.processor.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.assignment.TaskInfo;
import org.apache.kafka.streams.processor.assignment.TaskTopicPartition;
+import
org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,21 +95,19 @@ public class StickyTaskAssignor implements TaskAssignor {
final Map<ProcessId, KafkaStreamsAssignment> currentAssignments =
assignmentState.newAssignments;
- final Set<TaskId> statefulTasks =
applicationState.allTasks().values().stream()
- .filter(TaskInfo::isStateful)
- .map(TaskInfo::id)
- .collect(Collectors.toSet());
- final Map<ProcessId, KafkaStreamsAssignment>
optimizedAssignmentsForStatefulTasks =
TaskAssignmentUtils.optimizeRackAwareActiveTasks(
- applicationState, currentAssignments, new
TreeSet<>(statefulTasks));
-
- final Set<TaskId> statelessTasks =
applicationState.allTasks().values().stream()
- .filter(task -> !task.isStateful())
- .map(TaskInfo::id)
- .collect(Collectors.toSet());
- final Map<ProcessId, KafkaStreamsAssignment>
optimizedAssignmentsForAllTasks =
TaskAssignmentUtils.optimizeRackAwareActiveTasks(
- applicationState, optimizedAssignmentsForStatefulTasks, new
TreeSet<>(statelessTasks));
-
-
assignmentState.processOptimizedAssignments(optimizedAssignmentsForAllTasks);
+ TaskAssignmentUtils.optimizeRackAwareActiveTasks(
+
RackAwareOptimizationParams.of(applicationState).forStatefulTasks(),
+ currentAssignments
+ );
+
+ TaskAssignmentUtils.optimizeRackAwareActiveTasks(
+ RackAwareOptimizationParams.of(applicationState)
+ .forStatelessTasks()
+
.withTrafficCostOverride(RackAwareTaskAssignor.STATELESS_TRAFFIC_COST)
+
.withNonOverlapCostOverride(RackAwareTaskAssignor.STATELESS_NON_OVERLAP_COST),
+ currentAssignments
+ );
+ assignmentState.processOptimizedAssignments(currentAssignments);
}
private void optimizeStandby(final ApplicationState applicationState,
final AssignmentState assignmentState) {
@@ -120,10 +119,9 @@ public class StickyTaskAssignor implements TaskAssignor {
return;
}
- final Map<ProcessId, KafkaStreamsAssignment> currentAssignments =
assignmentState.newAssignments;
- final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments =
TaskAssignmentUtils.optimizeRackAwareStandbyTasks(
- applicationState, currentAssignments);
- assignmentState.processOptimizedAssignments(optimizedAssignments);
+ final Map<ProcessId, KafkaStreamsAssignment> assignments =
assignmentState.newAssignments;
+
TaskAssignmentUtils.optimizeRackAwareStandbyTasks(RackAwareOptimizationParams.of(applicationState),
assignments);
+ assignmentState.processOptimizedAssignments(assignments);
}
private static void assignActive(final ApplicationState applicationState,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
index 7bcf386ab2a..8eff5812284 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignmentUtilsTest.java
@@ -50,6 +50,7 @@ import
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.Assi
import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
+import
org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils.RackAwareOptimizationParams;
import org.apache.kafka.streams.processor.assignment.TaskInfo;
import org.apache.kafka.streams.processor.assignment.TaskTopicPartition;
import org.junit.Rule;
@@ -88,14 +89,14 @@ public class TaskAssignmentUtilsTest {
);
TaskAssignmentUtils.optimizeRackAwareActiveTasks(
- applicationState, assignments, new TreeSet<>(tasks.keySet()));
+ RackAwareOptimizationParams.of(applicationState), assignments);
assertThat(assignments.size(), equalTo(2));
assertThat(assignments.get(processId(1)).tasks().keySet(),
equalTo(mkSet(TASK_0_1)));
assertThat(assignments.get(processId(2)).tasks().keySet(),
equalTo(mkSet(TASK_0_0)));
// Repeated to make sure nothing gets shifted around after the first
round of optimization.
TaskAssignmentUtils.optimizeRackAwareActiveTasks(
- applicationState, assignments, new TreeSet<>(tasks.keySet()));
+ RackAwareOptimizationParams.of(applicationState), assignments);
assertThat(assignments.size(), equalTo(2));
assertThat(assignments.get(processId(1)).tasks().keySet(),
equalTo(mkSet(TASK_0_1)));
assertThat(assignments.get(processId(2)).tasks().keySet(),
equalTo(mkSet(TASK_0_0)));
@@ -127,7 +128,7 @@ public class TaskAssignmentUtilsTest {
mkAssignment(AssignedTask.Type.STANDBY, 3, TASK_0_0)
);
- TaskAssignmentUtils.optimizeRackAwareStandbyTasks(applicationState,
assignments);
+
TaskAssignmentUtils.optimizeRackAwareStandbyTasks(RackAwareOptimizationParams.of(applicationState),
assignments);
assertThat(assignments.size(), equalTo(3));
assertThat(assignments.get(processId(1)).tasks().keySet(),
equalTo(mkSet(TASK_0_0, TASK_0_1)));
assertThat(assignments.get(processId(2)).tasks().keySet(),
equalTo(mkSet(TASK_0_0)));