This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit eeea1ca92059bf14fdeecfb168091efa3a605168 Author: Alieh Saeedi <[email protected]> AuthorDate: Fri Aug 30 09:55:34 2024 +0200 StickyTaskAssignor Implement the `StickyTaskAssignor` for KIP-1071. --- .../coordinator/group/GroupCoordinatorShard.java | 3 +- .../group/streams/TargetAssignmentBuilder.java | 3 +- .../group/streams/TopologyMetadata.java | 6 + .../coordinator/group/taskassignor/GroupSpec.java | 2 + .../group/taskassignor/GroupSpecImpl.java | 14 +- .../group/taskassignor/ProcessState.java | 144 +++ .../group/taskassignor/StickyTaskAssignor.java | 437 +++++++ .../coordinator/group/taskassignor/TaskId.java | 4 + .../group/taskassignor/TopologyDescriber.java | 2 + .../group/streams/TargetAssignmentBuilderTest.java | 2 +- .../group/taskassignor/GroupSpecImplTest.java | 3 +- .../group/taskassignor/MockAssignorTest.java | 37 +- .../group/taskassignor/StickyTaskAssignorTest.java | 1201 ++++++++++++++++++++ 13 files changed, 1844 insertions(+), 14 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index dff55c9b684..cb2833f7029 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -108,6 +108,7 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.taskassignor.MockAssignor; +import org.apache.kafka.coordinator.group.taskassignor.StickyTaskAssignor; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -256,7 +257,7 @@ public 
class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord .withShareGroupSessionTimeout(config.shareGroupSessionTimeoutMs()) .withShareGroupHeartbeatInterval(config.shareGroupHeartbeatIntervalMs()) // TODO: Do we need separate configs for streams groups? - .withStreamsGroupAssignors(Collections.singletonList(new MockAssignor())) + .withStreamsGroupAssignors(Collections.singletonList(new StickyTaskAssignor())) .withStreamsGroupMaxSize(config.consumerGroupMaxSize()) .withStreamsGroupSessionTimeout(config.consumerGroupSessionTimeoutMs()) .withStreamsGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs()) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java index 51e07c3418c..a09919e2157 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilder.java @@ -284,7 +284,8 @@ public class TargetAssignmentBuilder { newGroupAssignment = assignor.assign( new GroupSpecImpl( Collections.unmodifiableMap(memberSpecs), - new ArrayList<>(topology.subtopologies().keySet()) + new ArrayList<>(topology.subtopologies().keySet()), + new HashMap<>() ), new TopologyMetadata(subscriptionMetadata, topology) ); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java index ebc0f239f7d..c2b253f001c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyMetadata.java @@ -57,6 +57,12 @@ public class TopologyMetadata implements TopologyDescriber { return 
topology; } + @Override + public boolean isStateful(String subtopologyId) { + //TODO + return false; + } + /** * The number of partitions for the given subtopology ID. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpec.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpec.java index cb9c5f3e398..46b279416f5 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpec.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpec.java @@ -34,4 +34,6 @@ public interface GroupSpec { */ List<String> subtopologies(); + Map<String, String> assignmentConfigs(); + } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImpl.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImpl.java index e08acaa468e..539ce9203db 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImpl.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImpl.java @@ -35,14 +35,18 @@ public class GroupSpecImpl implements GroupSpec { */ private final List<String> subtopologies; + private final Map<String, String> assignmentConfigs; + public GroupSpecImpl( Map<String, AssignmentMemberSpec> members, - List<String> subtopologies + List<String> subtopologies, + Map<String, String> assignmentConfigs ) { Objects.requireNonNull(members); Objects.requireNonNull(subtopologies); this.members = members; this.subtopologies = subtopologies; + this.assignmentConfigs = assignmentConfigs; } /** @@ -58,6 +62,11 @@ public class GroupSpecImpl implements GroupSpec { return subtopologies; } + @Override + public Map<String, String> assignmentConfigs() { + return assignmentConfigs; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -68,7 +77,8 
@@ public class GroupSpecImpl implements GroupSpec { } final GroupSpecImpl groupSpec = (GroupSpecImpl) o; return Objects.equals(members, groupSpec.members) - && Objects.equals(subtopologies, groupSpec.subtopologies); + && Objects.equals(subtopologies, groupSpec.subtopologies) + && Objects.equals(assignmentConfigs, groupSpec.assignmentConfigs); } @Override diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/ProcessState.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/ProcessState.java new file mode 100644 index 00000000000..a0c494200f6 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/ProcessState.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.taskassignor; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.Collections.unmodifiableSet; +import static org.apache.kafka.common.utils.Utils.union; + +public class ProcessState { + private final String processId; + // number of members + private int capacity; + private double load; + private final Map<String, Integer> memberToTaskCounts; + private final Map<String, Set<TaskId>> assignedActiveTasks; + private final Map<String, Set<TaskId>> assignedStandbyTasks; + + + ProcessState(final String processId) { + this.processId = processId; + this.capacity = 0; + this.load = Double.MAX_VALUE; + this.assignedActiveTasks = new HashMap<>(); + this.assignedStandbyTasks = new HashMap<>(); + this.memberToTaskCounts = new HashMap<>(); + } + + + public String processId() { + return processId; + } + + public int capacity() { + return capacity; + } + + public int totalTaskCount() { + return assignedStandbyTasks().size() + assignedActiveTasks().size(); + } + + public double load() { + return load; + } + + public Map<String, Integer> memberToTaskCounts() { + return memberToTaskCounts; + } + + public Set<TaskId> assignedActiveTasks() { + return assignedActiveTasks.values().stream() + .flatMap(Set::stream) + .collect(Collectors.toSet()); + } + + public Map<String, Set<TaskId>> assignedActiveTasksByMember() { + return assignedActiveTasks; + } + + public Set<TaskId> assignedStandbyTasks() { + return assignedStandbyTasks.values().stream() + .flatMap(Set::stream) + .collect(Collectors.toSet()); + } + + public Map<String, Set<TaskId>> assignedStandbyTasksByMember() { + return assignedStandbyTasks; + } + + public void addTask(final String memberId, final TaskId taskId, final boolean isActive) { + if (isActive) { + assignedActiveTasks.putIfAbsent(memberId, new HashSet<>()); + assignedActiveTasks.get(memberId).add(taskId); + } 
else { + assignedStandbyTasks.putIfAbsent(memberId, new HashSet<>()); + assignedStandbyTasks.get(memberId).add(taskId); + } + memberToTaskCounts.put(memberId, memberToTaskCounts.get(memberId) + 1); + computeLoad(); + } + + private void incrementCapacity() { + capacity++; + computeLoad(); + } + public void computeLoad() { + if (capacity <= 0) { + this.load = -1; + } else { + this.load = (double) totalTaskCount() / capacity; + } + } + + public void addMember(final String member) { + this.memberToTaskCounts.put(member, 0); + incrementCapacity(); + } + + public boolean hasCapacity() { + return totalTaskCount() < capacity; + } + + public int compareTo(final ProcessState other) { + int loadCompare = Double.compare(this.load, other.load()); + if (loadCompare == 0) { + return Integer.compare(other.capacity, this.capacity); + } + return loadCompare; + } + + public boolean hasTask(final TaskId taskId) { + return assignedActiveTasks().contains(taskId) || assignedStandbyTasks().contains(taskId); } + + + Set<TaskId> assignedTasks() { + final Set<TaskId> assignedActiveTaskIds = assignedActiveTasks(); + final Set<TaskId> assignedStandbyTaskIds = assignedStandbyTasks(); + return unmodifiableSet( + union( + () -> new HashSet<>(assignedActiveTaskIds.size() + assignedStandbyTaskIds.size()), + assignedActiveTaskIds, + assignedStandbyTaskIds + ) + ); + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignor.java new file mode 100644 index 00000000000..db67eb373d9 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignor.java @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.taskassignor; + + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +public class StickyTaskAssignor implements TaskAssignor { + + public static final String STICKY_ASSIGNOR_NAME = "sticky"; + private static final Logger log = LoggerFactory.getLogger(StickyTaskAssignor.class); + + // helper data structures: + private TaskPairs taskPairs; + Map<TaskId, Member> activeTaskToPrevMember; + Map<TaskId, Set<Member>> standbyTaskToPrevMember; + Map<String, ProcessState> processIdToState; + + int allTasks; + int totalCapacity; + int tasksPerMember; + + @Override + public String name() { + return STICKY_ASSIGNOR_NAME; + } + + @Override + public GroupAssignment assign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) throws TaskAssignorException { + + initialize(groupSpec, topologyDescriber); + + //active + Set<TaskId> activeTasks = toTaskIds(groupSpec, topologyDescriber, true); + assignActive(activeTasks); + + //standby + final int numStandbyReplicas = + groupSpec.assignmentConfigs().isEmpty() ? 
0 + : Integer.parseInt(groupSpec.assignmentConfigs().get("numStandbyReplicas")); + if (numStandbyReplicas > 0) { + Set<TaskId> statefulTasks = toTaskIds(groupSpec, topologyDescriber, false); + assignStandby(statefulTasks, numStandbyReplicas); + } + + return buildGroupAssignment(groupSpec.members().keySet()); + } + + private Set<TaskId> toTaskIds(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber, final boolean isActive) { + Set<TaskId> ret = new HashSet<>(); + for (String subtopology : groupSpec.subtopologies()) { + if (isActive || topologyDescriber.isStateful(subtopology)) { + int numberOfPartitions = topologyDescriber.numPartitions(subtopology); + for (int i = 0; i < numberOfPartitions; i++) { + ret.add(new TaskId(subtopology, i)); + } + } + } + return ret; + } + + private void initialize(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) { + + allTasks = 0; + for (String subtopology : groupSpec.subtopologies()) { + int numberOfPartitions = topologyDescriber.numPartitions(subtopology); + allTasks += numberOfPartitions; + } + totalCapacity = groupSpec.members().size(); + tasksPerMember = computeTasksPerMember(allTasks, totalCapacity); + + taskPairs = new TaskPairs(allTasks * (allTasks - 1) / 2); + + processIdToState = new HashMap<>(); + activeTaskToPrevMember = new HashMap<>(); + standbyTaskToPrevMember = new HashMap<>(); + for (Map.Entry<String, AssignmentMemberSpec> memberEntry : groupSpec.members().entrySet()) { + final String memberId = memberEntry.getKey(); + final String processId = memberEntry.getValue().processId(); + final Member member = new Member(processId, memberId); + final AssignmentMemberSpec memberSpec = memberEntry.getValue(); + + processIdToState.putIfAbsent(processId, new ProcessState(processId)); + processIdToState.get(processId).addMember(memberId); + + // prev active tasks + for (Map.Entry<String, Set<Integer>> entry : memberSpec.activeTasks().entrySet()) { + Set<Integer> partitionNoSet = 
entry.getValue(); + for (int partitionNo : partitionNoSet) { + activeTaskToPrevMember.put(new TaskId(entry.getKey(), partitionNo), member); + } + } + + // prev standby tasks + for (Map.Entry<String, Set<Integer>> entry : memberSpec.standbyTasks().entrySet()) { + Set<Integer> partitionNoSet = entry.getValue(); + for (int partitionNo : partitionNoSet) { + TaskId taskId = new TaskId(entry.getKey(), partitionNo); + standbyTaskToPrevMember.putIfAbsent(taskId, new HashSet<>()); + standbyTaskToPrevMember.get(taskId).add(member); + } + } + } + } + + private GroupAssignment buildGroupAssignment(final Set<String> members) { + final Map<String, MemberAssignment> memberAssignments = new HashMap<>(); + final Map<String, Set<TaskId>> activeTasksAssignments = activeTasksAssignments(); + final Map<String, Set<TaskId>> standbyTasksAssignments = standbyTasksAssignments(); + + for (String memberId : members) { + Map<String, Set<Integer>> activeTasks = new HashMap<>(); + if (activeTasksAssignments.containsKey(memberId)) { + activeTasks = toCompactedTaskIds(activeTasksAssignments.get(memberId)); + } + Map<String, Set<Integer>> standByTasks = new HashMap<>(); + + if (standbyTasksAssignments.containsKey(memberId)) { + standByTasks = toCompactedTaskIds(standbyTasksAssignments.get(memberId)); + } + memberAssignments.put(memberId, new MemberAssignment(activeTasks, standByTasks, new HashMap<>())); + } + + return new GroupAssignment(memberAssignments); + } + + private Map<String, Set<TaskId>> standbyTasksAssignments() { + return processIdToState.entrySet().stream() + .flatMap(entry -> entry.getValue().assignedStandbyTasksByMember().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (set1, set2) -> { + set1.addAll(set2); + return set1; + })); + } + + private Map<String, Set<TaskId>> activeTasksAssignments() { + return processIdToState.entrySet().stream() + .flatMap(entry -> entry.getValue().assignedActiveTasksByMember().entrySet().stream()) + 
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (set1, set2) -> { + set1.addAll(set2); + return set1; + })); + } + + private Map<String, Set<Integer>> toCompactedTaskIds(final Set<TaskId> taskIds) { + Map<String, Set<Integer>> ret = new HashMap<>(); + for (TaskId taskId : taskIds) { + ret.putIfAbsent(taskId.subtopologyId(), new HashSet<>()); + ret.get(taskId.subtopologyId()).add(taskId.partition()); + } + return ret; + } + + private void assignActive(final Set<TaskId> activeTasks) { + + // 1. re-assigning existing active tasks to clients that previously had the same active tasks + for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) { + final TaskId task = it.next(); + final Member prevMember = activeTaskToPrevMember.get(task); + if (prevMember != null && (hasUnfulfilledQuota(prevMember))) { + processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true); + updateHelpers(prevMember, task, true); + it.remove(); + } + } + + // 2. re-assigning tasks to clients that previously have seen the same task (as standby task) + for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) { + final TaskId task = it.next(); + final Set<Member> prevMembers = standbyTaskToPrevMember.get(task); + if (prevMembers != null && !prevMembers.isEmpty()) { + final Member prevMember = findMemberWithLeastLoad(prevMembers, task, true); + if (prevMember != null && hasUnfulfilledQuota(prevMember)) { + processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true); + updateHelpers(prevMember, task, true); + it.remove(); + } + } + } + + // 3. 
assign any remaining unassigned tasks + for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) { + final TaskId task = it.next(); + final Member member = findMemberWithLeastLoad(task); + if (member != null) { + processIdToState.get(member.processId).addTask(member.memberId, task, true); + it.remove(); + updateHelpers(member, task, true); + } + } + } + + private void maybeUpdateTasksPerMember(final int activeTasksNo) { + if (activeTasksNo == tasksPerMember) { + totalCapacity--; + allTasks -= activeTasksNo; + tasksPerMember = computeTasksPerMember(allTasks, totalCapacity); + } + } + + private Member findMemberWithLeastLoad(final Set<Member> members, TaskId taskId, final boolean returnSameMember) { + Set<Member> rightPairs = members.stream() + .filter(member -> taskPairs.hasNewPair(taskId, processIdToState.get(member.processId).assignedTasks())) + .collect(Collectors.toSet()); + if (rightPairs.isEmpty()) { + rightPairs = members; + } + Optional<ProcessState> processWithLeastLoad = rightPairs.stream() + .map(member -> processIdToState.get(member.processId)) + .min(Comparator.comparingDouble(ProcessState::load)); + + assert processWithLeastLoad.isPresent(); + // if the same exact former member is needed + if (returnSameMember) { + return standbyTaskToPrevMember.get(taskId).stream() + .filter(standby -> standby.processId.equals(processWithLeastLoad.get().processId())) + .findFirst() + .orElseGet(() -> memberWithLeastLoad(processWithLeastLoad.get())); + } + return memberWithLeastLoad(processWithLeastLoad.get()); + } + + private Member findMemberWithLeastLoad(final TaskId taskId) { + Set<Member> allMembers = processIdToState.entrySet().stream() + .flatMap(entry -> entry.getValue().memberToTaskCounts().keySet().stream() + .map(memberId -> new Member(entry.getKey(), memberId))) + .collect(Collectors.toSet()); + return findMemberWithLeastLoad(allMembers, taskId, false); + } + + private Member findMemberWithLeastLoad(final TaskId taskId, final Set<String> processes) 
{ + Set<Member> allMembers = processes.stream() + .flatMap(processId -> processIdToState.get(processId).memberToTaskCounts().keySet().stream() + .map(memberId -> new Member(processId, memberId))) + .collect(Collectors.toSet()); + return findMemberWithLeastLoad(allMembers, taskId, false); + } + + private Member memberWithLeastLoad(final ProcessState processWithLeastLoad) { + Optional<String> memberWithLeastLoad = processWithLeastLoad.memberToTaskCounts().entrySet().stream() + .min(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey); + return memberWithLeastLoad.map(memberId -> new Member(processWithLeastLoad.processId(), memberId)).orElse(null); + } + + private boolean hasUnfulfilledQuota(final Member member) { + return processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId) < tasksPerMember; + } + + private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyReplicas) { + for (TaskId task : standbyTasks) { + for (int i = 0; i < numStandbyReplicas; i++) { + final Set<String> availableProcesses = findAllowedProcesses(task); + if (availableProcesses.isEmpty()) { + log.warn("Unable to assign " + (numStandbyReplicas - i) + + " of " + numStandbyReplicas + " standby tasks for task [" + task + "]. " + + "There is not enough available capacity. 
You should " + + "increase the number of threads and/or application instances " + + "to maintain the requested number of standby replicas."); + break; + } + Member standby = null; + + // prev active task + Member prevMember = activeTaskToPrevMember.get(task); + if (prevMember != null && availableProcesses.contains(prevMember.processId) && isLoadBalanced(prevMember.processId) + && taskPairs.hasNewPair(task, processIdToState.get(prevMember.processId).assignedTasks())) { + standby = prevMember; + } + + // prev standby tasks + if (standby == null) { + final Set<Member> prevMembers = standbyTaskToPrevMember.get(task); + if (prevMembers != null && !prevMembers.isEmpty()) { + prevMembers.removeIf(member -> !availableProcesses.contains(member.processId)); + prevMember = findMemberWithLeastLoad(prevMembers, task, true); + if (prevMember != null && isLoadBalanced(prevMember.processId)) { + standby = prevMember; + } + } + } + + // others + if (standby == null) { + standby = findMemberWithLeastLoad(task, availableProcesses); + } + processIdToState.get(standby.processId).addTask(standby.memberId, task, false); + updateHelpers(standby, task, false); + } + + } + } + + private boolean isLoadBalanced(final String processId) { + final ProcessState process = processIdToState.get(processId); + return process.hasCapacity() || isLeastLoadedProcess(process.load()); + } + + private boolean isLeastLoadedProcess(final double load) { + return processIdToState.values().stream() + .allMatch(process -> process.load() >= load); + } + + private Set<String> findAllowedProcesses(final TaskId taskId) { + return processIdToState.values().stream() + .filter(process -> !process.hasTask(taskId)) + .map(ProcessState::processId) + .collect(Collectors.toSet()); + } + + private void updateHelpers(final Member member, final TaskId taskId, final boolean isActive) { + // add all pair combinations: update taskPairs + taskPairs.addPairs(taskId, processIdToState.get(member.processId).assignedTasks()); + + if 
(isActive) { + // update task per process + maybeUpdateTasksPerMember(processIdToState.get(member.processId).assignedActiveTasks().size()); + } + } + + private static int computeTasksPerMember(final int numberOfTasks, final int numberOfMembers) { + if (numberOfMembers == 0) { + return 0; + } + int tasksPerMember = numberOfTasks / numberOfMembers; + if (numberOfTasks % numberOfMembers > 0) { + tasksPerMember++; + } + return tasksPerMember; + } + + private static class TaskPairs { + private final Set<Pair> pairs; + private final int maxPairs; + + TaskPairs(final int maxPairs) { + this.maxPairs = maxPairs; + this.pairs = new HashSet<>(maxPairs); + } + + boolean hasNewPair(final TaskId task1, + final Set<TaskId> taskIds) { + if (pairs.size() == maxPairs) { + return false; + } + if (taskIds.size() == 0) { + return true; + } + for (final TaskId taskId : taskIds) { + if (!pairs.contains(pair(task1, taskId))) { + return true; + } + } + return false; + } + + void addPairs(final TaskId taskId, final Set<TaskId> assigned) { + for (final TaskId id : assigned) { + if (!id.equals(taskId)) + pairs.add(pair(id, taskId)); + } + } + + Pair pair(final TaskId task1, final TaskId task2) { + if (task1.compareTo(task2) < 0) { + return new Pair(task1, task2); + } + return new Pair(task2, task1); + } + + + private static class Pair { + private final TaskId task1; + private final TaskId task2; + + Pair(final TaskId task1, final TaskId task2) { + this.task1 = task1; + this.task2 = task2; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Pair pair = (Pair) o; + return Objects.equals(task1, pair.task1) && + Objects.equals(task2, pair.task2); + } + + @Override + public int hashCode() { + return Objects.hash(task1, task2); + } + } + } + + static class Member { + private final String processId; + private final String memberId; + + public Member(final String processId, final 
String memberId) { + this.processId = processId; + this.memberId = memberId; + } + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskId.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskId.java index 3b828a60413..885858f700d 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskId.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TaskId.java @@ -63,4 +63,8 @@ public final class TaskId { ", partition=" + partition + '}'; } + + public int compareTo(final TaskId other) { + return this.hashCode() - other.hashCode(); + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TopologyDescriber.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TopologyDescriber.java index f2a28e2cc10..8b3d5bdec26 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TopologyDescriber.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/taskassignor/TopologyDescriber.java @@ -34,4 +34,6 @@ public interface TopologyDescriber { */ int numPartitions(String subtopologyId); + boolean isStateful(String subtopologyId); + } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java index d97832eb969..c6b135eeebf 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/TargetAssignmentBuilderTest.java @@ -211,7 +211,7 @@ public class TargetAssignmentBuilderTest { Map<String, Map<Integer, String>> invertedTargetAssignment = 
TaskAssignmentTestUtil.invertedTargetAssignment(memberSpecs); // Prepare the expected assignment spec. - GroupSpecImpl groupSpec = new GroupSpecImpl(memberSpecs, new ArrayList<>(topology.subtopologies().keySet())); + GroupSpecImpl groupSpec = new GroupSpecImpl(memberSpecs, new ArrayList<>(topology.subtopologies().keySet()), new HashMap<>()); // We use `any` here to always return an assignment but use `verify` later on // to ensure that the input was correct. diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImplTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImplTest.java index 6e34368a155..bad6b8e1c1c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImplTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/GroupSpecImplTest.java @@ -55,7 +55,8 @@ public class GroupSpecImplTest { groupSpec = new GroupSpecImpl( members, - subtopologies + subtopologies, + new HashMap<>() ); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java index 457bbdb524d..a63b2e7c7ca 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/MockAssignorTest.java @@ -20,6 +20,7 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -43,9 +44,10 @@ public class MockAssignorTest { final GroupAssignment result = assignor.assign( new GroupSpecImpl( Collections.emptyMap(), - Collections.emptyList() + Collections.emptyList(), + new HashMap<>() ), - x -> 5 + new 
TopologyDescriberImpl(5) ); assertEquals(0, result.members().size()); @@ -69,9 +71,10 @@ public class MockAssignorTest { final GroupAssignment result = assignor.assign( new GroupSpecImpl( Collections.singletonMap("test_member", memberSpec), - Collections.singletonList("test-subtopology") + Collections.singletonList("test-subtopology"), + new HashMap<>() ), - x -> 4 + new TopologyDescriberImpl(4) ); assertEquals(1, result.members().size()); @@ -111,9 +114,10 @@ public class MockAssignorTest { final GroupAssignment result = assignor.assign( new GroupSpecImpl( mkMap(mkEntry("test_member1", memberSpec1), mkEntry("test_member2", memberSpec2)), - Arrays.asList("test-subtopology1", "test-subtopology2") + Arrays.asList("test-subtopology1", "test-subtopology2"), + new HashMap<>() ), - x -> 4 + new TopologyDescriberImpl(4) ); final Map<String, Set<Integer>> expected1 = mkMap( @@ -167,9 +171,10 @@ public class MockAssignorTest { final GroupAssignment result = assignor.assign( new GroupSpecImpl( mkMap(mkEntry("test_member1", memberSpec1), mkEntry("test_member2", memberSpec2)), - Arrays.asList("test-subtopology1", "test-subtopology2") + Arrays.asList("test-subtopology1", "test-subtopology2"), + new HashMap<>() ), - x -> 4 + new TopologyDescriberImpl(4) ); assertEquals(2, result.members().size()); @@ -187,4 +192,20 @@ public class MockAssignorTest { ), testMember2.activeTasks()); } + static class TopologyDescriberImpl implements TopologyDescriber { + final int numPartitions; + + TopologyDescriberImpl(int numPartitions) { + this.numPartitions = numPartitions; + } + @Override + public int numPartitions(String subtopologyId) { + return numPartitions; + } + @Override + public boolean isStateful(String subtopologyId) { + return false; + } + } + } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java new file 
mode 100644 index 00000000000..6455fb5cd68 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/taskassignor/StickyTaskAssignorTest.java @@ -0,0 +1,1201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group.taskassignor; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; 
+ +public class StickyTaskAssignorTest { + private final StickyTaskAssignor assignor = new StickyTaskAssignor(); + + + @Test + public void shouldAssignOneActiveTaskToEachProcessWhenTaskCountSameAsProcessCount() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + + final GroupAssignment result = assignor.assign( + new GroupSpecImpl( + mkMap(mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)), + Collections.singletonList("test-subtopology"), + new HashMap<>() + ), + new TopologyDescriberImpl(3, false) + ); + + assertEquals(3, result.members().size()); + Set<Integer> actualActiveTasks = new HashSet<>(); + for (int i = 0; i < 3; i++) { + final MemberAssignment testMember = result.members().get("member" + (i + 1)); + assertNotNull(testMember); + assertEquals(1, testMember.activeTasks().size()); + actualActiveTasks.addAll(testMember.activeTasks().get("test-subtopology")); + } + assertEquals(mkSet(0, 1, 2), actualActiveTasks); + } + + @Test + public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithNoStandByTasks() { + + final AssignmentMemberSpec memberSpec11 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec12 = createAssignmentMemberSpec("process1"); + + final AssignmentMemberSpec memberSpec21 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec22 = createAssignmentMemberSpec("process2"); + + final AssignmentMemberSpec memberSpec31 = createAssignmentMemberSpec("process3"); + final AssignmentMemberSpec memberSpec32 = createAssignmentMemberSpec("process3"); + + final Map<String, AssignmentMemberSpec> members = mkMap(mkEntry("member1_1", memberSpec11), mkEntry("member1_2", memberSpec12), + mkEntry("member2_1", memberSpec21), mkEntry("member2_2", memberSpec22), + 
mkEntry("member3_1", memberSpec31), mkEntry("member3_2", memberSpec32)); + + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Arrays.asList("test-subtopology1", "test-subtopology2"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + + assertEquals(1, getAllActiveTaskCount(result, "member1_1")); + assertEquals(1, getAllActiveTaskCount(result, "member1_2")); + assertEquals(1, getAllActiveTaskCount(result, "member2_1")); + assertEquals(1, getAllActiveTaskCount(result, "member2_2")); + assertEquals(1, getAllActiveTaskCount(result, "member3_1")); + assertEquals(1, getAllActiveTaskCount(result, "member3_2")); + + assertEquals(mkMap(mkEntry("test-subtopology1", mkSet(0, 1, 2)), mkEntry("test-subtopology2", mkSet(0, 1, 2))), + mergeAllActiveTasks(result, "member1_1", "member1_2", "member2_1", "member2_2", "member3_1", "member3_2")); + } + + @Test + public void shouldAssignTopicGroupIdEvenlyAcrossClientsWithStandByTasks() { + final Map<String, Set<Integer>> tasks = mkMap(mkEntry("test-subtopology1", mkSet(0, 1, 2)), mkEntry("test-subtopology2", mkSet(0, 1, 2))); + final AssignmentMemberSpec memberSpec11 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec12 = createAssignmentMemberSpec("process1"); + + final AssignmentMemberSpec memberSpec21 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec22 = createAssignmentMemberSpec("process2"); + + final AssignmentMemberSpec memberSpec31 = createAssignmentMemberSpec("process3"); + final AssignmentMemberSpec memberSpec32 = createAssignmentMemberSpec("process3"); + + final Map<String, AssignmentMemberSpec> members = mkMap(mkEntry("member1_1", memberSpec11), mkEntry("member1_2", memberSpec12), + mkEntry("member2_1", memberSpec21), mkEntry("member2_2", memberSpec22), + mkEntry("member3_1", memberSpec31), mkEntry("member3_2", memberSpec32)); + + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, + 
Arrays.asList("test-subtopology1", "test-subtopology2"), + mkMap(mkEntry("numStandbyReplicas", "1"))), + new TopologyDescriberImpl(3, true) + ); + + // active tasks + assertEquals(1, getAllActiveTaskCount(result, "member1_1")); + assertEquals(1, getAllActiveTaskCount(result, "member1_2")); + assertEquals(1, getAllActiveTaskCount(result, "member2_1")); + assertEquals(1, getAllActiveTaskCount(result, "member2_2")); + assertEquals(1, getAllActiveTaskCount(result, "member3_1")); + assertEquals(1, getAllActiveTaskCount(result, "member3_2")); + assertEquals(tasks, + mergeAllActiveTasks(result, "member1_1", "member1_2", "member2_1", "member2_2", "member3_1", "member3_2")); + } + + @Test + public void shouldNotMigrateActiveTaskToOtherProcess() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Collections.singleton(0))), Collections.emptyMap()); + AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap()); + Map<String, AssignmentMemberSpec> members = mkMap(mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + + + MemberAssignment testMember1 = result.members().get("member1"); + assertNotNull(testMember1); + assertTrue(testMember1.activeTasks().get("test-subtopology").contains(0)); + + MemberAssignment testMember2 = result.members().get("member2"); + assertNotNull(testMember2); + assertTrue(testMember2.activeTasks().get("test-subtopology").contains(1)); + + + assertEquals(3, + testMember1.activeTasks().get("test-subtopology").size() + testMember2.activeTasks().get("test-subtopology").size()); + + + // flip the previous active tasks assignment around. 
+ memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Collections.singleton(2))), Collections.emptyMap()); + members = mkMap(mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)); + result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + + + testMember2 = result.members().get("member2"); + assertNotNull(testMember2); + assertTrue(testMember2.activeTasks().get("test-subtopology").contains(1)); + + MemberAssignment testMember3 = result.members().get("member3"); + assertNotNull(testMember3); + assertTrue(testMember3.activeTasks().get("test-subtopology").contains(2)); + + + assertEquals(3, + testMember2.activeTasks().get("test-subtopology").size() + testMember3.activeTasks().get("test-subtopology").size()); + } + + @Test + public void shouldMigrateActiveTasksToNewProcessWithoutChangingAllAssignments() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", mkSet(0, 2))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + + Map<String, AssignmentMemberSpec> members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)); + + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + + + MemberAssignment testMember1 = result.members().get("member1"); + assertNotNull(testMember1); + 
assertTrue(testMember1.activeTasks().get("test-subtopology").contains(0) || testMember1.activeTasks().get("test-subtopology").contains(2)); + + MemberAssignment testMember2 = result.members().get("member2"); + assertNotNull(testMember2); + assertTrue(testMember2.activeTasks().get("test-subtopology").contains(1)); + + MemberAssignment testMember3 = result.members().get("member3"); + assertNotNull(testMember3); + assertTrue(testMember3.activeTasks().get("test-subtopology").contains(2) || testMember3.activeTasks().get("test-subtopology").contains(0)); + + + assertEquals(3, + testMember1.activeTasks().get("test-subtopology").size() + + testMember2.activeTasks().get("test-subtopology").size() + + testMember3.activeTasks().get("test-subtopology").size()); + } + + @Test + public void shouldAssignBasedOnCapacity() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + + final AssignmentMemberSpec memberSpec21 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec22 = createAssignmentMemberSpec("process2"); + + + Map<String, AssignmentMemberSpec> members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2_1", memberSpec21), mkEntry("member2_2", memberSpec22)); + + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + + + MemberAssignment testMember1 = result.members().get("member1"); + assertNotNull(testMember1); + assertEquals(1, testMember1.activeTasks().get("test-subtopology").size()); + + MemberAssignment testMember21 = result.members().get("member2_1"); + assertNotNull(testMember21); + assertEquals(1, testMember21.activeTasks().get("test-subtopology").size()); + + MemberAssignment testMember22 = result.members().get("member2_2"); + assertNotNull(testMember22); + assertEquals(1, testMember22.activeTasks().get("test-subtopology").size()); + } + + @Test + public void 
shouldAssignTasksEvenlyWithUnequalTopicGroupSizes() { + final Map<String, Set<Integer>> activeTasks = mkMap( + mkEntry("test-subtopology1", mkSet(0, 1, 2, 3, 4, 5)), + mkEntry("test-subtopology2", mkSet(0))); + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", activeTasks, Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); + + + Map<String, AssignmentMemberSpec> members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Arrays.asList("test-subtopology1", "test-subtopology2"), new HashMap<>()), + new TopologyDescriberImpl2() + ); + + MemberAssignment testMember1 = result.members().get("member1"); + assertNotNull(testMember1); + final Set<Integer> member1Topology1 = testMember1.activeTasks().get("test-subtopology1"); + final Set<Integer> member1Topology2 = testMember1.activeTasks().getOrDefault("test-subtopology2", new HashSet<>()); + assertEquals(4, member1Topology1.size() + member1Topology2.size()); + + MemberAssignment testMember2 = result.members().get("member2"); + assertNotNull(testMember2); + final Set<Integer> member2Topology1 = testMember2.activeTasks().get("test-subtopology1"); + final Set<Integer> member2Topology2 = testMember2.activeTasks().getOrDefault("test-subtopology2", new HashSet<>()); + assertEquals(3, member2Topology1.size() + member2Topology2.size()); + + + assertEquals(activeTasks, mkMap( + mkEntry("test-subtopology1", Stream.concat(member1Topology1.stream(), member2Topology1.stream()).collect(Collectors.toSet())), + mkEntry("test-subtopology2", Stream.concat(member1Topology2.stream(), member2Topology2.stream()).collect(Collectors.toSet())))); + } + + @Test + public void shouldKeepActiveTaskStickinessWhenMoreClientThanActiveTasks() { + AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", 
Collections.singleton(0))), Collections.emptyMap()); + AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Collections.singleton(2))), Collections.emptyMap()); + AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap()); + AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); + AssignmentMemberSpec memberSpec5 = createAssignmentMemberSpec("process5"); + + + Map<String, AssignmentMemberSpec> members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), + mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4), mkEntry("member5", memberSpec5)); + + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + + + MemberAssignment testMember1 = result.members().get("member1"); + assertNotNull(testMember1); + assertEquals(1, testMember1.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(0), testMember1.activeTasks().get("test-subtopology")); + + + MemberAssignment testMember2 = result.members().get("member2"); + assertNotNull(testMember2); + assertEquals(1, testMember2.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(2), testMember2.activeTasks().get("test-subtopology")); + + + MemberAssignment testMember3 = result.members().get("member3"); + assertNotNull(testMember3); + assertEquals(1, testMember3.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(1), testMember3.activeTasks().get("test-subtopology")); + + MemberAssignment testMember4 = result.members().get("member4"); + assertNotNull(testMember4); + assertNull(testMember4.activeTasks().get("test-subtopology")); + + MemberAssignment testMember5 = result.members().get("member5"); + 
assertNotNull(testMember5); + assertNull(testMember5.activeTasks().get("test-subtopology")); + + + // change up the assignment and make sure it is still sticky + memberSpec1 = createAssignmentMemberSpec("process1"); + memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Collections.singleton(0))), Collections.emptyMap()); + memberSpec3 = createAssignmentMemberSpec("process3"); + memberSpec4 = createAssignmentMemberSpec("process4", mkMap(mkEntry("test-subtopology", Collections.singleton(2))), Collections.emptyMap()); + memberSpec5 = createAssignmentMemberSpec("process5", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap()); + + + members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), + mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4), mkEntry("member5", memberSpec5)); + result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + + testMember1 = result.members().get("member1"); + assertNotNull(testMember1); + assertNull(testMember1.activeTasks().get("test-subtopology")); + + + testMember2 = result.members().get("member2"); + assertNotNull(testMember2); + assertEquals(1, testMember2.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(0), testMember2.activeTasks().get("test-subtopology")); + + + testMember3 = result.members().get("member3"); + assertNotNull(testMember3); + assertNull(testMember3.activeTasks().get("test-subtopology")); + + testMember4 = result.members().get("member4"); + assertNotNull(testMember4); + assertEquals(1, testMember4.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(2), testMember4.activeTasks().get("test-subtopology")); + + testMember5 = result.members().get("member5"); + assertNotNull(testMember5); + assertEquals(1, 
testMember5.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(1), testMember5.activeTasks().get("test-subtopology")); + } + + @Test + public void shouldAssignTasksToClientWithPreviousStandbyTasks() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", Collections.emptyMap(), mkMap(mkEntry("test-subtopology", Collections.singleton(2)))); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", Collections.emptyMap(), mkMap(mkEntry("test-subtopology", Collections.singleton(1)))); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", Collections.emptyMap(), mkMap(mkEntry("test-subtopology", Collections.singleton(0)))); + + + Map<String, AssignmentMemberSpec> members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)); + + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + + + MemberAssignment testMember1 = result.members().get("member1"); + assertNotNull(testMember1); + assertEquals(1, testMember1.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(2), testMember1.activeTasks().get("test-subtopology")); + + + MemberAssignment testMember2 = result.members().get("member2"); + assertNotNull(testMember2); + assertEquals(1, testMember2.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(1), testMember2.activeTasks().get("test-subtopology")); + + + MemberAssignment testMember3 = result.members().get("member3"); + assertNotNull(testMember3); + assertEquals(1, testMember3.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(0), testMember3.activeTasks().get("test-subtopology")); + } + + @Test + public void shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() { + final 
AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", + mkMap(mkEntry("test-subtopology", Collections.singleton(0))), + mkMap(mkEntry("test-subtopology", Collections.singleton(1)))); + final AssignmentMemberSpec memberSpec21 = createAssignmentMemberSpec("process2", + mkMap(mkEntry("test-subtopology", Collections.singleton(2))), + mkMap(mkEntry("test-subtopology", Collections.singleton(1)))); + final AssignmentMemberSpec memberSpec22 = createAssignmentMemberSpec("process2", + Collections.emptyMap(), Collections.emptyMap()); + + Map<String, AssignmentMemberSpec> members = mkMap( + mkEntry("member1", memberSpec1), + mkEntry("member2_1", memberSpec21), mkEntry("member2_2", memberSpec22)); + + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + + + MemberAssignment testMember1 = result.members().get("member1"); + assertNotNull(testMember1); + assertEquals(1, testMember1.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(0), testMember1.activeTasks().get("test-subtopology")); + + MemberAssignment testMember21 = result.members().get("member2_1"); + assertNotNull(testMember21); + assertEquals(1, testMember21.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(2), testMember21.activeTasks().get("test-subtopology")); + + MemberAssignment testMember22 = result.members().get("member2_2"); + assertNotNull(testMember22); + assertEquals(1, testMember22.activeTasks().get("test-subtopology").size()); + assertEquals(Collections.singleton(1), testMember22.activeTasks().get("test-subtopology")); + } + + @Test + public void shouldAssignStandbyTasksToDifferentClientThanCorrespondingActiveTaskIsAssignedTo() { + final Map<String, Set<Integer>> tasks = mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2, 3))); + final AssignmentMemberSpec memberSpec1 = 
createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Collections.singleton(0))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Collections.singleton(2))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4", mkMap(mkEntry("test-subtopology", Collections.singleton(3))), Collections.emptyMap()); + + Map<String, AssignmentMemberSpec> members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), + mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4)); + + + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, + Collections.singletonList("test-subtopology"), + mkMap(mkEntry("numStandbyReplicas", "1"))), + new TopologyDescriberImpl(4, true) + ); + + + final List<Integer> member1TaskIds = getAllStandbyTaskIds(result, "member1"); + assertFalse(member1TaskIds.contains(0)); + assertTrue(member1TaskIds.size() <= 2); + + final List<Integer> member2TaskIds = getAllStandbyTaskIds(result, "member2"); + assertFalse(member2TaskIds.contains(1)); + assertTrue(member2TaskIds.size() <= 2); + + final List<Integer> member3TaskIds = getAllStandbyTaskIds(result, "member3"); + assertFalse(member3TaskIds.contains(2)); + assertTrue(member3TaskIds.size() <= 2); + + final List<Integer> member4TaskIds = getAllStandbyTaskIds(result, "member4"); + assertFalse(member4TaskIds.contains(3)); + assertTrue(member4TaskIds.size() <= 2); + + + int nonEmptyStandbyTaskCount = 0; + nonEmptyStandbyTaskCount += member1TaskIds.size() == 0 ? 0 : 1; + nonEmptyStandbyTaskCount += member2TaskIds.size() == 0 ? 0 : 1; + nonEmptyStandbyTaskCount += member3TaskIds.size() == 0 ? 
0 : 1; + nonEmptyStandbyTaskCount += member4TaskIds.size() == 0 ? 0 : 1; + + assertTrue(nonEmptyStandbyTaskCount >= 3); + assertEquals(tasks, mergeAllStandbyTasks(result)); + + } + + @Test + public void shouldAssignMultipleReplicasOfStandbyTask() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Collections.singleton(0))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Collections.singleton(1))), Collections.emptyMap()); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Collections.singleton(2))), Collections.emptyMap()); + + + Map<String, AssignmentMemberSpec> members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), + mkEntry("member3", memberSpec3)); + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, + Collections.singletonList("test-subtopology"), + mkMap(mkEntry("numStandbyReplicas", "2"))), + new TopologyDescriberImpl(3, true) + ); + + + assertEquals(mkSet(1, 2), new HashSet<>(getAllStandbyTaskIds(result, "member1"))); + assertEquals(mkSet(0, 2), new HashSet<>(getAllStandbyTaskIds(result, "member2"))); + assertEquals(mkSet(0, 1), new HashSet<>(getAllStandbyTaskIds(result, "member3"))); + } + + @Test + public void shouldNotAssignStandbyTaskReplicasWhenNoClientAvailableWithoutHavingTheTaskAssigned() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + + + Map<String, AssignmentMemberSpec> members = mkMap( + mkEntry("member1", memberSpec1)); + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, + Collections.singletonList("test-subtopology"), + mkMap(mkEntry("numStandbyReplicas", "1"))), + new TopologyDescriberImpl(1, true) + ); + assertTrue(getAllStandbyTasks(result, "member1").isEmpty()); + } + + @Test + public void 
shouldAssignActiveAndStandbyTasks() { + + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + + + Map<String, AssignmentMemberSpec> members = mkMap( + mkEntry("member1", memberSpec1), + mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)); + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, + Collections.singletonList("test-subtopology"), + mkMap(mkEntry("numStandbyReplicas", "1"))), + new TopologyDescriberImpl(3, true) + ); + + + + assertEquals(mkSet(0, 1, 2), new HashSet<>(getAllActiveTaskIds(result))); + assertEquals(mkSet(0, 1, 2), new HashSet<>(getAllStandbyTaskIds(result))); + } + + @Test + public void shouldAssignAtLeastOneTaskToEachClientIfPossible() { + final AssignmentMemberSpec memberSpec11 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec12 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec13 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + + + Map<String, AssignmentMemberSpec> members = mkMap( + mkEntry("member1_1", memberSpec11), mkEntry("member1_2", memberSpec12), mkEntry("member1_3", memberSpec13), + mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + + assertEquals(1, getAllActiveTaskIds(result, "member1_1", "member1_2", "member1_3").size()); + assertEquals(1, getAllActiveTaskIds(result, "member2").size()); + assertEquals(1, getAllActiveTaskIds(result, "member3").size()); + } + + @Test + public void 
shouldAssignEachActiveTaskToOneClientWhenMoreClientsThanTasks() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); + final AssignmentMemberSpec memberSpec5 = createAssignmentMemberSpec("process5"); + final AssignmentMemberSpec memberSpec6 = createAssignmentMemberSpec("process6"); + + Map<String, AssignmentMemberSpec> members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), + mkEntry("member4", memberSpec4), mkEntry("member5", memberSpec5), mkEntry("member6", memberSpec6)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + + assertEquals(3, getAllActiveTaskIds(result, "member1", "member2", "member3", "member4", "member5", "member6").size()); + assertEquals(mkSet(0, 1, 2), getActiveTasks(result, "test-subtopology", "member1", "member2", "member3", "member4", "member5", "member6")); + } + + @Test + public void shouldBalanceActiveAndStandbyTasksAcrossAvailableClients() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); + final AssignmentMemberSpec memberSpec5 = createAssignmentMemberSpec("process5"); + final AssignmentMemberSpec memberSpec6 = createAssignmentMemberSpec("process6"); + + Map<String, AssignmentMemberSpec> members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", 
memberSpec3), + mkEntry("member4", memberSpec4), mkEntry("member5", memberSpec5), mkEntry("member6", memberSpec6)); + + final GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, + Collections.singletonList("test-subtopology"), + mkMap(mkEntry("numStandbyReplicas", "1"))), + new TopologyDescriberImpl(3, true) + ); + + for (String memberId : result.members().keySet()) { + assertEquals(1, getAllStandbyTasks(result, memberId).size() + getAllActiveTaskIds(result, memberId).size()); + + } + } + + @Test + public void shouldAssignMoreTasksToClientWithMoreCapacity() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec21 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec22 = createAssignmentMemberSpec("process2"); + + Map<String, AssignmentMemberSpec> members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2_1", memberSpec21), mkEntry("member2_2", memberSpec22)); + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, Arrays.asList("test-subtopology0", "test-subtopology1", "test-subtopology2", "test-subtopology3"), new HashMap<>()), + new TopologyDescriberImpl(3, false) + ); + + assertEquals(8, getAllActiveTaskCount(result, "member2_1", "member2_2")); + assertEquals(4, getAllActiveTaskCount(result, "member1")); + } + + @Test + public void shouldNotHaveSameAssignmentOnAnyTwoHosts() { + final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); + final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); + final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); + final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); + + final List<String> allMemberIds = asList("member1", "member2", "member3", "member4"); + Map<String, AssignmentMemberSpec> members = mkMap( + mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), 
mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4));
        final GroupAssignment result = assignor.assign(
            new GroupSpecImpl(members,
                Collections.singletonList("test-subtopology"),
                mkMap(mkEntry("numStandbyReplicas", "1"))),
            new TopologyDescriberImpl(4, true)
        );

        for (final String memberId : allMemberIds) {
            final List<Integer> taskIds = getAllTaskIds(result, memberId);
            for (final String otherMemberId : allMemberIds) {
                if (!memberId.equals(otherMemberId)) {
                    assertNotEquals(taskIds, getAllTaskIds(result, otherMemberId));
                }
            }
        }
    }

    /**
     * Four members on four processes, three of which previously owned active tasks; the
     * subtopology is stateful with 4 partitions and "numStandbyReplicas" set to "1".
     * Asserts that no two members end up with the same list of task IDs (active followed by standby).
     */
    @Test
    public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousActiveTasks() {
        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", mkSet(1, 2))), Collections.emptyMap());
        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", mkSet(3))), Collections.emptyMap());
        final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", mkSet(0))), Collections.emptyMap());
        final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4");

        final List<String> allMemberIds = asList("member1", "member2", "member3", "member4");
        final Map<String, AssignmentMemberSpec> members = mkMap(
            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4));
        final GroupAssignment result = assignor.assign(
            new GroupSpecImpl(members,
                Collections.singletonList("test-subtopology"),
                mkMap(mkEntry("numStandbyReplicas", "1"))),
            new TopologyDescriberImpl(4, true)
        );

        for (final String memberId : allMemberIds) {
            final List<Integer> taskIds = getAllTaskIds(result, memberId);
            for (final String otherMemberId : allMemberIds) {
                if (!memberId.equals(otherMemberId)) {
                    assertNotEquals(taskIds, getAllTaskIds(result, otherMemberId));
                }
            }
        }
    }

    /**
     * Same pairwise check as the previous test, but two of the four members start with both
     * previous active and previous standby tasks (each one's standbys are the other's actives).
     */
    @Test
    public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() {
        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
            mkMap(mkEntry("test-subtopology", mkSet(1, 2))), mkMap(mkEntry("test-subtopology", mkSet(3, 0))));
        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2",
            mkMap(mkEntry("test-subtopology", mkSet(3, 0))), mkMap(mkEntry("test-subtopology", mkSet(1, 2))));
        final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3");
        final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4");

        final List<String> allMemberIds = asList("member1", "member2", "member3", "member4");
        final Map<String, AssignmentMemberSpec> members = mkMap(
            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4));
        final GroupAssignment result = assignor.assign(
            new GroupSpecImpl(members,
                Collections.singletonList("test-subtopology"),
                mkMap(mkEntry("numStandbyReplicas", "1"))),
            new TopologyDescriberImpl(4, true)
        );

        for (final String memberId : allMemberIds) {
            final List<Integer> taskIds = getAllTaskIds(result, memberId);
            for (final String otherMemberId : allMemberIds) {
                if (!memberId.equals(otherMemberId)) {
                    assertNotEquals(taskIds, getAllTaskIds(result, otherMemberId));
                }
            }
        }
    }

    /**
     * One member previously owned all 4 active tasks; with four members on four processes
     * each member is expected to end up with exactly one active task.
     */
    @Test
    public void shouldReBalanceTasksAcrossAllClientsWhenCapacityAndTaskCountTheSame() {
        final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2, 3))), Collections.emptyMap());
        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1");
        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2");
        final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4");

        final Map<String, AssignmentMemberSpec> members = mkMap(
            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4));
        final GroupAssignment result = assignor.assign(
            new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()),
            new TopologyDescriberImpl(4, false)
        );

        assertEquals(1, getAllActiveTaskCount(result, "member1"));
        assertEquals(1, getAllActiveTaskCount(result, "member2"));
        assertEquals(1, getAllActiveTaskCount(result, "member3"));
        assertEquals(1, getAllActiveTaskCount(result, "member4"));
    }

    /**
     * One member previously owned all 4 active tasks; with three members the previous owner
     * is expected to keep two tasks and the other two members to get one each.
     */
    @Test
    public void shouldReBalanceTasksAcrossClientsWhenCapacityLessThanTaskCount() {
        final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2, 3))), Collections.emptyMap());
        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1");
        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2");

        final Map<String, AssignmentMemberSpec> members = mkMap(
            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3));
        final GroupAssignment result = assignor.assign(
            new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()),
            new TopologyDescriberImpl(4, false)
        );

        assertEquals(1, getAllActiveTaskCount(result, "member1"));
        assertEquals(1, getAllActiveTaskCount(result, "member2"));
        assertEquals(2, getAllActiveTaskCount(result, "member3"));
    }

    /**
     * "process3" hosts two members while "process2" hosts one that previously owned all 3
     * tasks; expects one active task on "member2" and two in total across the two
     * "process3" members.
     */
    @Test
    public void shouldRebalanceTasksToClientsBasedOnCapacity() {
        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", mkSet(0, 3, 2))), Collections.emptyMap());
        final AssignmentMemberSpec memberSpec31 = createAssignmentMemberSpec("process3");
        final AssignmentMemberSpec memberSpec32 = createAssignmentMemberSpec("process3");

        final Map<String, AssignmentMemberSpec> members = mkMap(
            mkEntry("member2", memberSpec2), mkEntry("member3_1", memberSpec31), mkEntry("member3_2", memberSpec32));
        final GroupAssignment result = assignor.assign(
            new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()),
            new TopologyDescriberImpl(3, false)
        );

        assertEquals(1, getAllActiveTaskCount(result, "member2"));
        assertEquals(2, getAllActiveTaskCount(result, "member3_1", "member3_2"));
    }

    /**
     * Two members previously owned two tasks each and a third, empty member joins. Expects
     * the new member to receive one task and the old member it was not taken from to keep
     * exactly its previous tasks.
     */
    @Test
    public void shouldMoveMinimalNumberOfTasksWhenPreviouslyAboveCapacityAndNewClientAdded() {
        final Set<Integer> p1PrevTasks = mkSet(0, 2);
        final Set<Integer> p2PrevTasks = mkSet(1, 3);
        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", p1PrevTasks)), Collections.emptyMap());
        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", p2PrevTasks)), Collections.emptyMap());
        final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3");

        final Map<String, AssignmentMemberSpec> members = mkMap(
            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3));
        final GroupAssignment result = assignor.assign(
            new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()),
            new TopologyDescriberImpl(4, false)
        );

        assertEquals(1, getAllActiveTaskCount(result, "member3"));
        final List<Integer> p3ActiveTasks = getAllActiveTaskIds(result, "member3");

        // Overlap is checked with disjoint() rather than removeAll() so that the fixture set,
        // which is also referenced by memberSpec1, is not modified by the check itself.
        if (!Collections.disjoint(p1PrevTasks, p3ActiveTasks)) {
            assertEquals(p2PrevTasks, new HashSet<>(getAllActiveTaskIds(result, "member2")));
        } else {
            assertEquals(p1PrevTasks, new HashSet<>(getAllActiveTaskIds(result, "member1")));
        }
    }

    /**
     * Two members previously owned tasks {0, 1} and {2, 3}; the topology now has 6
     * partitions. Expects each member to still hold its previous tasks.
     */
    @Test
    public void shouldNotMoveAnyTasksWhenNewTasksAdded() {
        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", mkSet(0, 1))), Collections.emptyMap());
        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", mkSet(2, 3))), Collections.emptyMap());

        final Map<String, AssignmentMemberSpec> members = mkMap(
            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2));
        final GroupAssignment result = assignor.assign(
            new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()),
            new TopologyDescriberImpl(6, false)
        );

        final List<Integer> mem1Tasks = getAllActiveTaskIds(result, "member1");
        assertTrue(mem1Tasks.contains(0));
        assertTrue(mem1Tasks.contains(1));

        final List<Integer> mem2Tasks = getAllActiveTaskIds(result, "member2");
        assertTrue(mem2Tasks.contains(2));
        assertTrue(mem2Tasks.contains(3));
    }

    /**
     * Two members previously owned tasks {2, 1} and {0, 3}, a third member is new and the
     * topology has 6 partitions. Expects the old members to keep their tasks and the new
     * member to receive tasks 4 and 5.
     */
    @Test
    public void shouldAssignNewTasksToNewClientWhenPreviousTasksAssignedToOldClients() {
        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", mkSet(2, 1))), Collections.emptyMap());
        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", mkSet(0, 3))), Collections.emptyMap());
        final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3");

        final Map<String, AssignmentMemberSpec> members = mkMap(
            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3));
        final GroupAssignment result = assignor.assign(
            new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()),
            new TopologyDescriberImpl(6, false)
        );

        final List<Integer> mem1Tasks = getAllActiveTaskIds(result, "member1");
        assertTrue(mem1Tasks.contains(2));
        assertTrue(mem1Tasks.contains(1));

        final List<Integer> mem2Tasks = getAllActiveTaskIds(result, "member2");
        assertTrue(mem2Tasks.contains(0));
        assertTrue(mem2Tasks.contains(3));

        final List<Integer> mem3Tasks = getAllActiveTaskIds(result, "member3");
        assertTrue(mem3Tasks.contains(4));
        assertTrue(mem3Tasks.contains(5));
    }

    /**
     * Three subtopologies with 4 partitions each. Three members carry previous active and
     * standby tasks; a fourth member has only previous standbys. Expects the three old
     * members to keep exactly their previous active tasks and the fourth to receive the
     * tasks that no member previously had as active.
     */
    @Test
    public void shouldAssignTasksNotPreviouslyActiveToNewClient() {
        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
            mkMap(mkEntry("test-subtopology0", mkSet(1)), mkEntry("test-subtopology1", mkSet(2, 3))),
            mkMap(mkEntry("test-subtopology0", mkSet(0)), mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(0, 1, 3))));
        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2",
            mkMap(mkEntry("test-subtopology0", mkSet(0)), mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(2))),
            mkMap(mkEntry("test-subtopology0", mkSet(1, 2, 3)), mkEntry("test-subtopology1", mkSet(0, 2, 3)), mkEntry("test-subtopology2", mkSet(0, 1, 3))));
        final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3",
            mkMap(mkEntry("test-subtopology2", mkSet(0, 1, 3))),
            mkMap(mkEntry("test-subtopology0", mkSet(2)), mkEntry("test-subtopology1", mkSet(2))));
        final AssignmentMemberSpec newMemberSpec = createAssignmentMemberSpec("process4",
            Collections.emptyMap(),
            mkMap(mkEntry("test-subtopology0", mkSet(0, 1, 2, 3)), mkEntry("test-subtopology1", mkSet(0, 1, 2, 3)), mkEntry("test-subtopology2", mkSet(0, 1, 2, 3))));

        final Map<String, AssignmentMemberSpec> members = mkMap(
            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("newMember", newMemberSpec));
        final GroupAssignment result = assignor.assign(
            new GroupSpecImpl(members, Arrays.asList("test-subtopology0", "test-subtopology1", "test-subtopology2"), new HashMap<>()),
            new TopologyDescriberImpl(4, false)
        );

        assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(1)), mkEntry("test-subtopology1", mkSet(2, 3))),
            getAllActiveTasks(result, "member1"));
        assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(0)), mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(2))),
            getAllActiveTasks(result, "member2"));
        assertEquals(mkMap(mkEntry("test-subtopology2", mkSet(0, 1, 3))),
            getAllActiveTasks(result, "member3"));
        assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(2, 3)), mkEntry("test-subtopology1", mkSet(0))),
            getAllActiveTasks(result, "newMember"));
    }

    /**
     * Like the previous test, but the tasks not previously active anywhere are split over
     * two members that have only previous standby tasks. Expects each of those members to
     * receive as active exactly the tasks it previously held as standby.
     */
    @Test
    public void shouldAssignTasksNotPreviouslyActiveToMultipleNewClients() {
        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
            mkMap(mkEntry("test-subtopology0", mkSet(1)), mkEntry("test-subtopology1", mkSet(2, 3))),
            mkMap(mkEntry("test-subtopology0", mkSet(0)), mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(0, 1, 3))));
        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2",
            mkMap(mkEntry("test-subtopology0", mkSet(0)), mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(2))),
            mkMap(mkEntry("test-subtopology0", mkSet(1, 2, 3)), mkEntry("test-subtopology1", mkSet(0, 2, 3)), mkEntry("test-subtopology2", mkSet(0, 1, 3))));

        final AssignmentMemberSpec bounce1 = createAssignmentMemberSpec("bounce1",
            Collections.emptyMap(),
            mkMap(mkEntry("test-subtopology2", mkSet(0, 1, 3))));

        final AssignmentMemberSpec bounce2 = createAssignmentMemberSpec("bounce2",
            Collections.emptyMap(),
            mkMap(mkEntry("test-subtopology0", mkSet(2, 3)), mkEntry("test-subtopology1", mkSet(0))));

        final Map<String, AssignmentMemberSpec> members = mkMap(
            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("bounce_member1", bounce1), mkEntry("bounce_member2", bounce2));
        final GroupAssignment result = assignor.assign(
            new GroupSpecImpl(members, Arrays.asList("test-subtopology0", "test-subtopology1", "test-subtopology2"), new HashMap<>()),
            new TopologyDescriberImpl(4, false)
        );

        assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(1)), mkEntry("test-subtopology1", mkSet(2, 3))),
            getAllActiveTasks(result, "member1"));
        assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(0)), mkEntry("test-subtopology1", mkSet(1)), mkEntry("test-subtopology2", mkSet(2))),
            getAllActiveTasks(result, "member2"));
        assertEquals(mkMap(mkEntry("test-subtopology2", mkSet(0, 1, 3))),
            getAllActiveTasks(result, "bounce_member1"));
        assertEquals(mkMap(mkEntry("test-subtopology0", mkSet(2, 3)), mkEntry("test-subtopology1", mkSet(0))),
            getAllActiveTasks(result, "bounce_member2"));
    }

    /**
     * One member previously owned both tasks of a 2-partition subtopology and a second,
     * empty member joins. Expects the previous owner to be left with one active task.
     */
    @Test
    public void shouldAssignTasksToNewClient() {
        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", mkSet(1, 2))), Collections.emptyMap());
        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2");

        final Map<String, AssignmentMemberSpec> members = mkMap(
            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2));
        final GroupAssignment result = assignor.assign(
            new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()),
            new TopologyDescriberImpl(2, false)
        );

        assertEquals(1, getAllActiveTaskCount(result, "member1"));
    }

    /**
     * Two members previously owned {0, 1, 2} and {3, 4, 5} and a new member joins. Expects
     * two tasks per member, with neither old member receiving a task the other one owned.
     */
    @Test
    public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingClients() {
        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2))), Collections.emptyMap());
        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", mkSet(3, 4, 5))), Collections.emptyMap());
        final AssignmentMemberSpec newMemberSpec = createAssignmentMemberSpec("process3");

        final Map<String, AssignmentMemberSpec> members = mkMap(
            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("newMember", newMemberSpec));
        final GroupAssignment result = assignor.assign(
            new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()),
            new TopologyDescriberImpl(6, false)
        );

        final List<Integer> mem1Tasks = getAllActiveTaskIds(result, "member1");
        assertFalse(mem1Tasks.contains(3));
        assertFalse(mem1Tasks.contains(4));
        assertFalse(mem1Tasks.contains(5));
        assertEquals(2, mem1Tasks.size());

        final List<Integer> mem2Tasks = getAllActiveTaskIds(result, "member2");
        assertFalse(mem2Tasks.contains(0));
        assertFalse(mem2Tasks.contains(1));
        assertFalse(mem2Tasks.contains(2));
        assertEquals(2, mem2Tasks.size());

        assertEquals(2, getAllActiveTaskIds(result, "newMember").size());
    }

    /**
     * "member1" previously owned active tasks {0, 1, 2, 6}; "member2" has only previous
     * standby tasks {3, 4, 5}; a new member joins; the topology has 7 partitions. Expects
     * 3/2/2 active tasks, with "member1" receiving none of {3, 4, 5} and "member2" none of
     * {0, 1, 2}.
     */
    @Test
    public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingAndBouncedClients() {
        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", mkSet(0, 1, 2, 6))), Collections.emptyMap());
        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", Collections.emptyMap(), mkMap(mkEntry("test-subtopology", mkSet(3, 4, 5))));
        final AssignmentMemberSpec newMemberSpec = createAssignmentMemberSpec("newProcess");

        final Map<String, AssignmentMemberSpec> members = mkMap(
            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("newMember", newMemberSpec));
        final GroupAssignment result = assignor.assign(
            new GroupSpecImpl(members, Collections.singletonList("test-subtopology"), new HashMap<>()),
            new TopologyDescriberImpl(7, false)
        );

        final List<Integer> mem1Tasks = getAllActiveTaskIds(result, "member1");
        assertFalse(mem1Tasks.contains(3));
        assertFalse(mem1Tasks.contains(4));
        assertFalse(mem1Tasks.contains(5));
        assertEquals(3, mem1Tasks.size());

        final List<Integer> mem2Tasks = getAllActiveTaskIds(result, "member2");
        assertFalse(mem2Tasks.contains(0));
        assertFalse(mem2Tasks.contains(1));
        assertFalse(mem2Tasks.contains(2));
        assertEquals(2, mem2Tasks.size());

        assertEquals(2, getAllActiveTaskIds(result, "newMember").size());
    }

    /**
     * Counts the active tasks assigned to the given members, summed over all subtopologies.
     * Fails the test if a member is missing from the result or has a null active-task map.
     */
    private int getAllActiveTaskCount(GroupAssignment result, String... memberIds) {
        int size = 0;
        for (String memberId : memberIds) {
            final MemberAssignment testMember = result.members().get(memberId);
            assertNotNull(testMember);
            assertNotNull(testMember.activeTasks());
            for (Set<Integer> tasks : testMember.activeTasks().values()) {
                size += tasks.size();
            }
        }
        return size;
    }

    /**
     * Collects the active task IDs that the given members hold for one subtopology.
     * Members without an entry for {@code topologyId} contribute nothing.
     */
    private Set<Integer> getActiveTasks(GroupAssignment result, final String topologyId, String... memberIds) {
        Set<Integer> res = new HashSet<>();
        for (String memberId : memberIds) {
            final MemberAssignment testMember = result.members().get(memberId);
            assertNotNull(testMember);
            assertNotNull(testMember.activeTasks());
            final Set<Integer> tasks = testMember.activeTasks().get(topologyId);
            if (tasks != null) {
                res.addAll(tasks);
            }
        }
        return res;
    }

    /**
     * Returns the active task IDs of the given members as one flat list. IDs from different
     * subtopologies are not distinguished, so the list may contain duplicates.
     */
    private List<Integer> getAllActiveTaskIds(GroupAssignment result, String... memberIds) {
        List<Integer> res = new ArrayList<>();
        for (String memberId : memberIds) {
            final MemberAssignment testMember = result.members().get(memberId);
            assertNotNull(testMember);
            assertNotNull(testMember.activeTasks());
            for (Set<Integer> tasks : testMember.activeTasks().values()) {
                res.addAll(tasks);
            }
        }
        return res;
    }

    /** Returns the active task IDs of every member in the result as one flat list. */
    private List<Integer> getAllActiveTaskIds(GroupAssignment result) {
        return getAllActiveTaskIds(result, result.members().keySet().toArray(new String[0]));
    }

    /**
     * Returns the member's active tasks keyed by subtopology ID, or a fresh empty map if the
     * member has none. A non-empty result is the map held by the assignment itself, not a copy.
     */
    private Map<String, Set<Integer>> getAllActiveTasks(GroupAssignment result, String memberId) {
        final MemberAssignment testMember = result.members().get(memberId);
        assertNotNull(testMember);
        assertNotNull(testMember.activeTasks());
        if (!testMember.activeTasks().isEmpty()) {
            return testMember.activeTasks();
        }
        return new HashMap<>();
    }

    /**
     * Returns the member's standby tasks keyed by subtopology ID, or a fresh empty map if the
     * member has none. A non-empty result is the map held by the assignment itself, not a copy.
     */
    private Map<String, Set<Integer>> getAllStandbyTasks(GroupAssignment result, String memberId) {
        final MemberAssignment testMember = result.members().get(memberId);
        assertNotNull(testMember);
        assertNotNull(testMember.standbyTasks());
        if (!testMember.standbyTasks().isEmpty()) {
            return testMember.standbyTasks();
        }
        return new HashMap<>();
    }

    /**
     * Returns the standby task IDs of the given members as one flat list. IDs from different
     * subtopologies are not distinguished, so the list may contain duplicates.
     */
    private List<Integer> getAllStandbyTaskIds(GroupAssignment result, String... memberIds) {
        List<Integer> res = new ArrayList<>();
        for (String memberId : memberIds) {
            final MemberAssignment testMember = result.members().get(memberId);
            assertNotNull(testMember);
            assertNotNull(testMember.standbyTasks());
            for (Set<Integer> tasks : testMember.standbyTasks().values()) {
                res.addAll(tasks);
            }
        }
        return res;
    }

    /** Returns the standby task IDs of every member in the result as one flat list. */
    private List<Integer> getAllStandbyTaskIds(GroupAssignment result) {
        return getAllStandbyTaskIds(result, result.members().keySet().toArray(new String[0]));
    }

    /**
     * Unions the active tasks of the given members, keyed by subtopology ID.
     *
     * The merge function copies before combining: the sets it receives are the ones stored in
     * the members' assignments, so adding to them in place would alter the assignment under test.
     */
    private Map<String, Set<Integer>> mergeAllActiveTasks(GroupAssignment result, String... memberIds) {
        Map<String, Set<Integer>> res = new HashMap<>();
        for (String memberId : memberIds) {
            Map<String, Set<Integer>> memberActiveTasks = getAllActiveTasks(result, memberId);
            res = Stream.of(res, memberActiveTasks)
                .flatMap(map -> map.entrySet().stream())
                .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    Map.Entry::getValue,
                    (v1, v2) -> {
                        final Set<Integer> merged = new HashSet<>(v1);
                        merged.addAll(v2);
                        return merged;
                    }));
        }
        return res;
    }

    /**
     * Returns the active task IDs followed by the standby task IDs of the given members as
     * one flat list.
     */
    private List<Integer> getAllTaskIds(GroupAssignment result, String... memberIds) {
        List<Integer> res = new ArrayList<>();
        res.addAll(getAllActiveTaskIds(result, memberIds));
        res.addAll(getAllStandbyTaskIds(result, memberIds));
        return res;
    }

    /**
     * Unions the standby tasks of the given members, keyed by subtopology ID.
     *
     * The merge function copies before combining: the sets it receives are the ones stored in
     * the members' assignments, so adding to them in place would alter the assignment under test.
     */
    private Map<String, Set<Integer>> mergeAllStandbyTasks(GroupAssignment result, String... memberIds) {
        Map<String, Set<Integer>> res = new HashMap<>();
        for (String memberId : memberIds) {
            Map<String, Set<Integer>> memberStandbyTasks = getAllStandbyTasks(result, memberId);
            res = Stream.of(res, memberStandbyTasks)
                .flatMap(map -> map.entrySet().stream())
                .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    Map.Entry::getValue,
                    (v1, v2) -> {
                        final Set<Integer> merged = new HashSet<>(v1);
                        merged.addAll(v2);
                        return merged;
                    }));
        }
        return res;
    }

    /** Unions the standby tasks of every member in the result, keyed by subtopology ID. */
    private Map<String, Set<Integer>> mergeAllStandbyTasks(GroupAssignment result) {
        return mergeAllStandbyTasks(result, result.members().keySet().toArray(new String[0]));
    }

    /**
     * Builds a member spec for the given process ID with no previous active or standby tasks;
     * every other constructor argument is empty.
     */
    private AssignmentMemberSpec createAssignmentMemberSpec(final String processId) {
        return new AssignmentMemberSpec(
            Optional.empty(),
            Optional.empty(),
            Collections.emptyMap(),
            Collections.emptyMap(),
            Collections.emptyMap(),
            processId,
            Collections.emptyMap(),
            Collections.emptyMap());
    }

    /**
     * Builds a member spec for the given process ID with the given previous active and
     * standby tasks (keyed by subtopology ID); every other constructor argument is empty.
     */
    private AssignmentMemberSpec createAssignmentMemberSpec(final String processId, final Map<String, Set<Integer>> prevActiveTasks,
                                                            final Map<String, Set<Integer>> prevStandbyTasks) {
        return new AssignmentMemberSpec(
            Optional.empty(),
            Optional.empty(),
            prevActiveTasks,
            prevStandbyTasks,
            Collections.emptyMap(),
            processId,
            Collections.emptyMap(),
            Collections.emptyMap());
    }

    /**
     * Topology stub that reports the same partition count and the same statefulness for
     * every subtopology ID.
     */
    static class TopologyDescriberImpl implements TopologyDescriber {
        final int numPartitions;
        final boolean isStateful;

        TopologyDescriberImpl(int numPartitions, boolean isStateful) {
            this.numPartitions = numPartitions;
            this.isStateful = isStateful;
        }

        @Override
        public int numPartitions(String subtopologyId) {
            return numPartitions;
        }

        @Override
        public boolean isStateful(String subtopologyId) {
            return isStateful;
        }
    }

    /**
     * Topology stub in which "test-subtopology1" has 6 partitions, every other subtopology
     * has 1, and no subtopology is stateful.
     */
    static class TopologyDescriberImpl2 implements TopologyDescriber {
        @Override
        public int numPartitions(String subtopologyId) {
            if ("test-subtopology1".equals(subtopologyId)) {
                return 6;
            }
            return 1;
        }

        @Override
        public boolean isStateful(String subtopologyId) {
            return false;
        }
    }
}
